Skip to content

Commit 6de9941

Browse files
committed
Change requests applied
1 parent 385c777 commit 6de9941

File tree

2 files changed

+140
-43
lines changed

2 files changed

+140
-43
lines changed

crates/adapters/hyperliquid/bin/ws-exec.rs

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@ use std::time::Duration;
1717

1818
use nautilus_hyperliquid::{
1919
common::consts::{HyperliquidNetwork, ws_url},
20-
websocket::{
21-
client::HyperliquidWebSocketClient,
22-
messages::{HyperliquidWsRequest, SubscriptionRequest},
23-
},
20+
websocket::client::HyperliquidWebSocketClient,
2421
};
2522
use tokio::{pin, signal};
2623
use tracing::level_filters::LevelFilter;
@@ -37,28 +34,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3734
let network = HyperliquidNetwork::from_env();
3835
let ws_url = ws_url(network);
3936

40-
let (mut client, message_rx) = HyperliquidWebSocketClient::connect(ws_url).await?;
37+
let mut client = HyperliquidWebSocketClient::connect(ws_url).await?;
4138
tracing::info!("Connected to Hyperliquid WebSocket");
4239

4340
// Subscribe to execution channels
4441
let user_addr = std::env::var("HYPERLIQUID_USER_ADDRESS")
4542
.unwrap_or_else(|_| "0x0000000000000000000000000000000000000000".to_string());
4643

47-
// Subscribe to order updates
48-
let order_updates_request = HyperliquidWsRequest::Subscribe {
49-
subscription: SubscriptionRequest::OrderUpdates {
50-
user: user_addr.clone(),
51-
},
52-
};
53-
client.send(&order_updates_request).await?;
54-
55-
// Subscribe to user events
56-
let user_events_request = HyperliquidWsRequest::Subscribe {
57-
subscription: SubscriptionRequest::UserEvents {
58-
user: user_addr.clone(),
59-
},
60-
};
61-
client.send(&user_events_request).await?;
44+
// Subscribe to all user channels using the convenience method
45+
client.subscribe_all_user_channels(&user_addr).await?;
46+
tracing::info!("Subscribed to all user channels for {}", user_addr);
6247

6348
// Wait briefly to ensure subscriptions are active
6449
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -67,12 +52,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6752
let sigint = signal::ctrl_c();
6853
pin!(sigint);
6954

