@@ -82,6 +82,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
8282 max_pool_connections : int = 10
8383 """Max number of connections for async Dynamodb operations"""
8484
85+ keepalive_timeout : float = 12.0
86+ """Keep-alive timeout in seconds for async Dynamodb connections."""
87+
8588
8689class DynamoDBOnlineStore (OnlineStore ):
8790 """
@@ -97,7 +100,9 @@ class DynamoDBOnlineStore(OnlineStore):
97100
98101 async def initialize (self , config : RepoConfig ):
99102 await _get_aiodynamodb_client (
100- config .online_store .region , config .online_store .max_pool_connections
103+ config .online_store .region ,
104+ config .online_store .max_pool_connections ,
105+ config .online_store .keepalive_timeout ,
101106 )
102107
103108 async def close (self ):
@@ -272,7 +277,9 @@ async def online_write_batch_async(
272277 for entity_key , features , timestamp , _ in _latest_data_to_write (data )
273278 ]
274279 client = await _get_aiodynamodb_client (
275- online_config .region , config .online_store .max_pool_connections
280+ online_config .region ,
281+ online_config .max_pool_connections ,
282+ online_config .keepalive_timeout ,
276283 )
277284 await dynamo_write_items_async (client , table_name , items )
278285
@@ -377,7 +384,9 @@ def to_tbl_resp(raw_client_response):
377384 entity_id_batches .append (entity_id_batch )
378385
379386 client = await _get_aiodynamodb_client (
380- online_config .region , online_config .max_pool_connections
387+ online_config .region ,
388+ online_config .max_pool_connections ,
389+ online_config .keepalive_timeout ,
381390 )
382391 response_batches = await asyncio .gather (
383392 * [
@@ -536,14 +545,19 @@ def _get_aioboto_session():
536545 return _aioboto_session
537546
538547
539- async def _get_aiodynamodb_client (region : str , max_pool_connections : int ):
548+ async def _get_aiodynamodb_client (
549+ region : str , max_pool_connections : int , keepalive_timeout : float
550+ ):
540551 global _aioboto_client
541552 if _aioboto_client is None :
542553 logger .debug ("initializing the aiobotocore dynamodb client" )
543554 client_context = _get_aioboto_session ().create_client (
544555 "dynamodb" ,
545556 region_name = region ,
546- config = AioConfig (max_pool_connections = max_pool_connections ),
557+ config = AioConfig (
558+ max_pool_connections = max_pool_connections ,
559+ connector_args = {"keepalive_timeout" : keepalive_timeout },
560+ ),
547561 )
548562 context_stack = contextlib .AsyncExitStack ()
549563 _aioboto_client = await context_stack .enter_async_context (client_context )
0 commit comments