|
1 | | -import atexit |
2 | 1 | import logging |
3 | | -import threading |
4 | 2 | import uuid |
5 | 3 | from concurrent.futures import ThreadPoolExecutor |
6 | 4 | from datetime import datetime, timezone |
@@ -287,11 +285,6 @@ def __init__( |
287 | 285 | self.thread_pool_executor_worker_count = ( |
288 | 286 | registry_config.thread_pool_executor_worker_count |
289 | 287 | ) |
290 | | - if self.thread_pool_executor_worker_count > 0: |
291 | | - self._executor = ThreadPoolExecutor( |
292 | | - max_workers=self.thread_pool_executor_worker_count |
293 | | - ) |
294 | | - atexit.register(self._exit_handler) |
295 | 288 | self.purge_feast_metadata = registry_config.purge_feast_metadata |
296 | 289 | # Sync feast_metadata to projects table |
297 | 290 | # when purge_feast_metadata is set to True, Delete data from |
@@ -990,17 +983,14 @@ def process_project(project: Project): |
990 | 983 | r.infra.CopyFrom(self.get_infra(project_name).to_proto()) |
991 | 984 |
|
992 | 985 | projects_list = self.list_projects(allow_cache=False) |
993 | | - if self._executor: |
994 | | - logger.info( |
995 | | - f"Thread count before executor.map: {len(threading.enumerate())}" |
996 | | - ) |
997 | | - self._executor.map(process_project, projects_list) |
998 | | - logger.info( |
999 | | - f"Thread count after executor.map: {len(threading.enumerate())}" |
1000 | | - ) |
| 986 | + if self.thread_pool_executor_worker_count == 0: |
| 987 | + for project in projects_list: |
| 988 | + process_project(project) |
1001 | 989 | else: |
1002 | | - for p in projects_list: |
1003 | | - process_project(p) |
| 990 | + with ThreadPoolExecutor( |
| 991 | + max_workers=self.thread_pool_executor_worker_count |
| 992 | + ) as executor: |
| 993 | + executor.map(process_project, projects_list) |
1004 | 994 |
|
1005 | 995 | if last_updated_timestamps: |
1006 | 996 | r.last_updated.FromDatetime(max(last_updated_timestamps)) |
@@ -1427,10 +1417,3 @@ def get_project_metadata( |
1427 | 1417 | datetime.utcfromtimestamp(int(metadata_value)) |
1428 | 1418 | ) |
1429 | 1419 | return project_metadata_model |
1430 | | - |
1431 | | - def _exit_handler(self): |
1432 | | - if self._executor: |
1433 | | - logger.info("Shutting down SqlRegistry's ThreadPoolExecutor...") |
1434 | | - self._executor.shutdown(wait=False, cancel_futures=True) |
1435 | | - logger.info("ThreadPoolExecutor shut down successfully.") |
1436 | | - self._executor = None |
0 commit comments