Skip to content

Commit 5603d99

Browse files
Add simple evaluate_item endpoint
Signed-off-by: Anuradha Karuppiah <[email protected]>
1 parent abf527b commit 5603d99

File tree

2 files changed

+138
-0
lines changed

2 files changed

+138
-0
lines changed

src/nat/front_ends/fastapi/fastapi_front_end_config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
from nat.data_models.component_ref import ObjectStoreRef
2828
from nat.data_models.front_end import FrontEndBaseConfig
2929
from nat.data_models.step_adaptor import StepAdaptorConfig
30+
from nat.eval.evaluator.evaluator_model import EvalInputItem
31+
from nat.eval.evaluator.evaluator_model import EvalOutputItem
3032

3133
logger = logging.getLogger(__name__)
3234

@@ -133,6 +135,19 @@ class AsyncGenerationStatusResponse(BaseAsyncStatusResponse):
133135
description="Output of the generate request, this is only available if the job completed successfully.")
134136

135137

138+
class EvaluateItemRequest(BaseModel):
139+
"""Request model for single-item evaluation endpoint."""
140+
item: EvalInputItem = Field(description="Single evaluation input item to evaluate")
141+
evaluator_name: str = Field(description="Name of the evaluator to use (must match config)")
142+
143+
144+
class EvaluateItemResponse(BaseModel):
145+
"""Response model for single-item evaluation endpoint."""
146+
success: bool = Field(description="Whether the evaluation completed successfully")
147+
result: EvalOutputItem | None = Field(default=None, description="Evaluation result if successful")
148+
error: str | None = Field(default=None, description="Error message if evaluation failed")
149+
150+
136151
class FastApiFrontEndConfig(FrontEndBaseConfig, name="fastapi"):
137152
"""
138153
A FastAPI based front end that allows a NAT workflow to be served as a microservice.
@@ -232,6 +247,13 @@ class CrossOriginResourceSharing(BaseModel):
232247
description="Evaluates the performance and accuracy of the workflow on a dataset",
233248
)
234249

250+
evaluate_item: typing.Annotated[EndpointBase,
251+
Field(description="Endpoint for evaluating a single item.")] = EndpointBase(
252+
method="POST",
253+
path="/evaluate_item",
254+
description="Evaluate a single item with a specified evaluator",
255+
)
256+
235257
oauth2_callback_path: str | None = Field(
236258
default="/auth/redirect",
237259
description="OAuth2.0 authentication callback endpoint. If None, no OAuth2 callback endpoint is created.")

src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
from pydantic import Field
4040
from starlette.websockets import WebSocket
4141

42+
from nat.builder.eval_builder import WorkflowEvalBuilder
43+
from nat.builder.evaluator import EvaluatorInfo
4244
from nat.builder.function import Function
4345
from nat.builder.workflow_builder import WorkflowBuilder
4446
from nat.data_models.api_server import ChatRequest
@@ -51,11 +53,14 @@
5153
from nat.eval.config import EvaluationRunOutput
5254
from nat.eval.evaluate import EvaluationRun
5355
from nat.eval.evaluate import EvaluationRunConfig
56+
from nat.eval.evaluator.evaluator_model import EvalInput
5457
from nat.front_ends.fastapi.auth_flow_handlers.http_flow_handler import HTTPAuthenticationFlowHandler
5558
from nat.front_ends.fastapi.auth_flow_handlers.websocket_flow_handler import FlowState
5659
from nat.front_ends.fastapi.auth_flow_handlers.websocket_flow_handler import WebSocketAuthenticationFlowHandler
5760
from nat.front_ends.fastapi.fastapi_front_end_config import AsyncGenerateResponse
5861
from nat.front_ends.fastapi.fastapi_front_end_config import AsyncGenerationStatusResponse
62+
from nat.front_ends.fastapi.fastapi_front_end_config import EvaluateItemRequest
63+
from nat.front_ends.fastapi.fastapi_front_end_config import EvaluateItemResponse
5964
from nat.front_ends.fastapi.fastapi_front_end_config import EvaluateRequest
6065
from nat.front_ends.fastapi.fastapi_front_end_config import EvaluateResponse
6166
from nat.front_ends.fastapi.fastapi_front_end_config import EvaluateStatusResponse
@@ -227,6 +232,50 @@ def __init__(self, config: Config):
227232
self._outstanding_flows: dict[str, FlowState] = {}
228233
self._outstanding_flows_lock = asyncio.Lock()
229234

235+
# Evaluator storage for single-item evaluation
236+
self._evaluators: dict[str, EvaluatorInfo] = {}
237+
self._eval_builder: WorkflowEvalBuilder | None = None
238+
239+
async def initialize_evaluators(self, config: Config):
240+
"""Initialize and store evaluators from config for single-item evaluation."""
241+
if not config.eval or not config.eval.evaluators:
242+
logger.info("No evaluators configured, skipping evaluator initialization")
243+
return
244+
245+
try:
246+
# Build evaluators using WorkflowEvalBuilder
247+
self._eval_builder = WorkflowEvalBuilder(general_config=config.general,
248+
eval_general_config=config.eval.general,
249+
registry=None)
250+
251+
# Enter the async context and keep it alive
252+
await self._eval_builder.__aenter__()
253+
254+
# Populate evaluators
255+
for name, evaluator_config in config.eval.evaluators.items():
256+
await self._eval_builder.add_evaluator(name, evaluator_config)
257+
self._evaluators[name] = self._eval_builder.get_evaluator(name)
258+
logger.info(f"Initialized evaluator: {name}")
259+
260+
logger.info(f"Successfully initialized {len(self._evaluators)} evaluators")
261+
262+
except Exception as e:
263+
logger.error(f"Failed to initialize evaluators: {e}")
264+
# Don't fail startup, just log the error
265+
self._evaluators = {}
266+
267+
async def cleanup_evaluators(self):
268+
"""Clean up evaluator resources on shutdown."""
269+
if self._eval_builder:
270+
try:
271+
await self._eval_builder.__aexit__(None, None, None)
272+
logger.info("Evaluator builder context cleaned up")
273+
except Exception as e:
274+
logger.error(f"Error cleaning up evaluator builder: {e}")
275+
finally:
276+
self._eval_builder = None
277+
self._evaluators.clear()
278+
230279
def get_step_adaptor(self) -> StepAdaptor:
231280

232281
return StepAdaptor(self.front_end_config.step_adaptor)
@@ -236,12 +285,16 @@ async def configure(self, app: FastAPI, builder: WorkflowBuilder):
236285
# Do things like setting the base URL and global configuration options
237286
app.root_path = self.front_end_config.root_path
238287

288+
# Initialize evaluators for single-item evaluation
289+
await self.initialize_evaluators(self._config)
290+
239291
await self.add_routes(app, builder)
240292

241293
async def add_routes(self, app: FastAPI, builder: WorkflowBuilder):
242294

243295
await self.add_default_route(app, SessionManager(await builder.build()))
244296
await self.add_evaluate_route(app, SessionManager(await builder.build()))
297+
await self.add_evaluate_item_route(app, SessionManager(await builder.build()))
245298
await self.add_static_files_route(app, builder)
246299
await self.add_authorization_route(app)
247300
await self.add_mcp_client_tool_list_route(app, builder)
@@ -439,6 +492,69 @@ async def get_jobs(http_request: Request, status: str | None = None) -> list[Eva
439492
else:
440493
logger.warning("Dask is not available, evaluation endpoints will not be added.")
441494

495+
async def add_evaluate_item_route(self, app: FastAPI, session_manager: SessionManager):
496+
"""Add the single-item evaluation endpoint to the FastAPI app."""
497+
498+
async def evaluate_single_item(request: EvaluateItemRequest, http_request: Request) -> EvaluateItemResponse:
499+
"""Handle single-item evaluation requests."""
500+
501+
async with session_manager.session(http_connection=http_request):
502+
503+
# Check if evaluator exists
504+
if request.evaluator_name not in self._evaluators:
505+
raise HTTPException(status_code=404,
506+
detail=f"Evaluator '{request.evaluator_name}' not found. "
507+
f"Available evaluators: {list(self._evaluators.keys())}")
508+
509+
try:
510+
# Get the evaluator
511+
evaluator = self._evaluators[request.evaluator_name]
512+
513+
# Run evaluation on single item
514+
result = await evaluator.evaluate_fn(EvalInput(eval_input_items=[request.item]))
515+
516+
# Extract the single output item
517+
if result.eval_output_items:
518+
output_item = result.eval_output_items[0]
519+
return EvaluateItemResponse(success=True, result=output_item, error=None)
520+
else:
521+
return EvaluateItemResponse(success=False, result=None, error="Evaluator returned no results")
522+
523+
except Exception as e:
524+
logger.exception(f"Error evaluating item with {request.evaluator_name}")
525+
return EvaluateItemResponse(success=False, result=None, error=f"Evaluation failed: {str(e)}")
526+
527+
# Register the route
528+
if self.front_end_config.evaluate_item.path:
529+
app.add_api_route(path=self.front_end_config.evaluate_item.path,
530+
endpoint=evaluate_single_item,
531+
methods=[self.front_end_config.evaluate_item.method],
532+
response_model=EvaluateItemResponse,
533+
description=self.front_end_config.evaluate_item.description,
534+
responses={
535+
404: {
536+
"description": "Evaluator not found",
537+
"content": {
538+
"application/json": {
539+
"example": {
540+
"detail": "Evaluator 'unknown' not found"
541+
}
542+
}
543+
}
544+
},
545+
500: {
546+
"description": "Internal Server Error",
547+
"content": {
548+
"application/json": {
549+
"example": {
550+
"detail": "Internal server error occurred"
551+
}
552+
}
553+
}
554+
}
555+
})
556+
logger.info(f"Added evaluate_item route at {self.front_end_config.evaluate_item.path}")
557+
442558
async def add_static_files_route(self, app: FastAPI, builder: WorkflowBuilder):
443559

444560
if not self.front_end_config.object_store:

0 commit comments

Comments
 (0)