Skip to content

Commit db50c6d

Browse files
committed
feat: typestate pattern for SocketIoBuilder + tests fixes
1 parent 67bcc99 commit db50c6d

File tree

14 files changed

+144
-59
lines changed

14 files changed

+144
-59
lines changed

crates/engineioxide/Readme.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ impl EngineIoHandler for MyHandler {
5353
let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
5454
socket.emit(cnt.to_string()).ok();
5555
}
56-
fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
56+
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
5757
*socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
5858
}
59-
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
59+
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
6060
}
6161

6262
// Create a new engineio layer

crates/engineioxide/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
//! type Data = ();
1616
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
1717
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
18-
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
19-
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
18+
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
19+
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
2020
//! }
2121
//!
2222
//! let config = EngineIoConfig::builder()
@@ -150,12 +150,12 @@ impl EngineIoConfigBuilder {
150150
/// println!("socket disconnect {}", socket.id);
151151
/// }
152152
///
153-
/// fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) {
153+
/// fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) {
154154
/// println!("Ping pong message {:?}", msg);
155155
/// socket.emit(msg).unwrap();
156156
/// }
157157
///
158-
/// fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) {
158+
/// fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) {
159159
/// println!("Ping pong binary message {:?}", data);
160160
/// socket.emit_binary(data).unwrap();
161161
/// }

crates/engineioxide/src/handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
3131
//! socket.emit(cnt.to_string()).ok();
3232
//! }
33-
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
33+
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
3434
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
3535
//! }
36-
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
36+
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
3737
//! }
3838
//!
3939
//! // Create an engine io service with the given handler

crates/engineioxide/src/layer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
//! type Data = ();
1616
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
1717
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
18-
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
19-
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
18+
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
19+
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
2020
//! }
2121
//! // Create a new engineio layer
2222
//! let layer = EngineIoLayer::new(Arc::new(MyHandler));

crates/engineioxide/src/service/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
//! type Data = ();
1818
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
1919
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
20-
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
21-
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
20+
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
21+
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
2222
//! }
2323
//!
2424
//! // Create a new engine.io service that will return a 404 not found response for other requests

crates/engineioxide/src/socket.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@
4646
//! fn on_disconnect(&self, socket: Arc<Socket<SocketState>>, reason: DisconnectReason) {
4747
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
4848
//! }
49-
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
49+
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
5050
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
5151
//! }
52-
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
52+
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
5353
//! }
5454
//!
5555
//! let svc = EngineIoService::new(Arc::new(MyHandler::default()));

crates/socketioxide-core/src/adapter.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,17 +180,22 @@ pub trait SocketEmitter: Send + Sync + 'static {
180180

181181
/// An adapter is responsible for managing the state of the namespace.
182182
/// This adapter can be implemented to share the state between multiple servers.
183-
/// The default adapter is the [`LocalAdapter`], which stores the state in memory.
183+
///
184+
/// A [`CoreLocalAdapter`] instance will be given when constructing this type, it will allow
185+
/// you to manipulate local sockets (emitting, fetching data, broadcasting).
184186
pub trait CoreAdapter<E: SocketEmitter>: Sized + Send + Sync + 'static {
185-
/// An error that can occur when using the adapter. The default [`LocalAdapter`] has an [`Infallible`] error.
187+
/// An error that can occur when using the adapter.
186188
type Error: StdError + Into<AdapterError> + Send + 'static;
187189
/// A shared state between all the namespace [`CoreAdapter`].
188190
/// This can be used to share a connection for example.
189191
type State: Send + Sync + 'static;
190192
/// A stream that emits the acknowledgments of multiple sockets.
191193
type AckStream: Stream<Item = AckStreamItem<E::AckError>> + FusedStream + Send + 'static;
192194

193-
/// Creates a new adapter with the given state and socket server.
195+
/// Creates a new adapter with the given state and local adapter.
196+
///
197+
/// The state is used to share a common state between all your adapters. E.G. a connection to a remote system.
198+
/// The local adapter is used to manipulate the local sockets.
194199
fn new(state: &Self::State, local: CoreLocalAdapter<E>) -> Self;
195200

196201
/// Initializes the adapter.
@@ -245,7 +250,7 @@ pub trait CoreAdapter<E: SocketEmitter>: Sized + Send + Sync + 'static {
245250
/// and return a stream of ack responses.
246251
///
247252
/// This method does not have default implementation because GAT cannot have default impls.
248-
/// https://github.com/rust-lang/rust/issues/29661
253+
/// <https://github.com/rust-lang/rust/issues/29661>
249254
fn broadcast_with_ack(
250255
&self,
251256
packet: Packet,

crates/socketioxide-core/src/errors.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub enum SocketError {
88
/// The socket channel is full.
99
/// You might need to increase the channel size with the [`SocketIoBuilder::max_buffer_size`] method.
1010
///
11-
/// [`SocketIoBuilder::max_buffer_size`]: crate::SocketIoBuilder#method.max_buffer_size
11+
/// [`SocketIoBuilder::max_buffer_size`]: https://docs.rs/socketioxide/latest/socketioxide/struct.SocketIoBuilder.html#method.max_buffer_size
1212
#[error("internal channel full error")]
1313
InternalChannelFull,
1414

@@ -17,7 +17,7 @@ pub enum SocketError {
1717
Closed,
1818
}
1919

20-
/// Error type for the [`Adapter`](crate::adapter::Adapter) trait.
20+
/// Error type for the [`CoreAdapter`](crate::adapter::CoreAdapter) trait.
2121
#[derive(Debug, thiserror::Error)]
2222
pub struct AdapterError(#[from] pub Box<dyn std::error::Error + Send>);
2323
impl fmt::Display for AdapterError {
@@ -37,7 +37,7 @@ pub enum DisconnectError {
3737
/// The socket channel is full.
3838
/// You might need to increase the channel size with the [`SocketIoBuilder::max_buffer_size`] method.
3939
///
40-
/// [`SocketIoBuilder::max_buffer_size`]: crate::SocketIoBuilder#method.max_buffer_size
40+
/// [`SocketIoBuilder::max_buffer_size`]: https://docs.rs/socketioxide/latest/socketioxide/struct.SocketIoBuilder.html#method.max_buffer_size
4141
#[error("internal channel full error")]
4242
InternalChannelFull,
4343

crates/socketioxide/docs/operators/emit_with_ack.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ To receive acknowledgments, an [`AckStream`] is returned. It can be used in two
1010
* As a [`Future`]: This will yield the first acknowledgment response received from the client, useful when expecting only one acknowledgment.
1111

1212
# Errors
13-
If packet encoding fails, an [`EncodeError`] is **immediately** returned.
13+
If packet encoding fails, an [`ParserError`] is **immediately** returned.
1414

1515
If the socket is full or if it is closed before receiving the acknowledgment,
1616
a [`SendError::Socket`] will be **immediately** returned, and the value to send will be given back.
@@ -28,7 +28,7 @@ an [`AckError::Decode`] will be yielded.
2828
[`AckError::Socket`]: crate::AckError::Socket
2929
[`AckError::Socket(SocketError::Closed)`]: crate::SocketError::Closed
3030
[`SendError::Socket`]: crate::SendError::Socket
31-
[`EncodeError`]: crate::EncodeError
31+
[`ParserError`]: crate::ParserError
3232
[`io::get_socket()`]: crate::SocketIo#method.get_socket
3333

3434
# Single-socket example

crates/socketioxide/src/client.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct Client<A: Adapter> {
3737
pub(crate) state: state::TypeMap![Send + Sync],
3838
}
3939

40-
/// ==== impl Client ====
40+
// ==== impl Client ====
4141

4242
impl<A: Adapter> Client<A> {
4343
pub fn new(
@@ -148,7 +148,7 @@ impl<A: Adapter> Client<A> {
148148
}
149149

150150
/// Adds a new namespace handler
151-
pub async fn add_ns<C, T>(&self, path: Cow<'static, str>, callback: C) -> Result<(), A::Error>
151+
pub fn add_ns<C, T>(&self, path: Cow<'static, str>, callback: C)
152152
where
153153
C: ConnectHandler<A, T>,
154154
T: Send + Sync + 'static,
@@ -162,9 +162,7 @@ impl<A: Adapter> Client<A> {
162162
&self.adapter_state,
163163
self.config.parser,
164164
);
165-
ns.adapter.clone().init().await?;
166165
self.nsps.write().unwrap().insert(path, ns);
167-
Ok(())
168166
}
169167

170168
pub fn add_dyn_ns<C, T>(&self, path: String, callback: C) -> Result<(), matchit::InsertError>
@@ -179,6 +177,16 @@ impl<A: Adapter> Client<A> {
179177
self.router.write().unwrap().insert(path, ns)
180178
}
181179

180+
/// Initializes all the namespace handlers
181+
///
182+
/// If an any error occurs while initializing a namespace, it is immediately returned
183+
pub async fn init_nsps(&self) -> Result<(), A::Error> {
184+
let nsps: Vec<_> = self.nsps.read().unwrap().values().cloned().collect();
185+
let futures = nsps.into_iter().map(|ns| ns.adapter.clone().init());
186+
futures_util::future::try_join_all(futures).await?;
187+
Ok(())
188+
}
189+
182190
/// Deletes a namespace handler and closes all the connections to it
183191
pub fn delete_ns(&self, path: &str) {
184192
#[cfg(feature = "v4")]

0 commit comments

Comments
 (0)