Skip to content

Commit

Permalink
rs: support HTTP_PROXY (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
connor4312 authored Feb 17, 2023
1 parent ad7bded commit 18a9188
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 10 deletions.
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"eslint.workingDirectories": [
"ts"
]
}
],
"rust-analyzer.cargo.features": ["connections", "vendored-openssl"],
}
1 change: 1 addition & 0 deletions rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tungstenite = { version = "0.17", optional = true }
uuid = { version = "0.8.2", features = ["v4"], optional = true }
russh = { version = "0.34.0-beta.16", default-features = false, features = ["openssl", "flate2"], optional = true }
russh-keys = { version = "0.22.0-beta.7", default-features = false, features = ["openssl"], optional = true }

hyper = "0.14"

[dev-dependencies]
tokio = { version = "1.20", features = ["full"] }
Expand Down
9 changes: 9 additions & 0 deletions rs/src/connections/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,13 @@ pub enum TunnelError {

#[error("port {0} already exists in the relay")]
PortAlreadyExists(u32),

#[error("proxy connection failed: {0}")]
ProxyConnectionFailed(std::io::Error),

#[error("proxy handshake failed: {0}")]
ProxyHandshakeFailed(hyper::Error),

#[error("proxy connect request failed: {0}")]
ProxyConnectRequestFailed(hyper::Error)
}
17 changes: 11 additions & 6 deletions rs/src/connections/relay_tunnel_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{
collections::HashMap,
io,
env, io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
pin::Pin,
sync::Arc,
Expand All @@ -26,12 +26,12 @@ use tokio::{
sync::{mpsc, oneshot, watch},
task::JoinHandle,
};
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use uuid::Uuid;

use super::{
errors::TunnelError,
ws::{build_websocket_request, AsyncRWWebSocket},
ws::{build_websocket_request, connect_directly, connect_via_proxy, AsyncRWWebSocket},
};

/// Mapping of port numbers to senders to which new port connections should be
Expand All @@ -46,6 +46,7 @@ type PortMap = HashMap<u32, mpsc::UnboundedSender<ForwardedPortConnection>>;
/// tunnels via the appropriate methods on the RelayTunnelHost, no ports will be
/// hosted until those methods are called.
pub struct RelayTunnelHost {
pub proxy: Option<String>,
locator: TunnelLocator,
host_id: Uuid,
ports_tx: watch::Sender<PortMap>,
Expand Down Expand Up @@ -133,6 +134,7 @@ impl RelayTunnelHost {
let host_id = Uuid::new_v4();
let (ports_tx, ports_rx) = watch::channel(HashMap::new());
RelayTunnelHost {
proxy: env::var("HTTPS_PROXY").or(env::var("https_proxy")).ok(),
host_id,
locator,
ports_tx,
Expand Down Expand Up @@ -429,9 +431,12 @@ impl RelayTunnelHost {
],
)?;

let (cnx, _) = connect_async(req)
.await
.map_err(TunnelError::WebSocketError)?;
let cnx = if let Some(proxy) = &self.proxy {
log::debug!("connecting via http_proxy on {}", proxy);
connect_via_proxy(req, proxy).await?
} else {
connect_directly(req).await?
};

Ok((cnx, endpoint))
}
Expand Down
65 changes: 64 additions & 1 deletion rs/src/connections/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use std::{io, pin::Pin, task::Poll, time::Duration};
use futures::{Future, Sink, Stream};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
time::{sleep, Instant, Sleep},
};
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};

use crate::management::{HttpError, ResponseError};

use super::errors::TunnelError;

Expand Down Expand Up @@ -212,6 +215,66 @@ where
}
}

pub(crate) async fn connect_directly(
ws_req: tungstenite::handshake::client::Request,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, TunnelError> {
let (ws, _) = connect_async(ws_req)
.await
.map_err(TunnelError::WebSocketError)?;

Ok(ws)
}

pub(crate) async fn connect_via_proxy(
ws_req: tungstenite::handshake::client::Request,
proxy_addr: &str,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, TunnelError> {
// format the remote authority, explicitly adding a port since it's
// required (by some proxies) in CONNECT
let authority = {
let port = ws_req.uri().port_u16().unwrap_or(443);
let hostname = ws_req.uri().host().expect("expected to have uri host");
format!("{}:{}", hostname, port)
};

let stream = TcpStream::connect(proxy_addr)
.await
.map_err(TunnelError::ProxyConnectionFailed)?;

let (mut request_sender, conn) = hyper::client::conn::handshake(stream)
.await
.map_err(TunnelError::ProxyHandshakeFailed)?;

let conn = tokio::spawn(conn.without_shutdown());
let connect_req = hyper::Request::connect(&authority)
.body(hyper::Body::empty())
.expect("expected to make connect request");

let res = request_sender
.send_request(connect_req)
.await
.map_err(TunnelError::ProxyConnectRequestFailed)?;

if !res.status().is_success() {
return Err(TunnelError::HttpError {
reason: "error sending tunnel CONNECT request",
error: HttpError::ResponseError(ResponseError {
url: reqwest::Url::parse(proxy_addr).unwrap(),
status_code: res.status(),
data: hyper::body::to_bytes(res.into_body())
.await
.map(|b| String::from_utf8_lossy(&b).to_string())
.ok(),
request_id: None,
}),
});
}

let tcp = conn.await.unwrap().unwrap().io;
let (ws_stream, _) = tokio_tungstenite::client_async_tls(ws_req, tcp).await?;
Ok(ws_stream)
}

/// Creates a websocket request with additional headers. This is annoyingly
/// complicated. https://github.com/snapview/tungstenite-rs/issues/107
pub(crate) fn build_websocket_request(
Expand Down

0 comments on commit 18a9188

Please sign in to comment.