70-
let stream = message_rx;
71-
tokio::pin!(stream);
72-
7355
loop {
7456
tokio::select! {
75-
Some(message) = stream.recv() => {
57+
Some(message) = client.next_event() => {
7658
tracing::debug!("{message:?}");
7759
}
7860
_ = &mut sigint => {

crates/adapters/hyperliquid/src/websocket/client.rs

Lines changed: 134 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,32 @@
1313
// limitations under the License.
1414
// -------------------------------------------------------------------------------------------------
1515

16+
use std::collections::HashSet;
17+
1618
use anyhow::Result;
1719
use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
1820
use tokio::sync::mpsc;
1921
use tokio_tungstenite::tungstenite::Message;
2022
use tracing::{debug, error, info, warn};
2123

22-
use crate::websocket::messages::{HyperliquidWsMessage, HyperliquidWsRequest};
24+
use crate::websocket::messages::{HyperliquidWsMessage, HyperliquidWsRequest, SubscriptionRequest};
2325

24-
/// Hyperliquid WebSocket client that wraps Nautilus WebSocketClient for lifecycle management.
26+
/// Low-level Hyperliquid WebSocket client that wraps Nautilus WebSocketClient.
27+
///
28+
/// This is the inner client that handles the transport layer and provides low-level
29+
/// WebSocket methods with `ws_*` prefixes.
2530
#[derive(Debug)]
26-
pub struct HyperliquidWebSocketClient {
27-
client: WebSocketClient,
31+
pub struct HyperliquidWebSocketInnerClient {
32+
inner: WebSocketClient,
33+
rx_inbound: mpsc::Receiver<HyperliquidWsMessage>,
34+
sent_subscriptions: HashSet<String>,
2835
_reader_task: tokio::task::JoinHandle<()>,
2936
}
3037

31-
impl HyperliquidWebSocketClient {
32-
/// Creates a new Hyperliquid WebSocket client with Nautilus' reconnection/backoff/heartbeat.
33-
/// Returns (client, rx) where `rx` yields Hyperliquid-native `HyperliquidWsMessage` events.
34-
pub async fn connect(url: &str) -> Result<(Self, mpsc::Receiver<HyperliquidWsMessage>)> {
38+
impl HyperliquidWebSocketInnerClient {
39+
/// Creates a new Hyperliquid WebSocket inner client with Nautilus' reconnection/backoff/heartbeat.
40+
/// Returns a client that owns the inbound message receiver.
41+
pub async fn connect(url: &str) -> Result<Self> {
3542
// Create message handler for receiving raw WebSocket messages
3643
let (message_handler, mut raw_rx) = channel_message_handler();
3744

@@ -99,46 +106,154 @@ impl HyperliquidWebSocketClient {
99106
});
100107

101108
let hl_client = Self {
102-
client,
109+
inner: client,
110+
rx_inbound,
111+
sent_subscriptions: HashSet::new(),
103112
_reader_task: reader_task,
104113
};
105114

106-
Ok((hl_client, rx_inbound))
115+
Ok(hl_client)
107116
}
108117

109-
/// Sends a Hyperliquid WebSocket request via the Nautilus WebSocket client.
110-
pub async fn send(&self, request: &HyperliquidWsRequest) -> Result<()> {
118+
/// Low-level method to send a Hyperliquid WebSocket request.
119+
pub async fn ws_send(&self, request: &HyperliquidWsRequest) -> Result<()> {
111120
let json = serde_json::to_string(request)?;
112121
debug!("Sending WS message: {}", json);
113-
self.client
122+
self.inner
114123
.send_text(json, None)
115124
.await
116125
.map_err(|e| anyhow::anyhow!(e))
117126
}
118127

128+
/// Low-level method to send a request only once (dedup by JSON serialization).
129+
pub async fn ws_send_once(&mut self, request: &HyperliquidWsRequest) -> Result<()> {
130+
let json = serde_json::to_string(request)?;
131+
if self.sent_subscriptions.contains(&json) {
132+
debug!("Skipping duplicate request: {}", json);
133+
return Ok(());
134+
}
135+
136+
debug!("Sending WS message: {}", json);
137+
self.inner
138+
.send_text(json.clone(), None)
139+
.await
140+
.map_err(|e| anyhow::anyhow!(e))?;
141+
142+
self.sent_subscriptions.insert(json);
143+
Ok(())
144+
}
145+
146+
/// Low-level method to subscribe to a specific channel.
147+
pub async fn ws_subscribe(&mut self, subscription: SubscriptionRequest) -> Result<()> {
148+
let request = HyperliquidWsRequest::Subscribe { subscription };
149+
self.ws_send_once(&request).await
150+
}
151+
152+
/// Get the next event from the WebSocket stream.
153+
/// Returns None when the connection is closed or the receiver is exhausted.
154+
pub async fn ws_next_event(&mut self) -> Option<HyperliquidWsMessage> {
155+
self.rx_inbound.recv().await
156+
}
157+
119158
/// Returns true if the WebSocket connection is active.
120159
pub fn is_active(&self) -> bool {
121-
self.client.is_active()
160+
self.inner.is_active()
122161
}
123162

124163
/// Returns true if the WebSocket is reconnecting.
125164
pub fn is_reconnecting(&self) -> bool {
126-
self.client.is_reconnecting()
165+
self.inner.is_reconnecting()
127166
}
128167

129168
/// Returns true if the WebSocket is disconnecting.
130169
pub fn is_disconnecting(&self) -> bool {
131-
self.client.is_disconnecting()
170+
self.inner.is_disconnecting()
132171
}
133172

134173
/// Returns true if the WebSocket is closed.
135174
pub fn is_closed(&self) -> bool {
136-
self.client.is_closed()
175+
self.inner.is_closed()
137176
}
138177

139178
/// Disconnect the WebSocket client.
140-
pub async fn disconnect(&mut self) -> Result<()> {
141-
self.client.disconnect().await;
179+
pub async fn ws_disconnect(&mut self) -> Result<()> {
180+
self.inner.disconnect().await;
142181
Ok(())
143182
}
144183
}
184+
185+
/// High-level Hyperliquid WebSocket client that provides standardized domain methods.
186+
///
187+
/// This is the outer client that wraps the inner client and provides Nautilus-specific
188+
/// functionality for WebSocket operations using standard domain methods.
189+
#[derive(Debug)]
190+
pub struct HyperliquidWebSocketClient {
191+
inner: HyperliquidWebSocketInnerClient,
192+
}
193+
194+
impl HyperliquidWebSocketClient {
195+
/// Creates a new Hyperliquid WebSocket client.
196+
pub async fn connect(url: &str) -> Result<Self> {
197+
let inner = HyperliquidWebSocketInnerClient::connect(url).await?;
198+
Ok(Self { inner })
199+
}
200+
201+
/// Subscribe to order updates for a specific user address.
202+
pub async fn subscribe_order_updates(&mut self, user: &str) -> Result<()> {
203+
let subscription = SubscriptionRequest::OrderUpdates {
204+
user: user.to_string(),
205+
};
206+
self.inner.ws_subscribe(subscription).await
207+
}
208+
209+
/// Subscribe to user events (fills, funding, liquidations) for a specific user address.
210+
pub async fn subscribe_user_events(&mut self, user: &str) -> Result<()> {
211+
let subscription = SubscriptionRequest::UserEvents {
212+
user: user.to_string(),
213+
};
214+
self.inner.ws_subscribe(subscription).await
215+
}
216+
217+
/// Subscribe to all user channels (order updates + user events) for convenience.
218+
pub async fn subscribe_all_user_channels(&mut self, user: &str) -> Result<()> {
219+
self.subscribe_order_updates(user).await?;
220+
self.subscribe_user_events(user).await?;
221+
Ok(())
222+
}
223+
224+
/// Get the next event from the WebSocket stream.
225+
/// Returns None when the connection is closed or the receiver is exhausted.
226+
pub async fn next_event(&mut self) -> Option<HyperliquidWsMessage> {
227+
self.inner.ws_next_event().await
228+
}
229+
230+
/// Returns true if the WebSocket connection is active.
231+
pub fn is_active(&self) -> bool {
232+
self.inner.is_active()
233+
}
234+
235+
/// Returns true if the WebSocket is reconnecting.
236+
pub fn is_reconnecting(&self) -> bool {
237+
self.inner.is_reconnecting()
238+
}
239+
240+
/// Returns true if the WebSocket is disconnecting.
241+
pub fn is_disconnecting(&self) -> bool {
242+
self.inner.is_disconnecting()
243+
}
244+
245+
/// Returns true if the WebSocket is closed.
246+
pub fn is_closed(&self) -> bool {
247+
self.inner.is_closed()
248+
}
249+
250+
/// Disconnect the WebSocket client.
251+
pub async fn disconnect(&mut self) -> Result<()> {
252+
self.inner.ws_disconnect().await
253+
}
254+
255+
/// Escape hatch: send raw requests for tests/power users.
256+
pub async fn send_raw(&mut self, request: &HyperliquidWsRequest) -> Result<()> {
257+
self.inner.ws_send(request).await
258+
}
259+
}

0 commit comments

Comments
 (0)