Skip to content

Commit b4ff89a

Browse files
committed
Improve position snapshot processing and incremental caching
- Add incremental snapshot caching to process only new snapshots - Add cache method `position_snapshot_ids()` - Add cache method `position_snapshot_bytes()` - Add housekeeping to prune stale snapshot entries - Add portfolio and snapshot caching tests
1 parent 3bcb40e commit b4ff89a

File tree

10 files changed

+1037
-43
lines changed

10 files changed

+1037
-43
lines changed

examples/live/binance/binance_futures_testnet_exec_tester.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@
5858
# snapshot_orders=True,
5959
# snapshot_positions=True,
6060
# snapshot_positions_interval_secs=5.0,
61+
purge_closed_orders_interval_mins=1, # Example of purging closed orders for HFT
62+
purge_closed_orders_buffer_mins=0, # No buffer (0 mins): closed orders become eligible for purging immediately
63+
purge_closed_positions_interval_mins=1, # Example of purging closed positions for HFT
64+
purge_closed_positions_buffer_mins=0, # No buffer (0 mins): closed positions become eligible for purging immediately
65+
purge_account_events_interval_mins=1, # Example of purging account events for HFT
66+
purge_account_events_lookback_mins=0, # No lookback (0 mins): account events of any age are eligible for purging
67+
purge_from_database=True, # Set True with caution
68+
graceful_shutdown_on_exception=True,
6169
),
6270
cache=CacheConfig(
6371
# database=DatabaseConfig(),

nautilus_trader/cache/base.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,9 @@ cdef class CacheFacade:
189189
cpdef Position position(self, PositionId position_id)
190190
cpdef Position position_for_order(self, ClientOrderId client_order_id)
191191
cpdef PositionId position_id(self, ClientOrderId client_order_id)
192+
cpdef set[PositionId] position_snapshot_ids(self, InstrumentId instrument_id=*)
192193
cpdef list position_snapshots(self, PositionId position_id=*)
194+
cpdef list position_snapshot_bytes(self, PositionId position_id)
193195
cpdef list positions(self, Venue venue=*, InstrumentId instrument_id=*, StrategyId strategy_id=*, PositionSide side=*)
194196
cpdef list positions_open(self, Venue venue=*, InstrumentId instrument_id=*, StrategyId strategy_id=*, PositionSide side=*)
195197
cpdef list positions_closed(self, Venue venue=*, InstrumentId instrument_id=*, StrategyId strategy_id=*)

nautilus_trader/cache/base.pyx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,10 +447,18 @@ cdef class CacheFacade:
447447
"""Abstract method (implement in subclass)."""
448448
raise NotImplementedError("method `position_id` must be implemented in the subclass") # pragma: no cover
449449

450+
cpdef set[PositionId] position_snapshot_ids(self, InstrumentId instrument_id = None):
451+
"""Abstract method (implement in subclass)."""
452+
raise NotImplementedError("method `position_snapshot_ids` must be implemented in the subclass") # pragma: no cover
453+
450454
cpdef list position_snapshots(self, PositionId position_id = None):
451455
"""Abstract method (implement in subclass)."""
452456
raise NotImplementedError("method `position_snapshots` must be implemented in the subclass") # pragma: no cover
453457

458+
cpdef list position_snapshot_bytes(self, PositionId position_id):
459+
"""Abstract method (implement in subclass)."""
460+
raise NotImplementedError("method `position_snapshot_bytes` must be implemented in the subclass") # pragma: no cover
461+
454462
cpdef list positions(self, Venue venue = None, InstrumentId instrument_id = None, StrategyId strategy_id = None, PositionSide side = PositionSide.NO_POSITION_SIDE):
455463
"""Abstract method (implement in subclass)."""
456464
raise NotImplementedError("method `positions` must be implemented in the subclass") # pragma: no cover

nautilus_trader/cache/cache.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ cdef class Cache(CacheFacade):
9494
cdef dict _index_position_orders
9595
cdef dict _index_instrument_orders
9696
cdef dict _index_instrument_positions
97+
cdef dict _index_instrument_position_snapshots
9798
cdef dict _index_strategy_orders
9899
cdef dict _index_strategy_positions
99100
cdef dict _index_exec_algorithm_orders

nautilus_trader/cache/cache.pyx

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ cdef class Cache(CacheFacade):
153153
self._index_position_orders: dict[PositionId, set[ClientOrderId]] = {}
154154
self._index_instrument_orders: dict[InstrumentId, set[ClientOrderId]] = {}
155155
self._index_instrument_positions: dict[InstrumentId, set[PositionId]] = {}
156+
self._index_instrument_position_snapshots: dict[InstrumentId, set[PositionId]] = {}
156157
self._index_strategy_orders: dict[StrategyId, set[ClientOrderId]] = {}
157158
self._index_strategy_positions: dict[StrategyId, set[PositionId]] = {}
158159
self._index_exec_algorithm_orders: dict[ExecAlgorithmId, set[ClientOrderId]] = {}
@@ -959,6 +960,19 @@ cdef class Cache(CacheFacade):
959960
self._index_positions_open.discard(position_id)
960961
self._index_positions_closed.discard(position_id)
961962

963+
# Remove position snapshots and clean up index
964+
cdef set[PositionId] snapshot_position_ids
965+
cdef list[bytes] snapshots = self._position_snapshots.pop(position_id, None)
966+
967+
if snapshots is not None and position is not None:
968+
snapshot_position_ids = self._index_instrument_position_snapshots.get(position.instrument_id)
969+
if snapshot_position_ids:
970+
snapshot_position_ids.discard(position_id)
971+
972+
# Clean up
973+
if not snapshot_position_ids:
974+
self._index_instrument_position_snapshots.pop(position.instrument_id, None)
975+
962976
# Delete from database if requested
963977
if purge_from_database and self._database is not None:
964978
self._database.delete_position(position_id)
@@ -1027,6 +1041,7 @@ cdef class Cache(CacheFacade):
10271041
self._index_position_orders.clear()
10281042
self._index_instrument_orders.clear()
10291043
self._index_instrument_positions.clear()
1044+
self._index_instrument_position_snapshots.clear()
10301045
self._index_strategy_orders.clear()
10311046
self._index_strategy_positions.clear()
10321047
self._index_exec_algorithm_orders.clear()
@@ -2226,6 +2241,14 @@ cdef class Cache(CacheFacade):
22262241
else:
22272242
self._position_snapshots[position_id] = [position_pickled]
22282243

2244+
# Update snapshot index
2245+
cdef InstrumentId instrument_id = position.instrument_id
2246+
cdef set position_ids = self._index_instrument_position_snapshots.get(instrument_id)
2247+
if position_ids is not None:
2248+
position_ids.add(position_id)
2249+
else:
2250+
self._index_instrument_position_snapshots[instrument_id] = {position_id}
2251+
22292252
self._log.debug(f"Snapshot {repr(copied_position)}")
22302253

22312254
cpdef void snapshot_position_state(
@@ -4927,6 +4950,44 @@ cdef class Cache(CacheFacade):
49274950

49284951
return [pickle.loads(s) for s in snapshots]
49294952

4953+
cpdef set position_snapshot_ids(self, InstrumentId instrument_id = None):
    """
    Return the IDs of positions that have snapshots, optionally narrowed to one instrument.

    Parameters
    ----------
    instrument_id : InstrumentId, optional
        When given, only position IDs indexed under this instrument are returned.

    Returns
    -------
    set[PositionId]

    """
    if instrument_id is None:
        # Unfiltered: a fresh set built from every key in the snapshot store
        return set(self._position_snapshots)

    # Filtered: the per-instrument index entry itself (not a copy), or a new
    # empty set when the instrument has no entry in the index
    return self._index_instrument_position_snapshots.get(instrument_id, set())
4972+
4973+
cpdef list position_snapshot_bytes(self, PositionId position_id):
    """
    Return the stored pickled snapshots for the given position ID without unpickling them.

    Parameters
    ----------
    position_id : PositionId
        The position ID whose snapshot bytes are requested (checked with `Condition.not_none`).

    Returns
    -------
    list[bytes]
        The list held in the snapshot store for this position, or a new empty
        list when the position has no stored snapshots.

    """
    Condition.not_none(position_id, "position_id")

    if position_id in self._position_snapshots:
        # Hand back the stored list itself; no copy is made
        return self._position_snapshots[position_id]

    return []
4990+
49304991
cpdef list positions(
49314992
self,
49324993
Venue venue = None,

nautilus_trader/portfolio/portfolio.pxd

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ from nautilus_trader.model.data cimport QuoteTick
2727
from nautilus_trader.model.events.account cimport AccountState
2828
from nautilus_trader.model.events.order cimport OrderEvent
2929
from nautilus_trader.model.events.position cimport PositionEvent
30+
from nautilus_trader.model.identifiers cimport AccountId
3031
from nautilus_trader.model.identifiers cimport InstrumentId
3132
from nautilus_trader.model.identifiers cimport PositionId
3233
from nautilus_trader.model.identifiers cimport Venue
@@ -55,6 +56,7 @@ cdef class Portfolio(PortfolioFacade):
5556
cdef dict[InstrumentId, Money] _unrealized_pnls
5657
cdef dict[InstrumentId, Money] _realized_pnls
5758
cdef dict[InstrumentId, Money] _snapshot_realized_pnls
59+
cdef dict[PositionId, int] _snapshot_processed_counts
5860
cdef dict[InstrumentId, Decimal] _net_positions
5961
cdef dict[PositionId, object] _bet_positions
6062
cdef object _index_bet_positions
@@ -84,7 +86,7 @@ cdef class Portfolio(PortfolioFacade):
8486
cdef void _update_instrument_id(self, InstrumentId instrument_id)
8587
cdef void _update_net_position(self, InstrumentId instrument_id, list positions_open)
8688
cdef object _net_position(self, InstrumentId instrument_id)
87-
cdef void _ensure_snapshot_pnls_cached(self)
89+
cdef void _ensure_snapshot_pnls_cached_for(self, InstrumentId instrument_id)
8890
cdef Money _calculate_realized_pnl(self, InstrumentId instrument_id)
8991
cdef Money _calculate_unrealized_pnl(self, InstrumentId instrument_id, Price price=*)
9092
cdef Price _get_price(self, Position position)

nautilus_trader/portfolio/portfolio.pyx

Lines changed: 118 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The portfolio can satisfy queries for account information, margin balances,
2424
total risk exposures and total net positions.
2525
"""
2626

27+
import pickle
2728
import warnings
2829
from collections import defaultdict
2930
from decimal import Decimal
@@ -147,6 +148,7 @@ cdef class Portfolio(PortfolioFacade):
147148
self._unrealized_pnls: dict[InstrumentId, Money] = {}
148149
self._realized_pnls: dict[InstrumentId, Money] = {}
149150
self._snapshot_realized_pnls: dict[InstrumentId, Money] = {}
151+
self._snapshot_processed_counts: dict[PositionId, int] = {}
150152
self._net_positions: dict[InstrumentId, Decimal] = {}
151153
self._bet_positions: dict[InstrumentId, object] = {}
152154
self._index_bet_positions: dict[InstrumentId, set[PositionId]] = defaultdict(set)
@@ -683,6 +685,7 @@ cdef class Portfolio(PortfolioFacade):
683685
self._unrealized_pnls.clear()
684686
self._pending_calcs.clear()
685687
self._snapshot_realized_pnls.clear()
688+
self._snapshot_processed_counts.clear()
686689
self.analyzer.reset()
687690

688691
self.initialized = False
@@ -709,44 +712,120 @@ cdef class Portfolio(PortfolioFacade):
709712
self._reset()
710713
self._log.info("DISPOSED")
711714

712-
cdef void _ensure_snapshot_pnls_cached(self):
713-
if self._snapshot_realized_pnls:
714-
return # Already cached
715+
cdef void _ensure_snapshot_pnls_cached_for(self, InstrumentId instrument_id):
    """
    Bring the cached snapshot realized PnL total for one instrument up to date.

    Only snapshots not yet folded into the cached total are unpickled. If any
    position now has fewer snapshots than were previously processed, the total
    for this instrument is discarded and rebuilt from all remaining snapshots.

    Parameters
    ----------
    instrument_id : InstrumentId
        The instrument whose snapshot realized PnL cache is refreshed.

    """
    # Positions currently known for this instrument, plus positions that only
    # survive as snapshots
    cdef set[PositionId] instrument_position_ids = self._cache.position_ids(venue=None, instrument_id=instrument_id)
    cdef set[PositionId] snapshot_position_ids = self._cache.position_snapshot_ids(instrument_id)

    cdef set[PositionId] position_ids = set()

    if instrument_position_ids:
        position_ids.update(instrument_position_ids)

    position_ids.update(snapshot_position_ids)

    if not position_ids:
        # Clear stale cached total when no positions or snapshots exist
        self._snapshot_realized_pnls.pop(instrument_id, None)
        return  # No positions or snapshots for this instrument

    cdef bint rebuild = False

    cdef:
        PositionId position_id
        list position_id_snapshots
        int prev_count
        int curr_count
        int idx

    # Detect purge/reset (count regression) to trigger full rebuild for this instrument
    for position_id in position_ids:
        position_id_snapshots = self._cache.position_snapshot_bytes(position_id)
        curr_count = len(position_id_snapshots)
        prev_count = self._snapshot_processed_counts.get(position_id, 0)
        if prev_count > curr_count:
            rebuild = True
            break

    cdef:
        Money existing_pnl
        Position snapshot

    if rebuild:
        # Rebuild from scratch for this instrument: drop the total and
        # re-process every snapshot from index 0 below
        self._snapshot_realized_pnls.pop(instrument_id, None)

    for position_id in position_ids:
        position_id_snapshots = self._cache.position_snapshot_bytes(position_id)
        curr_count = len(position_id_snapshots)
        prev_count = 0 if rebuild else self._snapshot_processed_counts.get(position_id, 0)

        # Incremental: only entries in [prev_count, curr_count) are unpickled
        for idx in range(prev_count, curr_count):
            snapshot = pickle.loads(position_id_snapshots[idx])
            if snapshot.realized_pnl is None:
                continue

            # Aggregate if same currency; otherwise log and keep existing
            existing_pnl = self._snapshot_realized_pnls.get(instrument_id)
            if existing_pnl is None:
                self._snapshot_realized_pnls[instrument_id] = snapshot.realized_pnl
            elif existing_pnl.currency == snapshot.realized_pnl.currency:
                self._snapshot_realized_pnls[instrument_id] = Money(
                    existing_pnl.as_double() + snapshot.realized_pnl.as_double(),
                    existing_pnl.currency,
                )
            else:
                self._log.warning(
                    f"Cannot aggregate snapshot PnLs with different currencies for {instrument_id}: "
                    f"{existing_pnl.currency} vs {snapshot.realized_pnl.currency}",
                )

        if curr_count:
            self._snapshot_processed_counts[position_id] = curr_count
        else:
            # A missing entry reads back as 0, so zero counts need not be stored
            self._snapshot_processed_counts.pop(position_id, None)

    # Prune processed counts only for positions that no longer have any snapshot
    # in the cache. The counts dict is shared by ALL instruments, so pruning by
    # "not in this instrument's position_ids" would erase other instruments'
    # counts and make their snapshots be added to their totals a second time.
    # NOTE(review): a position purged together with its snapshots drops out of
    # `position_ids`, so its earlier contribution to the cached total is not
    # removed here - confirm whether a rebuild is wanted in that case.
    cdef set[PositionId] snapshotted_ids = self._cache.position_snapshot_ids()
    cdef list[PositionId] stale_ids = [
        tracked_id
        for tracked_id in self._snapshot_processed_counts
        if tracked_id not in snapshotted_ids
    ]
    cdef PositionId stale_position_id

    for stale_position_id in stale_ids:
        self._snapshot_processed_counts.pop(stale_position_id, None)

    if self._debug and self._snapshot_realized_pnls.get(instrument_id) is not None:
        self._log.debug(f"Cached snapshot realized PnL for {instrument_id}")
750829

751830
# -- QUERIES --------------------------------------------------------------------------------------
752831

@@ -1533,8 +1612,8 @@ cdef class Portfolio(PortfolioFacade):
15331612
else:
15341613
currency = instrument.get_cost_currency()
15351614

1536-
# Ensure snapshot PnLs are cached (one-time cost)
1537-
self._ensure_snapshot_pnls_cached()
1615+
# Ensure snapshot PnLs for this instrument are cached (incremental)
1616+
self._ensure_snapshot_pnls_cached_for(instrument_id)
15381617

15391618
cdef:
15401619
list[Position] positions
@@ -1551,6 +1630,9 @@ cdef class Portfolio(PortfolioFacade):
15511630
instrument_id=instrument_id,
15521631
)
15531632

1633+
if self._debug:
1634+
self._log.debug(f"Found {len(positions)} positions for {instrument_id}")
1635+
15541636
if not positions:
15551637
# Check if we have cached snapshot PnL for this instrument
15561638
cached_snapshot_pnl = self._snapshot_realized_pnls.get(instrument_id)
@@ -1578,6 +1660,8 @@ cdef class Portfolio(PortfolioFacade):
15781660
if cached_snapshot_pnl_for_instrument is None:
15791661
pass # No snapshot PnL to add
15801662
elif cached_snapshot_pnl_for_instrument.currency == currency:
1663+
if self._debug:
1664+
self._log.debug(f"Adding cached snapshot PnL: {cached_snapshot_pnl_for_instrument}")
15811665
total_pnl += cached_snapshot_pnl_for_instrument.as_double()
15821666
else:
15831667
snapshot_xrate = self._cache.get_xrate(

0 commit comments

Comments
 (0)