Skip to content

Commit 4a41afc

Browse files
authored
Merge pull request #5 from hummingbot/feat/improve_ev_loop_management
(feat) add handling of running ev loop
2 parents bfe9e44 + 43909a9 commit 4a41afc

File tree

1 file changed

+77
-19
lines changed

1 file changed

+77
-19
lines changed

hummingbot_api_client/sync_client.py

Lines changed: 77 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ def __init__(
6262
self._timeout = timeout
6363
self._async_client: Optional[HummingbotAPIClient] = None
6464
self._loop: Optional[asyncio.AbstractEventLoop] = None
65-
65+
self._created_loop: bool = False
66+
6667
# Type hints for dynamically created attributes
6768
if TYPE_CHECKING:
6869
self.accounts: AccountsRouter
@@ -76,12 +77,19 @@ def __init__(
7677
self.portfolio: PortfolioRouter
7778
self.scripts: ScriptsRouter
7879
self.trading: TradingRouter
79-
80+
8081
def __enter__(self) -> 'SyncHummingbotAPIClient':
8182
"""Enter context manager and initialize the async client."""
82-
self._loop = asyncio.new_event_loop()
83-
asyncio.set_event_loop(self._loop)
84-
83+
# Check if there's already a running event loop
84+
try:
85+
self._loop = asyncio.get_running_loop()
86+
self._created_loop = False
87+
except RuntimeError:
88+
# No running loop, create a new one
89+
self._loop = asyncio.new_event_loop()
90+
asyncio.set_event_loop(self._loop)
91+
self._created_loop = True
92+
8593
# Create and initialize the async client
8694
import aiohttp
8795
timeout_obj = aiohttp.ClientTimeout(total=self._timeout) if self._timeout else None
@@ -91,20 +99,53 @@ def __enter__(self) -> 'SyncHummingbotAPIClient':
9199
self._password,
92100
timeout=timeout_obj
93101
)
94-
self._loop.run_until_complete(self._async_client.init())
95-
102+
103+
# Initialize based on whether we created the loop
104+
if self._created_loop:
105+
self._loop.run_until_complete(self._async_client.init())
106+
else:
107+
# For existing loop, schedule coroutine as a task
108+
import concurrent.futures
109+
future = concurrent.futures.Future()
110+
111+
async def init_wrapper():
112+
try:
113+
await self._async_client.init()
114+
future.set_result(None)
115+
except Exception as e:
116+
future.set_exception(e)
117+
118+
asyncio.run_coroutine_threadsafe(init_wrapper(), self._loop)
119+
future.result() # Wait for completion
120+
96121
# Dynamically create sync wrappers for all routers
97122
self._wrap_routers()
98-
123+
99124
return self
100-
125+
101126
def __exit__(self, exc_type, exc_val, exc_tb):
102127
"""Exit context manager and cleanup resources."""
103128
if self._async_client:
104-
self._loop.run_until_complete(self._async_client.close())
105-
if self._loop:
129+
if self._created_loop:
130+
self._loop.run_until_complete(self._async_client.close())
131+
else:
132+
# For existing loop, use run_coroutine_threadsafe
133+
import concurrent.futures
134+
future = concurrent.futures.Future()
135+
136+
async def close_wrapper():
137+
try:
138+
await self._async_client.close()
139+
future.set_result(None)
140+
except Exception as e:
141+
future.set_exception(e)
142+
143+
asyncio.run_coroutine_threadsafe(close_wrapper(), self._loop)
144+
future.result() # Wait for completion
145+
146+
if self._created_loop and self._loop:
106147
self._loop.close()
107-
148+
108149
def _wrap_routers(self):
109150
"""Dynamically wrap all router methods to be synchronous."""
110151
# List of router attributes on the async client
@@ -113,28 +154,45 @@ def _wrap_routers(self):
113154
'connectors', 'controllers', 'docker', 'market_data',
114155
'portfolio', 'scripts', 'trading'
115156
]
116-
157+
117158
for router_name in router_attrs:
118159
if hasattr(self._async_client, router_name):
119160
async_router = getattr(self._async_client, router_name)
120-
sync_router = SyncRouterWrapper(async_router, self._loop)
161+
sync_router = SyncRouterWrapper(async_router, self._loop, self._created_loop)
121162
setattr(self, router_name, sync_router)
122163

123164

124165
class SyncRouterWrapper:
125166
"""Wrapper that converts async router methods to sync."""
126-
127-
def __init__(self, async_router: Any, loop: asyncio.AbstractEventLoop):
167+
168+
def __init__(self, async_router: Any, loop: asyncio.AbstractEventLoop, created_loop: bool):
128169
self._async_router = async_router
129170
self._loop = loop
130-
171+
self._created_loop = created_loop
172+
131173
def __getattr__(self, name: str) -> Any:
132174
"""Dynamically wrap async methods to be synchronous."""
133175
attr = getattr(self._async_router, name)
134-
176+
135177
if asyncio.iscoroutinefunction(attr):
136178
def sync_method(*args, **kwargs):
137-
return self._loop.run_until_complete(attr(*args, **kwargs))
179+
if self._created_loop:
180+
# We created the loop, so we can use run_until_complete
181+
return self._loop.run_until_complete(attr(*args, **kwargs))
182+
else:
183+
# Using existing loop, must use run_coroutine_threadsafe
184+
import concurrent.futures
185+
future = concurrent.futures.Future()
186+
187+
async def wrapper():
188+
try:
189+
result = await attr(*args, **kwargs)
190+
future.set_result(result)
191+
except Exception as e:
192+
future.set_exception(e)
193+
194+
asyncio.run_coroutine_threadsafe(wrapper(), self._loop)
195+
return future.result()
138196
return sync_method
139197

140198
return attr

0 commit comments

Comments
 (0)