Skip to content

Commit 8984c1b

Browse files
committed
Use aclosing() around ws async gen
1 parent 2e3cac1 commit 8984c1b

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

piker/brokers/kraken/broker.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
Union,
3333
)
3434

35+
from async_generator import aclosing
3536
from bidict import bidict
3637
import pendulum
3738
import trio
@@ -316,6 +317,7 @@ async def trades_dialogue(
316317
),
317318
) as ws,
318319
trio.open_nursery() as n,
320+
aclosing(stream_messages(ws)) as stream,
319321
):
320322
# task local msg dialog tracking
321323
emsflow: dict[
@@ -339,7 +341,7 @@ async def trades_dialogue(
339341

340342
# enter relay loop
341343
await handle_order_updates(
342-
ws,
344+
stream,
343345
ems_stream,
344346
emsflow,
345347
ids,
@@ -351,7 +353,7 @@ async def trades_dialogue(
351353

352354

353355
async def handle_order_updates(
354-
ws: NoBsWs,
356+
ws_stream: NoBsWs,
355357
ems_stream: tractor.MsgStream,
356358
emsflow: dict[str, list[MsgUnion]],
357359
ids: bidict[str, int],
@@ -372,7 +374,7 @@ async def handle_order_updates(
372374
# on new trade clearing events (aka order "fills")
373375
trans: list[pp.Transaction]
374376

375-
async for msg in stream_messages(ws):
377+
async for msg in ws_stream:
376378
match msg:
377379
# process and relay clearing trade events to ems
378380
# https://docs.kraken.com/websockets/#message-ownTrades

0 commit comments

Comments
 (0)