Skip to content

Commit e6176c3

Browse files
author
River.Shi
committed
feat: Implement Redis backend for parameter storage and retrieval in AsyncCache; update version to 0.2.19
1 parent 3fe8741 commit e6176c3

File tree

12 files changed

+250
-59
lines changed

12 files changed

+250
-59
lines changed

nexustrader/base/connector.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ def __init__(
8787

8888
# Aggregator management - key: symbol, value: list of aggregators
8989
self._aggregators: Dict[str, List[KlineAggregator]] = {}
90-
self._msgbus.subscribe(topic="trade", handler=self._handle_trade_for_aggregators)
90+
self._msgbus.subscribe(
91+
topic="trade", handler=self._handle_trade_for_aggregators
92+
)
9193

9294
@property
9395
def account_type(self):
@@ -173,7 +175,9 @@ async def subscribe_kline_aggregator(
173175
await self.subscribe_trade(symbol)
174176

175177
# Create and register time kline aggregator
176-
aggregator = self._create_time_kline_aggregator(symbol, interval, build_with_no_updates)
178+
aggregator = self._create_time_kline_aggregator(
179+
symbol, interval, build_with_no_updates
180+
)
177181
self._add_aggregator(symbol, aggregator)
178182

179183
self._log.info(

nexustrader/base/ems.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ def __init__(
5858
] = {}
5959
self._private_connectors: Dict[AccountType, PrivateConnector] | None = None
6060
self._is_mock = is_mock
61-
62-
6361

6462
def _build(self, private_connectors: Dict[AccountType, PrivateConnector]):
6563
self._private_connectors = private_connectors
@@ -134,7 +132,7 @@ def _price_to_precision(
134132
precision_decimal, rounding=ROUND_FLOOR
135133
) * exp
136134
return format_price
137-
135+
138136
@abstractmethod
139137
def _instrument_id_to_account_type(
140138
self, instrument_id: InstrumentId

nexustrader/base/ws_client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def on_ws_disconnected(self, transport: WSTransport) -> None:
7878
transport (picows.WSTransport): WebSocket transport instance
7979
"""
8080
self._log.debug("Disconnected from Websocket.")
81-
81+
8282
def _decode_frame(self, frame: WSFrame) -> str:
8383
"""Decode the payload of a WebSocket frame safely.
8484
@@ -119,7 +119,7 @@ def on_ws_frame(self, transport: WSTransport, frame: WSFrame) -> None:
119119
return
120120
except Exception as e:
121121
import traceback
122-
122+
123123
self._log.error(
124124
f"Error processing message: {str(e)}\nTraceback: {traceback.format_exc()}\nws_frame: {self._decode_frame(frame)}"
125125
)
@@ -205,6 +205,9 @@ async def _connection_handler(self):
205205
await asyncio.sleep(self._reconnect_interval)
206206

207207
def _send(self, payload: dict):
208+
if not self.connected:
209+
self._log.warning(f"Websocket not connected. drop msg: {str(payload)}")
210+
return
208211
self._transport.send(WSMsgType.TEXT, msgspec.json.encode(payload))
209212

210213
def disconnect(self):

nexustrader/constants.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from throttled.asyncio import Throttled
99
from nautilus_trader.core.nautilus_pyo3 import LogColor # noqa
1010

11+
BACKEND_LITERAL = Literal["memory", "redis"]
12+
1113

1214
def is_sphinx_build():
1315
return "sphinx" in sys.modules
@@ -39,15 +41,8 @@ def get_postgresql_config():
3941
}
4042

4143

42-
def get_redis_config(in_docker: bool = False):
44+
def get_redis_config():
4345
try:
44-
if in_docker:
45-
return {
46-
"host": "redis",
47-
"db": settings.REDIS_DB,
48-
"password": settings.REDIS_PASSWORD,
49-
}
50-
5146
return {
5247
"host": settings.REDIS_HOST,
5348
"port": settings.REDIS_PORT,
@@ -356,6 +351,11 @@ class StorageType(Enum):
356351
POSTGRESQL = "postgresql"
357352

358353

354+
class ParamBackend(Enum):
355+
MEMORY = "memory"
356+
REDIS = "redis"
357+
358+
359359
class RateLimiter:
360360
@abstractmethod
361361
def __call__(self, endpoint: str) -> Throttled: ...

nexustrader/core/cache.py

Lines changed: 145 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
BookL2,
2424
)
2525
from nexustrader.constants import STATUS_TRANSITIONS, AccountType, KlineInterval
26-
from nexustrader.core.entity import TaskManager
26+
from nexustrader.core.entity import TaskManager, get_redis_client_if_available
2727
from nexustrader.core.nautilius_core import LiveClock, MessageBus, Logger
28-
from nexustrader.constants import StorageType
28+
from nexustrader.constants import StorageType, ParamBackend
2929
from nexustrader.backends import SQLiteBackend, PostgreSQLBackend
3030

3131

@@ -112,7 +112,10 @@ def __init__(
112112
self._position_lock = threading.RLock() # Lock for position updates
113113
self._order_lock = threading.RLock() # Lock for order updates
114114
self._balance_lock = threading.RLock() # Lock for balance updates
115-
self._param_lock = threading.RLock() # Lock for parameter updates
115+
116+
# Redis client for parameter storage (lazy initialization)
117+
self._redis_client = None
118+
self._redis_client_initialized = False
116119

117120
################# # base functions ####################
118121

@@ -463,24 +466,151 @@ def get_open_orders(
463466

464467
################ # parameter cache ###################
465468

466-
def get_param(self, key: str, default: Any = None) -> Any:
467-
"""Get a parameter from the cache"""
468-
with self._param_lock:
469+
def _ensure_redis_client(self) -> None:
470+
"""
471+
Lazy initialization of Redis client.
472+
Raises RuntimeError if Redis is not available.
473+
"""
474+
if not self._redis_client_initialized:
475+
self._redis_client = get_redis_client_if_available()
476+
self._redis_client_initialized = True
477+
if self._redis_client is None:
478+
raise RuntimeError(
479+
"Redis client is not available. Please ensure Redis is running and configured properly."
480+
)
481+
482+
def _get_redis_param_key(self, key: str) -> str:
483+
"""Generate Redis key for parameter storage"""
484+
return f"nexus:param:{self.strategy_id}:{self.user_id}:{key}"
485+
486+
def get_param(
487+
self,
488+
key: str,
489+
default: Any = None,
490+
param_backend: ParamBackend = ParamBackend.MEMORY,
491+
) -> Any:
492+
"""
493+
Get a parameter from the cache
494+
495+
Args:
496+
key: Parameter key
497+
default: Default value if key not found
498+
param_backend: Storage backend (MEMORY or REDIS)
499+
500+
Returns:
501+
Parameter value or default
502+
503+
Raises:
504+
RuntimeError: If param_backend is REDIS but Redis is not available
505+
"""
506+
if param_backend == ParamBackend.REDIS:
507+
self._ensure_redis_client()
508+
redis_key = self._get_redis_param_key(key)
509+
value = self._redis_client.get(redis_key)
510+
if value is None:
511+
return default
512+
# Deserialize the value
513+
return msgspec.json.decode(value)
514+
else:
469515
return self._mem_params.get(key, default)
470516

471-
def set_param(self, key: str, value: Any) -> None:
472-
"""Set a parameter in the cache"""
473-
with self._param_lock:
517+
def set_param(
518+
self, key: str, value: Any, param_backend: ParamBackend = ParamBackend.MEMORY
519+
) -> None:
520+
"""
521+
Set a parameter in the cache
522+
523+
Args:
524+
key: Parameter key
525+
value: Parameter value
526+
param_backend: Storage backend (MEMORY or REDIS)
527+
528+
Raises:
529+
RuntimeError: If param_backend is REDIS but Redis is not available
530+
"""
531+
if param_backend == ParamBackend.REDIS:
532+
self._ensure_redis_client()
533+
redis_key = self._get_redis_param_key(key)
534+
# Serialize the value
535+
serialized_value = msgspec.json.encode(value)
536+
self._redis_client.set(redis_key, serialized_value)
537+
else:
474538
self._mem_params[key] = value
475539

476-
def get_all_params(self) -> Dict[str, Any]:
477-
"""Get all parameters from the cache"""
478-
with self._param_lock:
540+
def get_all_params(
541+
self, param_backend: ParamBackend = ParamBackend.MEMORY
542+
) -> Dict[str, Any]:
543+
"""
544+
Get all parameters from the cache
545+
546+
Args:
547+
param_backend: Storage backend (MEMORY or REDIS)
548+
549+
Returns:
550+
Dictionary of all parameters
551+
552+
Raises:
553+
RuntimeError: If param_backend is REDIS but Redis is not available
554+
"""
555+
if param_backend == ParamBackend.REDIS:
556+
self._ensure_redis_client()
557+
pattern = f"nexus:param:{self.strategy_id}:{self.user_id}:*"
558+
params = {}
559+
# Use SCAN to iterate over keys matching the pattern
560+
cursor = 0
561+
while True:
562+
cursor, keys = self._redis_client.scan(cursor, match=pattern, count=100)
563+
for redis_key in keys:
564+
# Extract the parameter key from the Redis key
565+
key = (
566+
redis_key.decode()
567+
if isinstance(redis_key, bytes)
568+
else redis_key
569+
)
570+
param_key = key.split(":", 4)[-1] # Get the part after the last ':'
571+
value = self._redis_client.get(redis_key)
572+
if value is not None:
573+
params[param_key] = msgspec.json.decode(value)
574+
if cursor == 0:
575+
break
576+
return params
577+
else:
479578
return self._mem_params.copy()
480579

481-
def clear_param(self, key: Optional[str] = None) -> None:
482-
"""Clear parameter(s) from the cache"""
483-
with self._param_lock:
580+
def clear_param(
581+
self,
582+
key: Optional[str] = None,
583+
param_backend: ParamBackend = ParamBackend.MEMORY,
584+
) -> None:
585+
"""
586+
Clear parameter(s) from the cache
587+
588+
Args:
589+
key: Parameter key to clear (None to clear all)
590+
param_backend: Storage backend (MEMORY or REDIS)
591+
592+
Raises:
593+
RuntimeError: If param_backend is REDIS but Redis is not available
594+
"""
595+
if param_backend == ParamBackend.REDIS:
596+
self._ensure_redis_client()
597+
if key is None:
598+
# Clear all parameters matching the pattern
599+
pattern = f"nexus:param:{self.strategy_id}:{self.user_id}:*"
600+
cursor = 0
601+
while True:
602+
cursor, keys = self._redis_client.scan(
603+
cursor, match=pattern, count=100
604+
)
605+
if keys:
606+
self._redis_client.delete(*keys)
607+
if cursor == 0:
608+
break
609+
else:
610+
# Clear specific parameter
611+
redis_key = self._get_redis_param_key(key)
612+
self._redis_client.delete(redis_key)
613+
else:
484614
if key is None:
485615
# Clear all parameters
486616
self._mem_params.clear()

nexustrader/core/entity.py

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,8 @@ def is_redis_available() -> bool:
6969

7070
# Check if Redis server is accessible
7171
try:
72-
# Detect if running in Docker
73-
in_docker = False
74-
try:
75-
socket.gethostbyname("redis")
76-
in_docker = True
77-
except socket.gaierror:
78-
pass
79-
8072
# Get Redis config and test connection
81-
redis_config = get_redis_config(in_docker)
73+
redis_config = get_redis_config()
8274
client = redis.Redis(**redis_config)
8375
client.ping() # This will raise an exception if Redis is not accessible
8476
client.close()
@@ -98,18 +90,8 @@ def get_redis_client_if_available():
9890
return None
9991

10092
try:
101-
import socket
10293
from nexustrader.constants import get_redis_config
103-
104-
# Detect if running in Docker
105-
in_docker = False
106-
try:
107-
socket.gethostbyname("redis")
108-
in_docker = True
109-
except socket.gaierror:
110-
pass
111-
112-
redis_config = get_redis_config(in_docker)
94+
redis_config = get_redis_config()
11395
return redis.Redis(**redis_config)
11496
except Exception:
11597
return None

nexustrader/strategy.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@
4949
OrderSide,
5050
OrderType,
5151
TimeInForce,
52+
ParamBackend,
5253
# PositionSide,
5354
AccountType,
5455
SubmitType,
5556
ExchangeType,
5657
KlineInterval,
5758
TriggerType,
59+
BACKEND_LITERAL,
5860
)
5961

6062

@@ -1145,7 +1147,11 @@ def _on_mark_price(self, mark_price: MarkPrice):
11451147
self._subscriptions_ready[DataType.MARK_PRICE].input(mark_price)
11461148

11471149
def param(
1148-
self, name: str, value: Optional[Any] = None, default: Optional[Any] = None
1150+
self,
1151+
name: str,
1152+
value: Optional[Any] = None,
1153+
default: Optional[Any] = None,
1154+
backend: BACKEND_LITERAL = "memory",
11491155
) -> Any:
11501156
"""
11511157
Get or set a parameter in the cache.
@@ -1164,15 +1170,18 @@ def param(
11641170
# Get a parameter
11651171
rolling_n = self.param('rolling_n')
11661172
"""
1173+
param_backend = ParamBackend(backend)
11671174
if value is not None:
11681175
# Set parameter
1169-
self.cache.set_param(name, value)
1176+
self.cache.set_param(name, value, param_backend)
11701177
return None
11711178
else:
11721179
# Get parameter
1173-
return self.cache.get_param(name, default)
1180+
return self.cache.get_param(name, default, param_backend)
11741181

1175-
def clear_param(self, name: Optional[str] = None) -> None:
1182+
def clear_param(
1183+
self, name: Optional[str] = None, backend: BACKEND_LITERAL = "memory"
1184+
) -> None:
11761185
"""
11771186
Clear parameter(s) from the cache.
11781187
@@ -1186,7 +1195,7 @@ def clear_param(self, name: Optional[str] = None) -> None:
11861195
# Clear all parameters
11871196
self.clear_param()
11881197
"""
1189-
self.cache.clear_param(name)
1198+
self.cache.clear_param(name, ParamBackend(backend))
11901199

11911200
def set_timer(
11921201
self,

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "nexustrader"
3-
version = "0.2.18"
3+
version = "0.2.19"
44
description = "fastest python trading bot"
55
authors = [
66
{name = "River-Shi",email = "[email protected]"}

0 commit comments

Comments
 (0)