 import docker
 import schedule
 from opentelemetry import metrics
+from opentelemetry import trace

 from opik_backend.executor import CodeExecutorBase, ExecutionResult
 from opik_backend.scoring_commands import PYTHON_SCORING_COMMAND
@@ -82,6 +83,8 @@ def __init__(self):
         # Start the pool monitor
         self._start_pool_monitor()

+        self.tracer = trace.get_tracer(__name__)
+
         atexit.register(self.cleanup)

     def _start_pool_monitor(self):
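Note: this change only requests a tracer via trace.get_tracer(__name__); the tracer provider and exporter are not configured anywhere in this diff, so that wiring presumably lives elsewhere in the service. A minimal sketch of what such a bootstrap could look like with the standard OpenTelemetry SDK (the ConsoleSpanExporter here is a placeholder assumption, not part of this PR):

# Hypothetical bootstrap, not part of this PR: without a configured provider,
# spans created via trace.get_tracer(__name__) are non-recording no-ops.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
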
@@ -160,36 +163,38 @@ def _calculate_latency_ms(self, start_time):
     def create_container(self):
         # Record the start time for detailed container creation metrics
         start_time = time.time()
+        with self.tracer.start_as_current_span("docker.create_container"):
+
+            new_container = self.client.containers.run(
+                image=f"{self.docker_registry}/{self.docker_image}:{self.docker_tag}",
+                command=["tail", "-f", "/dev/null"],  # a never ending process so Docker won't kill the container
+                mem_limit="256mb",
+                cpu_shares=2,
+                detach=True,
+                network_disabled=self.network_disabled,
+                security_opt=["no-new-privileges"],
+                labels=self.container_labels
+            )

-        new_container = self.client.containers.run(
-            image=f"{self.docker_registry}/{self.docker_image}:{self.docker_tag}",
-            command=["tail", "-f", "/dev/null"],  # a never ending process so Docker won't kill the container
-            mem_limit="256mb",
-            cpu_shares=2,
-            detach=True,
-            network_disabled=self.network_disabled,
-            security_opt=["no-new-privileges"],
-            labels=self.container_labels
-        )
-
-        # Add the container to the pool
-        self.container_pool.put(new_container)
-
-        # Calculate and record the latency for the direct container creation
-        latency = self._calculate_latency_ms(start_time)
-        container_creation_histogram.record(latency, attributes={"method": "create_container"})
+            # Add the container to the pool
+            self.container_pool.put(new_container)

-        logger.info(f"Created container, id '{new_container.id}' in {latency:.3f} milliseconds")
+            # Calculate and record the latency for the direct container creation
+            latency = self._calculate_latency_ms(start_time)
+            container_creation_histogram.record(latency, attributes={"method": "create_container"})
+
+            logger.info(f"Created container, id '{new_container.id}' in {latency:.3f} milliseconds")

     def release_container(self, container):
         self.releaser_executor.submit(self.async_release, container)

     def async_release(self, container):
-        # First, ensure we have enough containers in the pool
-        self._create_new_container()
+        with self.tracer.start_as_current_span("async_release"):
+            # First, ensure we have enough containers in the pool
+            self._create_new_container()

-        # Now stop and remove the old container
-        self._stopContainer(container)
+            # Now stop and remove the old container
+            self._stopContainer(container)

     def _create_new_container(self):
         try:
@@ -199,35 +204,37 @@ def _create_new_container(self):
             logger.error(f"Error replacing container: {e}")

     def _stopContainer(self, container):
-        try:
-            logger.info(f"Stopping container {container.id}. Will create a new one.")
+        with self.tracer.start_as_current_span("docker.stop_container"):
+            try:
+                logger.info(f"Stopping container {container.id}. Will create a new one.")

-            # Record the start time
-            start_time = time.time()
+                # Record the start time
+                start_time = time.time()

-            # Remove the container
-            container.remove(force=True)
+                # Remove the container
+                container.remove(force=True)

-            # Calculate and record the latency
-            latency = self._calculate_latency_ms(start_time)
-            container_stop_histogram.record(latency, attributes={"method": "stop_container"})
+                # Calculate and record the latency
+                latency = self._calculate_latency_ms(start_time)
+                container_stop_histogram.record(latency, attributes={"method": "stop_container"})

-            logger.info(f"Stopped container {container.id} in {latency:.3f} milliseconds")
-        except Exception as e:
-            logger.error(f"Failed to stop container: {e}")
+                logger.info(f"Stopped container {container.id} in {latency:.3f} milliseconds")
+            except Exception as e:
+                logger.error(f"Failed to stop container: {e}")

     def get_container(self):
-        if self.stop_event.is_set():
-            raise RuntimeError("Executor is shutting down, no containers available")
-
-        while not self.stop_event.is_set():
-            try:
-                return self.container_pool.get(timeout=self.exec_timeout)
-            except Exception as e:
-                if self.stop_event.is_set():
-                    raise RuntimeError("Executor is shutting down, no containers available")
-
-                logger.warning(f"Couldn't get a container to execute after waiting for {self.exec_timeout}s. Will retry: {e}")
+        with self.tracer.start_as_current_span("get_container"):
+            if self.stop_event.is_set():
+                raise RuntimeError("Executor is shutting down, no containers available")
+
+            while not self.stop_event.is_set():
+                try:
+                    return self.container_pool.get(timeout=self.exec_timeout)
+                except Exception as e:
+                    if self.stop_event.is_set():
+                        raise RuntimeError("Executor is shutting down, no containers available")
+
+                    logger.warning(f"Couldn't get a container to execute after waiting for {self.exec_timeout}s. Will retry: {e}")

     def run_scoring(self, code: str, data: dict, payload_type: str | None = None) -> dict:
         if self.stop_event.is_set():
@@ -239,38 +246,39 @@ def run_scoring(self, code: str, data: dict, payload_type: str | None = None) -> dict:
         logger.info(f"Scoring executor latency: {latency:.3f} milliseconds")
         get_container_histogram.record(latency, attributes={"method": "get_container"})

-        try:
-            future = self.scoring_executor.submit(
-                container.exec_run,
-                cmd=["python", "-c", PYTHON_SCORING_COMMAND, code, json.dumps(data), payload_type or ""],
-                detach=False,
-                stdin=False,
-                tty=False
-            )
-
-            result = future.result(timeout=self.exec_timeout)
-            exec_result = ExecutionResult(
-                exit_code=result.exit_code,
-                output=result.output
-            )
-            latency = self._calculate_latency_ms(start_time)
-            logger.info(f"Scoring executor latency: {latency:.3f} milliseconds")
-
-            # Access ThreadPoolExecutor's internal work queue (private attribute)
-            queue_size = self.scoring_executor._work_queue.qsize()
-            scoring_executor_queue_size_gauge.set(queue_size)
-
-            scoring_executor_histogram.record(latency, attributes={"method": "run_scoring"})
-            return self.parse_execution_result(exec_result)
-        except concurrent.futures.TimeoutError:
-            logger.error(f"Execution timed out in container {container.id}")
-            return {"code": 504, "error": "Server processing exceeded timeout limit."}
-        except Exception as e:
-            logger.error(f"An unexpected error occurred: {e}")
-            return {"code": 500, "error": "An unexpected error occurred"}
-        finally:
-            # Stop and remove the container, then create a new one asynchronously
-            self.release_container(container)
+        with self.tracer.start_as_current_span("docker.run_scoring"):
+            try:
+                future = self.scoring_executor.submit(
+                    container.exec_run,
+                    cmd=["python", "-c", PYTHON_SCORING_COMMAND, code, json.dumps(data), payload_type or ""],
+                    detach=False,
+                    stdin=False,
+                    tty=False
+                )
+
+                result = future.result(timeout=self.exec_timeout)
+                exec_result = ExecutionResult(
+                    exit_code=result.exit_code,
+                    output=result.output
+                )
+                latency = self._calculate_latency_ms(start_time)
+                logger.info(f"Scoring executor latency: {latency:.3f} milliseconds")
+
+                # Access ThreadPoolExecutor's internal work queue (private attribute)
+                queue_size = self.scoring_executor._work_queue.qsize()
+                scoring_executor_queue_size_gauge.set(queue_size)
+
+                scoring_executor_histogram.record(latency, attributes={"method": "run_scoring"})
+                return self.parse_execution_result(exec_result)
+            except concurrent.futures.TimeoutError:
+                logger.error(f"Execution timed out in container {container.id}")
+                return {"code": 504, "error": "Server processing exceeded timeout limit."}
+            except Exception as e:
+                logger.error(f"An unexpected error occurred: {e}")
+                return {"code": 500, "error": "An unexpected error occurred"}
+            finally:
+                # Stop and remove the container, then create a new one asynchronously
+                self.release_container(container)

     def cleanup(self):
         """Clean up all containers managed by this executor."""