Skip to content

Commit 5d9f251

Browse files
River.Shiclaude
andcommitted
feat: Add SubscriptionManagementSystem with dynamic subscribe/unsubscribe support across all exchanges
Introduce a new SubscriptionManagementSystem (SMS) to centralize subscription management with support for both subscribing and unsubscribing from market data streams. This enhancement enables dynamic strategy execution where data subscriptions can be added or removed at runtime. Key changes: - Add new SubscriptionManagementSystem class with queue-based subscription handling - Implement unsubscribe methods for all data types (trade, bookl1, bookl2, kline, funding rate, index price, mark price) - Add support for kline and volume kline aggregator unsubscription with proper cleanup - Extend PublicConnector abstract base class with unsubscribe method signatures - Update all exchange connectors (Binance, Bybit, OKX, Bitget, Hyperliquid) with unsubscribe implementations - Integrate SMS into Engine for centralized subscription lifecycle management - Add DataReady tracking for subscription readiness with timeout support - Update version to 0.2.20 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent e6176c3 commit 5d9f251

File tree

21 files changed

+1594
-379
lines changed

21 files changed

+1594
-379
lines changed

nexustrader/base/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from nexustrader.base.api_client import ApiClient
44
from nexustrader.base.oms import OrderManagementSystem
55
from nexustrader.base.ems import ExecutionManagementSystem
6+
from nexustrader.base.sms import SubscriptionManagementSystem
67
from nexustrader.base.connector import (
78
PublicConnector,
89
PrivateConnector,
@@ -18,6 +19,7 @@
1819
"OrderManagementSystem",
1920
"ExecutionManagementSystem",
2021
"PublicConnector",
22+
"SubscriptionManagementSystem",
2123
"PrivateConnector",
2224
"MockLinearConnector",
2325
"RetryManager",

nexustrader/base/connector.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,21 @@ async def subscribe_trade(self, symbol: str | List[str]):
139139
"""Subscribe to the trade data"""
140140
pass
141141

142+
@abstractmethod
143+
async def unsubscribe_trade(self, symbol: str | List[str]):
144+
"""Unsubscribe from the trade data"""
145+
pass
146+
142147
@abstractmethod
143148
async def subscribe_bookl1(self, symbol: str | List[str]):
144149
"""Subscribe to the bookl1 data"""
145150
pass
146151

152+
@abstractmethod
153+
async def unsubscribe_bookl1(self, symbol: str | List[str]):
154+
"""Unsubscribe from the bookl1 data"""
155+
pass
156+
147157
@abstractmethod
148158
async def subscribe_kline(
149159
self,
@@ -159,6 +169,15 @@ async def subscribe_kline(
159169
"""
160170
pass
161171

172+
@abstractmethod
173+
async def unsubscribe_kline(
174+
self,
175+
symbol: str | List[str],
176+
interval: KlineInterval,
177+
):
178+
"""Unsubscribe from the kline data"""
179+
pass
180+
162181
async def subscribe_kline_aggregator(
163182
self,
164183
symbol: str,
@@ -184,6 +203,33 @@ async def subscribe_kline_aggregator(
184203
f"Time kline aggregator created for {symbol} with interval {interval}"
185204
)
186205

206+
async def unsubscribe_kline_aggregator(
207+
self,
208+
symbol: str,
209+
interval: KlineInterval,
210+
):
211+
"""Unsubscribe from time-based kline data using TimeKlineAggregator
212+
213+
Args:
214+
symbol: Symbol to unsubscribe from
215+
interval: Kline interval
216+
"""
217+
aggregators = self._aggregators.get(symbol, [])
218+
# Create a copy to avoid "list changed size during iteration" error
219+
for aggregator in aggregators.copy():
220+
if (
221+
isinstance(aggregator, TimeKlineAggregator)
222+
and aggregator.interval == interval
223+
):
224+
aggregator.stop()
225+
aggregators.remove(aggregator)
226+
self._log.info(
227+
f"Time kline aggregator stopped for {symbol} with interval {interval}"
228+
)
229+
if not aggregators:
230+
self._aggregators.pop(symbol, None)
231+
await self.unsubscribe_trade(symbol)
232+
187233
async def subscribe_volume_kline_aggregator(
188234
self, symbol: str, volume_threshold: float, volume_type: str
189235
):
@@ -206,26 +252,70 @@ async def subscribe_volume_kline_aggregator(
206252
f"Volume kline aggregator created for {symbol} with threshold {volume_threshold} and type {volume_type}"
207253
)
208254

255+
async def unsubscribe_volume_kline_aggregator(
256+
self, symbol: str, volume_threshold: float, volume_type: str
257+
):
258+
"""Unsubscribe from volume-based kline data using VolumeKlineAggregator
259+
260+
Args:
261+
symbol: Symbol to unsubscribe from
262+
volume_threshold: Volume threshold for the kline aggregator
263+
"""
264+
aggregators = self._aggregators.get(symbol, [])
265+
# Create a copy to avoid "list changed size during iteration" error
266+
for aggregator in aggregators.copy():
267+
if (
268+
isinstance(aggregator, VolumeKlineAggregator)
269+
and aggregator.volume_threshold == volume_threshold
270+
):
271+
aggregators.remove(aggregator)
272+
self._log.info(
273+
f"Volume kline aggregator stopped for {symbol} with threshold {volume_threshold} and type {volume_type}"
274+
)
275+
if not aggregators:
276+
self._aggregators.pop(symbol, None)
277+
await self.unsubscribe_trade(symbol)
278+
209279
@abstractmethod
210280
async def subscribe_bookl2(self, symbol: str | List[str], level: BookLevel):
211281
"""Subscribe to the bookl2 data"""
212282
pass
213283

284+
@abstractmethod
285+
async def unsubscribe_bookl2(self, symbol: str | List[str], level: BookLevel):
286+
"""Unsubscribe from the bookl2 data"""
287+
pass
288+
214289
@abstractmethod
215290
async def subscribe_funding_rate(self, symbol: str | List[str]):
216291
"""Subscribe to the funding rate data"""
217292
pass
218293

294+
@abstractmethod
295+
async def unsubscribe_funding_rate(self, symbol: str | List[str]):
296+
"""Unsubscribe from the funding rate data"""
297+
pass
298+
219299
@abstractmethod
220300
async def subscribe_index_price(self, symbol: str | List[str]):
221301
"""Subscribe to the index price data"""
222302
pass
223303

304+
@abstractmethod
305+
async def unsubscribe_index_price(self, symbol: str | List[str]):
306+
"""Unsubscribe from the index price data"""
307+
pass
308+
224309
@abstractmethod
225310
async def subscribe_mark_price(self, symbol: str | List[str]):
226311
"""Subscribe to the mark price data"""
227312
pass
228313

314+
@abstractmethod
315+
async def unsubscribe_mark_price(self, symbol: str | List[str]):
316+
"""Unsubscribe from the mark price data"""
317+
pass
318+
229319
def _create_time_kline_aggregator(
230320
self,
231321
symbol: str,

0 commit comments

Comments
 (0)