Skip to content

Commit c21fe27

Browse files
authored
feat(redis): redis adapter crate (#402)
1 parent 8ac02c0 commit c21fe27

File tree

54 files changed

+4036
-207
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+4036
-207
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
name: adapter-setup
2+
services:
3+
valkey:
4+
image: valkey/valkey
5+
network_mode: host
6+
healthcheck:
7+
test: "valkey-cli ping"
8+
interval: 2s
9+
timeout: 5s
10+
redis-node-0:
11+
image: docker.io/bitnami/redis-cluster:7.0
12+
network_mode: host
13+
healthcheck:
14+
test: "redis-cli ping"
15+
interval: 2s
16+
timeout: 5s
17+
environment:
18+
- ALLOW_EMPTY_PASSWORD=yes
19+
- REDIS_PORT_NUMBER=7000
20+
- REDIS_CLUSTER_ANNOUNCE_PORT=7000
21+
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1 # host ip address
22+
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17000
23+
- REDIS_CLUSTER_DYNAMIC_IPS=no
24+
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
25+
26+
redis-node-1:
27+
image: docker.io/bitnami/redis-cluster:7.0
28+
network_mode: host
29+
healthcheck:
30+
test: "redis-cli ping"
31+
interval: 2s
32+
timeout: 5s
33+
environment:
34+
- ALLOW_EMPTY_PASSWORD=yes
35+
- REDIS_PORT_NUMBER=7001
36+
- REDIS_CLUSTER_ANNOUNCE_PORT=7001
37+
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17001
38+
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
39+
- REDIS_CLUSTER_DYNAMIC_IPS=no
40+
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
41+
42+
redis-node-2:
43+
image: docker.io/bitnami/redis-cluster:7.0
44+
network_mode: host
45+
healthcheck:
46+
test: "redis-cli ping"
47+
interval: 2s
48+
timeout: 5s
49+
environment:
50+
- ALLOW_EMPTY_PASSWORD=yes
51+
- REDIS_PORT_NUMBER=7002
52+
- REDIS_CLUSTER_ANNOUNCE_PORT=7002
53+
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17002
54+
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
55+
- REDIS_CLUSTER_DYNAMIC_IPS=no
56+
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
57+
58+
redis-node-3:
59+
image: docker.io/bitnami/redis-cluster:7.0
60+
network_mode: host
61+
healthcheck:
62+
test: "redis-cli ping"
63+
interval: 2s
64+
timeout: 5s
65+
environment:
66+
- ALLOW_EMPTY_PASSWORD=yes
67+
- REDIS_PORT_NUMBER=7003
68+
- REDIS_CLUSTER_ANNOUNCE_PORT=7003
69+
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17003
70+
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
71+
- REDIS_CLUSTER_DYNAMIC_IPS=no
72+
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
73+
74+
redis-node-4:
75+
image: docker.io/bitnami/redis-cluster:7.0
76+
network_mode: host
77+
healthcheck:
78+
test: "redis-cli ping"
79+
interval: 2s
80+
timeout: 5s
81+
environment:
82+
- ALLOW_EMPTY_PASSWORD=yes
83+
- REDIS_PORT_NUMBER=7004
84+
- REDIS_CLUSTER_ANNOUNCE_PORT=7004
85+
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17004
86+
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
87+
- REDIS_CLUSTER_DYNAMIC_IPS=no
88+
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
89+
90+
redis-node-5:
91+
image: docker.io/bitnami/redis-cluster:7.0
92+
network_mode: host
93+
healthcheck:
94+
test: "redis-cli ping"
95+
interval: 2s
96+
timeout: 5s
97+
depends_on:
98+
- redis-node-0
99+
- redis-node-1
100+
- redis-node-2
101+
- redis-node-3
102+
- redis-node-4
103+
environment:
104+
- ALLOW_EMPTY_PASSWORD=yes
105+
- REDIS_CLUSTER_REPLICAS=1
106+
- REDIS_PORT_NUMBER=7005
107+
- REDIS_CLUSTER_ANNOUNCE_PORT=7005
108+
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17005
109+
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
110+
- REDIS_CLUSTER_DYNAMIC_IPS=no
111+
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
112+
- REDIS_CLUSTER_CREATOR=yes

.github/workflows/github-ci.yml

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ jobs:
5656
target/
5757
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-nightly
5858
- name: Check unused dependencies on default features
59-
run: RUSTFLAGS="--cfg fuzzing" cargo udeps --workspace
59+
run: RUSTFLAGS="--cfg fuzzing" cargo udeps
6060

6161
- name: Check unused dependencies on all features
62-
run: RUSTFLAGS="--cfg fuzzing" cargo udeps --all-features --workspace
62+
run: RUSTFLAGS="--cfg fuzzing" cargo udeps --all-features
6363

6464
msrv:
6565
runs-on: ubuntu-latest
@@ -108,7 +108,7 @@ jobs:
108108
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
109109

110110
- name: check --feature-powerset
111-
run: cargo hack check --feature-powerset --no-dev-deps -p socketioxide -p engineioxide
111+
run: cargo hack check --feature-powerset --no-dev-deps -p socketioxide -p engineioxide -p socketioxide-redis
112112

113113
examples:
114114
runs-on: ubuntu-latest
@@ -269,3 +269,57 @@ jobs:
269269
- name: Client output
270270
if: always()
271271
run: cat client.txt
272+
adapter:
273+
runs-on: ubuntu-latest
274+
needs: [socket_io, engine_io]
275+
strategy:
276+
matrix:
277+
socketio-version: [v4, v4-msgpack, v5, v5-msgpack]
278+
adapter: [fred-e2e, redis-e2e, redis-cluster-e2e, fred-cluster-e2e]
279+
steps:
280+
- uses: actions/checkout@v4
281+
- uses: dtolnay/rust-toolchain@master
282+
with:
283+
toolchain: stable
284+
- uses: actions/cache@v4
285+
with:
286+
path: |
287+
~/.cargo/bin/
288+
~/.cargo/registry/index/
289+
~/.cargo/registry/cache/
290+
~/.cargo/git/db/
291+
target/
292+
key: ${{ runner.os }}-cargo-adapter
293+
- uses: actions/setup-node@v4
294+
with:
295+
node-version: 22
296+
- name: install adapter infra
297+
uses: hoverkraft-tech/[email protected]
298+
with:
299+
compose-file: ./.github/workflows/adapter-ci/docker-compose.yml
300+
- run: cd e2e/adapter && npm install && npm install ts-node --location=global
301+
- name: Install deps & run tests
302+
run: |
303+
PARSER=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f2 -s)
304+
VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1)
305+
cargo build -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER
306+
cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" ts-node client.ts
307+
- name: Server output
308+
if: always()
309+
run: cat e2e/adapter/*.log
310+
all_passed:
311+
runs-on: ubuntu-latest
312+
needs:
313+
[
314+
adapter,
315+
feature_set,
316+
format,
317+
udeps,
318+
msrv,
319+
examples,
320+
doctest,
321+
rust-clippy-analyze,
322+
]
323+
steps:
324+
- name: All passed
325+
run: echo "All tests passed"

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ hyper-util.version = "0.1"
4040
hyper = "1.5"
4141
pin-project-lite = "0.2"
4242
matchit = "0.8"
43+
rmp-serde = "1.3"
44+
rmp = "0.8"
45+
rustversion = "1"
4346

4447
# Dev deps
4548
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

crates/engineioxide/src/sid.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ pub struct Sid([u8; 16]);
1616
impl Sid {
1717
/// A zeroed session id
1818
pub const ZERO: Self = Self([0u8; 16]);
19-
/// Generate a new random session id (base64 10 chars)
19+
/// Generate a new random session id (base64 16 chars)
2020
pub fn new() -> Self {
2121
Self::default()
2222
}
2323

24-
/// Get the session id as a base64 10 chars string
25-
pub fn as_str(&self) -> &str {
24+
/// Get the session id as a base64 16 chars string
25+
pub const fn as_str(&self) -> &str {
2626
// SAFETY: SID is always a base64 chars string
2727
unsafe { std::str::from_utf8_unchecked(&self.0) }
2828
}

crates/parser-msgpack/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ readme.workspace = true
1515
[dependencies]
1616
bytes.workspace = true
1717
serde.workspace = true
18-
rmp = "0.8"
19-
rmp-serde = "1.3"
18+
rmp-serde.workspace = true
19+
rmp.workspace = true
2020
socketioxide-core = { version = "0.15.0", path = "../socketioxide-core" }
2121

2222
[dev-dependencies]

crates/socketioxide-core/src/adapter.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,26 @@ pub trait SocketEmitter: Send + Sync + 'static {
210210
fn server_id(&self) -> Uid;
211211
}
212212

213+
/// For static namespaces, the init response will be managed by the user.
214+
/// However, for dynamic namespaces, the socket.io client will manage the response.
215+
/// As it does not know the type of the response, the spawnable trait is used to spawn the response.
216+
/// Without the client having to know the type of the response.
217+
pub trait Spawnable {
218+
/// Spawn the response. Implementors should spawn the future with `tokio::spawn` if it is an async function.
219+
/// They should also print a `tracing::error` log in case of an error.
220+
fn spawn(self);
221+
}
222+
impl Spawnable for () {
223+
fn spawn(self) {}
224+
}
225+
226+
/// A trait to add a "defined" bound to adapter types.
227+
/// This allow the socket io library to implement function given a *defined* adapter
228+
/// and not a generic `A: Adapter`.
229+
///
230+
/// This is useful to force the user to handle potential init response type [`CoreAdapter::InitRes`].
231+
pub trait DefinedAdapter {}
232+
213233
/// An adapter is responsible for managing the state of the namespace.
214234
/// This adapter can be implemented to share the state between multiple servers.
215235
///
@@ -223,17 +243,17 @@ pub trait CoreAdapter<E: SocketEmitter>: Sized + Send + Sync + 'static {
223243
type State: Send + Sync + 'static;
224244
/// A stream that emits the acknowledgments of multiple sockets.
225245
type AckStream: Stream<Item = AckStreamItem<E::AckError>> + FusedStream + Send + 'static;
246+
/// A named result type for the initialization of the adapter.
247+
type InitRes: Spawnable + Send;
226248

227249
/// Creates a new adapter with the given state and local adapter.
228250
///
229251
/// The state is used to share a common state between all your adapters. E.G. a connection to a remote system.
230252
/// The local adapter is used to manipulate the local sockets.
231253
fn new(state: &Self::State, local: CoreLocalAdapter<E>) -> Self;
232254

233-
/// Initializes the adapter.
234-
fn init(self: Arc<Self>) -> impl Future<Output = Result<(), Self::Error>> + Send {
235-
future::ready(Ok(()))
236-
}
255+
/// Initializes the adapter. The on_success callback should be called when the adapter ready.
256+
fn init(self: Arc<Self>, on_success: impl FnOnce() + Send + 'static) -> Self::InitRes;
237257

238258
/// Closes the adapter.
239259
fn close(&self) -> impl Future<Output = Result<(), Self::Error>> + Send {

crates/socketioxide-redis/Cargo.toml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
[package]
2+
name = "socketioxide-redis"
3+
description = "Redis adapter for the socket.io protocol"
4+
version = "0.1.0"
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
authors.workspace = true
8+
repository.workspace = true
9+
homepage.workspace = true
10+
keywords.workspace = true
11+
categories.workspace = true
12+
license.workspace = true
13+
readme.workspace = true
14+
15+
[features]
16+
redis = ["dep:redis"]
17+
redis-cluster = ["redis", "redis/cluster-async"]
18+
fred = ["dep:fred"]
19+
default = ["redis"]
20+
21+
[dependencies]
22+
socketioxide-core = { version = "0.15", path = "../socketioxide-core" }
23+
futures-core.workspace = true
24+
futures-util.workspace = true
25+
pin-project-lite.workspace = true
26+
serde.workspace = true
27+
smallvec = { workspace = true, features = ["serde"] }
28+
tokio = { workspace = true, features = ["macros", "time", "rt", "sync"] }
29+
rmp-serde.workspace = true
30+
rmp.workspace = true
31+
bytes.workspace = true
32+
tracing.workspace = true
33+
thiserror.workspace = true
34+
35+
# Redis implementation
36+
fred = { version = "10", features = [
37+
"subscriber-client",
38+
"i-pubsub",
39+
], default-features = false, optional = true }
40+
redis = { version = "0.28", features = [
41+
"aio",
42+
"tokio-comp",
43+
"streams",
44+
], default-features = false, optional = true }
45+
46+
[dev-dependencies]
47+
tokio = { workspace = true, features = [
48+
"macros",
49+
"parking_lot",
50+
"rt-multi-thread",
51+
] }
52+
socketioxide = { path = "../socketioxide", features = [
53+
"tracing",
54+
"__test_harness",
55+
] }
56+
tracing-subscriber.workspace = true

0 commit comments

Comments
 (0)