diff --git a/.vscode/settings.json b/.vscode/settings.json index b8648285..516fdb79 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,6 @@ { "eslint.workingDirectories": [ "ts" - ] -} \ No newline at end of file + ], + "rust-analyzer.cargo.features": ["connections", "vendored-openssl"], +} diff --git a/rs/Cargo.lock b/rs/Cargo.lock index ed13f699..ed2d48ee 100644 --- a/rs/Cargo.lock +++ b/rs/Cargo.lock @@ -1739,6 +1739,7 @@ dependencies = [ "async-trait", "chrono", "futures", + "hyper", "log", "opentelemetry", "rand 0.8.5", diff --git a/rs/Cargo.toml b/rs/Cargo.toml index b531fd89..08b21cbf 100644 --- a/rs/Cargo.toml +++ b/rs/Cargo.toml @@ -21,7 +21,7 @@ tungstenite = { version = "0.17", optional = true } uuid = { version = "0.8.2", features = ["v4"], optional = true } russh = { version = "0.34.0-beta.16", default-features = false, features = ["openssl", "flate2"], optional = true } russh-keys = { version = "0.22.0-beta.7", default-features = false, features = ["openssl"], optional = true } - +hyper = "0.14" [dev-dependencies] tokio = { version = "1.20", features = ["full"] } diff --git a/rs/src/connections/errors.rs b/rs/src/connections/errors.rs index d5884655..c9b51e04 100644 --- a/rs/src/connections/errors.rs +++ b/rs/src/connections/errors.rs @@ -26,4 +26,13 @@ pub enum TunnelError { #[error("port {0} already exists in the relay")] PortAlreadyExists(u32), + + #[error("proxy connection failed: {0}")] + ProxyConnectionFailed(std::io::Error), + + #[error("proxy handshake failed: {0}")] + ProxyHandshakeFailed(hyper::Error), + + #[error("proxy connect request failed: {0}")] + ProxyConnectRequestFailed(hyper::Error) } diff --git a/rs/src/connections/relay_tunnel_host.rs b/rs/src/connections/relay_tunnel_host.rs index a9387475..09316e4d 100644 --- a/rs/src/connections/relay_tunnel_host.rs +++ b/rs/src/connections/relay_tunnel_host.rs @@ -3,7 +3,7 @@ use std::{ collections::HashMap, - io, + env, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, pin::Pin, sync::Arc, @@ -26,12 +26,12 @@ use tokio::{ sync::{mpsc, oneshot, watch}, task::JoinHandle, }; -use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use uuid::Uuid; use super::{ errors::TunnelError, - ws::{build_websocket_request, AsyncRWWebSocket}, + ws::{build_websocket_request, connect_directly, connect_via_proxy, AsyncRWWebSocket}, }; /// Mapping of port numbers to senders to which new port connections should be @@ -46,6 +46,7 @@ type PortMap = HashMap>; /// tunnels via the appropriate methods on the RelayTunnelHost, no ports will be /// hosted until those methods are called. pub struct RelayTunnelHost { + pub proxy: Option, locator: TunnelLocator, host_id: Uuid, ports_tx: watch::Sender, @@ -133,6 +134,7 @@ impl RelayTunnelHost { let host_id = Uuid::new_v4(); let (ports_tx, ports_rx) = watch::channel(HashMap::new()); RelayTunnelHost { + proxy: env::var("HTTPS_PROXY").or(env::var("https_proxy")).ok(), host_id, locator, ports_tx, @@ -429,9 +431,12 @@ impl RelayTunnelHost { ], )?; - let (cnx, _) = connect_async(req) - .await - .map_err(TunnelError::WebSocketError)?; + let cnx = if let Some(proxy) = &self.proxy { + log::debug!("connecting via http_proxy on {}", proxy); + connect_via_proxy(req, proxy).await? + } else { + connect_directly(req).await? + }; Ok((cnx, endpoint)) } diff --git a/rs/src/connections/ws.rs b/rs/src/connections/ws.rs index 98c385d8..380969b3 100644 --- a/rs/src/connections/ws.rs +++ b/rs/src/connections/ws.rs @@ -6,9 +6,12 @@ use std::{io, pin::Pin, task::Poll, time::Duration}; use futures::{Future, Sink, Stream}; use tokio::{ io::{AsyncRead, AsyncWrite}, + net::TcpStream, time::{sleep, Instant, Sleep}, }; -use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; + +use crate::management::{HttpError, ResponseError}; use super::errors::TunnelError; @@ -212,6 +215,66 @@ where } } +pub(crate) async fn connect_directly( + ws_req: tungstenite::handshake::client::Request, +) -> Result>, TunnelError> { + let (ws, _) = connect_async(ws_req) + .await + .map_err(TunnelError::WebSocketError)?; + + Ok(ws) +} + +pub(crate) async fn connect_via_proxy( + ws_req: tungstenite::handshake::client::Request, + proxy_addr: &str, +) -> Result>, TunnelError> { + // format the remote authority, explicitly adding a port since it's + // required (by some proxies) in CONNECT + let authority = { + let port = ws_req.uri().port_u16().unwrap_or(443); + let hostname = ws_req.uri().host().expect("expected to have uri host"); + format!("{}:{}", hostname, port) + }; + + let stream = TcpStream::connect(proxy_addr) + .await + .map_err(TunnelError::ProxyConnectionFailed)?; + + let (mut request_sender, conn) = hyper::client::conn::handshake(stream) + .await + .map_err(TunnelError::ProxyHandshakeFailed)?; + + let conn = tokio::spawn(conn.without_shutdown()); + let connect_req = hyper::Request::connect(&authority) + .body(hyper::Body::empty()) + .expect("expected to make connect request"); + + let res = request_sender + .send_request(connect_req) + .await + .map_err(TunnelError::ProxyConnectRequestFailed)?; + + if !res.status().is_success() { + return Err(TunnelError::HttpError { + reason: "error sending tunnel CONNECT request", + error: HttpError::ResponseError(ResponseError { + url: reqwest::Url::parse(proxy_addr).unwrap(), + status_code: res.status(), + data: hyper::body::to_bytes(res.into_body()) + .await + .map(|b| String::from_utf8_lossy(&b).to_string()) + .ok(), + request_id: None, + }), + }); + } + + let tcp = conn.await.unwrap().unwrap().io; + let (ws_stream, _) = tokio_tungstenite::client_async_tls(ws_req, tcp).await?; + Ok(ws_stream) +} + /// Creates a websocket request with additional headers. This is annoyingly /// complicated. https://github.com/snapview/tungstenite-rs/issues/107 pub(crate) fn build_websocket_request(