Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions multiaddr/src/onion_addr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{borrow::Cow, fmt};

use data_encoding::BASE32;

/// Represents an Onion v3 address
#[derive(Clone)]
pub struct Onion3Addr<'a>(Cow<'a, [u8; 35]>, u16);
Expand All @@ -19,6 +21,11 @@ impl Onion3Addr<'_> {
pub fn acquire<'b>(self) -> Onion3Addr<'b> {
Onion3Addr(Cow::Owned(self.0.into_owned()), self.1)
}

pub fn hash_string(&self) -> String {
let s = BASE32.encode(self.hash());
s.to_lowercase()
}
}

impl PartialEq for Onion3Addr<'_> {
Expand Down
3 changes: 1 addition & 2 deletions multiaddr/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ impl fmt::Display for Protocol<'_> {
Wss => write!(f, "/wss"),
Memory(port) => write!(f, "/memory/{}", port),
Onion3(addr) => {
let s = BASE32.encode(addr.hash());
write!(f, "/onion3/{}:{}", s.to_lowercase(), addr.port())
write!(f, "/onion3/{}:{}", addr.hash_string(), addr.port())
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ httparse = { version = "1.9", optional = true }
futures-timer = { version = "3.0.2", optional = true }

multiaddr = { path = "../multiaddr", package = "tentacle-multiaddr", version = "0.3.4" }
url = "2.5.4"
molecule = "0.8.0"

# upnp
Expand All @@ -48,6 +49,7 @@ tokio-rustls = { version = "0.26.0", optional = true }
# rand 0.8 not support wasm32
rand = "0.8"
socket2 = { version = "0.5.0", features = ["all"] }
fast-socks5 = "0.10.0"

[target.'cfg(target_family = "wasm")'.dependencies]
js-sys = "0.3"
Expand Down
59 changes: 53 additions & 6 deletions tentacle/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{io, sync::Arc, time::Duration};

use crate::service::config::TransformerContext;
use nohash_hasher::IntMap;
use tokio_util::codec::LengthDelimitedCodec;

Expand Down Expand Up @@ -217,9 +218,49 @@ where
#[cfg(not(target_family = "wasm"))]
pub fn tcp_config<F>(mut self, f: F) -> Self
where
F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
F: Fn(TcpSocket, TransformerContext) -> Result<TcpSocket, std::io::Error>
+ Send
+ Sync
+ 'static,
{
self.config.tcp_config.tcp = Arc::new(f);
self.config.tcp_config.tcp.socket_transformer = Arc::new(f);
self
}

fn must_parse_proxy_url(url: String) -> url::Url {
let url = url::Url::parse(&url)
.unwrap_or_else(|err| panic!("parse {} to url::Url failed: {}", url, err));
if url.scheme().ne("socks5") {
panic!("tentacle only support socks5 proxy");
}
url
}

/// Proxy config for tcp
///
/// Example: socks5://127.0.0.1:9050
/// Example: socks5://username:[email protected]:9050
#[cfg(not(target_family = "wasm"))]
pub fn tcp_proxy_config(mut self, proxy_url: String) -> Self {
let proxy_url: url::Url = Self::must_parse_proxy_url(proxy_url);
self.config.tcp_config.tcp.proxy_url = Some(proxy_url);
self
}

/// Onion config for tcp
///
/// Example: socks5://127.0.0.1:9050
#[cfg(not(target_family = "wasm"))]
pub fn tcp_onion_config(mut self, onion_url: String) -> Self {
let onion_url: url::Url = Self::must_parse_proxy_url(onion_url);
self.config.tcp_config.tcp.onion_url = Some(onion_url);
self
}

/// Onion config for proxy, use random username/password is set for proxy connection
#[cfg(not(target_family = "wasm"))]
pub fn tcp_proxy_random_auth(mut self, proxy_random_auth: bool) -> Self {
self.config.tcp_config.tcp.proxy_random_auth = proxy_random_auth;
self
}

Expand All @@ -228,9 +269,12 @@ where
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub fn tcp_config_on_ws<F>(mut self, f: F) -> Self
where
F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
F: Fn(TcpSocket, TransformerContext) -> Result<TcpSocket, std::io::Error>
+ Send
+ Sync
+ 'static,
{
self.config.tcp_config.ws = Arc::new(f);
self.config.tcp_config.ws.socket_transformer = Arc::new(f);
self
}

Expand All @@ -252,9 +296,12 @@ where
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub fn tcp_config_on_tls<F>(mut self, f: F) -> Self
where
F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
F: Fn(TcpSocket, TransformerContext) -> Result<TcpSocket, std::io::Error>
+ Send
+ Sync
+ 'static,
{
self.config.tcp_config.tls = Arc::new(f);
self.config.tcp_config.tls.socket_transformer = Arc::new(f);
self
}
}
Expand Down
34 changes: 18 additions & 16 deletions tentacle/src/channel/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,24 @@ impl<T> Queue<T> {
///
/// This function is unsafe because only one thread can call it at a time.
pub(super) unsafe fn pop(&self) -> PopResult<T> {
let tail = *self.tail.get();
let next = (*tail).next.load(Ordering::Acquire);

if !next.is_null() {
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
let ret = (*next).value.take().unwrap();
drop(Box::from_raw(tail));
return PopResult::Data(ret);
}
unsafe {
let tail = *self.tail.get();
let next = (*tail).next.load(Ordering::Acquire);

if !next.is_null() {
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
let ret = (*next).value.take().unwrap();
drop(Box::from_raw(tail));
return PopResult::Data(ret);
}

if self.head.load(Ordering::Acquire) == tail {
PopResult::Empty
} else {
PopResult::Inconsistent
if self.head.load(Ordering::Acquire) == tail {
PopResult::Empty
} else {
PopResult::Inconsistent
}
}
}

Expand All @@ -146,7 +148,7 @@ impl<T> Queue<T> {
/// This function is unsafe because only one thread can call it at a time.
pub(super) unsafe fn pop_spin(&self) -> Option<T> {
loop {
match self.pop() {
match unsafe { self.pop() } {
PopResult::Empty => return None,
PopResult::Data(t) => return Some(t),
// Inconsistent means that there will be a message to pop
Expand Down
3 changes: 3 additions & 0 deletions tentacle/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub enum TransportErrorKind {
/// IO error
#[error("transport io error: `{0:?}`")]
Io(#[from] IOError),
/// Proxy server error
#[error("proxy error: `{0:?}`")]
ProxyError(IOError),
/// Protocol not support
#[error("multiaddr `{0:?}` is not supported")]
NotSupported(Multiaddr),
Expand Down
1 change: 1 addition & 0 deletions tentacle/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
all(target_family = "wasm", feature = "wasm-timer")
))]
mod generic_timer;
pub(crate) mod proxy;
#[cfg(all(not(target_family = "wasm"), feature = "tokio-runtime"))]
mod tokio_runtime;
#[cfg(target_family = "wasm")]
Expand Down
4 changes: 4 additions & 0 deletions tentacle/src/runtime/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#[cfg(not(target_family = "wasm"))]
pub(crate) mod socks5;
#[cfg(not(target_family = "wasm"))]
pub(crate) mod socks5_config;
41 changes: 41 additions & 0 deletions tentacle/src/runtime/proxy/socks5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use fast_socks5::{
AuthenticationMethod, Socks5Command,
client::{Config as ConnectConfig, Socks5Stream},
};
use tokio::net::TcpStream;

pub async fn connect(
socks_server: url::Url,
target_addr: String,
target_port: u16,
) -> Result<TcpStream, fast_socks5::SocksError> {
let auth = {
if socks_server.username().is_empty() {
AuthenticationMethod::None
} else {
AuthenticationMethod::Password {
username: socks_server.username().to_string(),
password: socks_server.password().unwrap_or_default().to_string(),
}
}
};
let socks_server_str = format!(
"{}:{}",
socks_server.host_str().ok_or_else(|| {
fast_socks5::SocksError::ArgumentInputError("socks_server should have host")
})?,
socks_server.port().ok_or_else(|| {
fast_socks5::SocksError::ArgumentInputError("socks_server should have port")
})?
);
Socks5Stream::connect_raw(
Socks5Command::TCPConnect,
socks_server_str,
target_addr,
target_port,
Some(auth),
ConnectConfig::default(),
)
.await
.map(|socket| socket.get_socket())
}
15 changes: 15 additions & 0 deletions tentacle/src/runtime/proxy/socks5_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use rand::Rng;

pub(crate) fn random_auth() -> (String, String) {
let username = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(8)
.map(char::from)
.collect();
let password = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(16)
.map(char::from)
.collect();
(username, password)
}
Loading