Skip to content

Commit 9b987b8

Browse files
committed
Fix: Use FastAPI/uvicorn for healthcheck like worker implementation
- Create proper healthcheck server module using FastAPI/uvicorn - Follow exact pattern from worker healthcheck implementation - Manager process now handles healthcheck in background mode - Returns JSON response like worker healthcheck - Tests updated to expect JSON response
1 parent 3c61774 commit 9b987b8

File tree

3 files changed

+63
-42
lines changed

3 files changed

+63
-42
lines changed

src/prefect/cli/server.py

Lines changed: 23 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -583,28 +583,14 @@ def run_manager_process(
583583
# Start healthcheck server if requested
584584
if with_healthcheck:
585585
import threading
586-
from http.server import BaseHTTPRequestHandler, HTTPServer
587-
588-
class HealthHandler(BaseHTTPRequestHandler):
589-
def do_GET(self):
590-
if self.path == "/health":
591-
self.send_response(200)
592-
self.send_header("Content-type", "text/plain")
593-
self.end_headers()
594-
self.wfile.write(b"OK")
595-
else:
596-
self.send_response(404)
597-
self.end_headers()
598-
599-
def log_message(self, format, *args):
600-
pass # Suppress request logging
601586

602-
def run_healthcheck_server():
603-
server = HTTPServer(("0.0.0.0", 8080), HealthHandler)
604-
server.serve_forever()
587+
from prefect.server.services.healthcheck import build_healthcheck_server
605588

589+
healthcheck_server = build_healthcheck_server()
606590
healthcheck_thread = threading.Thread(
607-
target=run_healthcheck_server, daemon=True
591+
name="healthcheck-server-thread",
592+
target=healthcheck_server.run,
593+
daemon=True,
608594
)
609595
healthcheck_thread.start()
610596
logger.info("Healthcheck server started on port 8080")
@@ -673,32 +659,21 @@ def start_services(
673659
if not background:
674660
app.console.print("\n[blue]Starting services... Press CTRL+C to stop[/]\n")
675661

676-
# Start healthcheck server if requested
662+
# Import at runtime to avoid circular imports
663+
import threading
664+
665+
healthcheck_server = None
677666
healthcheck_thread = None
667+
678668
if with_healthcheck:
679-
import threading
680-
from http.server import BaseHTTPRequestHandler, HTTPServer
681-
682-
class HealthHandler(BaseHTTPRequestHandler):
683-
def do_GET(self):
684-
if self.path == "/health":
685-
self.send_response(200)
686-
self.send_header("Content-type", "text/plain")
687-
self.end_headers()
688-
self.wfile.write(b"OK")
689-
else:
690-
self.send_response(404)
691-
self.end_headers()
692-
693-
def log_message(self, format, *args):
694-
pass # Suppress request logging
695-
696-
def run_healthcheck_server():
697-
server = HTTPServer(("0.0.0.0", 8080), HealthHandler)
698-
server.serve_forever()
669+
from prefect.server.services.healthcheck import build_healthcheck_server
699670

671+
# Create and start the healthcheck server in a separate thread
672+
healthcheck_server = build_healthcheck_server()
700673
healthcheck_thread = threading.Thread(
701-
target=run_healthcheck_server, daemon=True
674+
name="healthcheck-server-thread",
675+
target=healthcheck_server.run,
676+
daemon=True,
702677
)
703678
healthcheck_thread.start()
704679
app.console.print("[green]Healthcheck server started on port 8080[/]")
@@ -707,6 +682,13 @@ def run_healthcheck_server():
707682
asyncio.run(Service.run_services())
708683
except KeyboardInterrupt:
709684
pass
685+
finally:
686+
if healthcheck_server and healthcheck_thread:
687+
logger.debug("Stopping healthcheck server...")
688+
healthcheck_server.should_exit = True
689+
healthcheck_thread.join()
690+
logger.debug("Healthcheck server stopped.")
691+
710692
app.console.print("\n[green]All services stopped.[/]")
711693
return
712694

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""
2+
Health check server for Prefect services, following the same pattern as the worker health check.
3+
"""
4+
5+
import uvicorn
6+
from fastapi import APIRouter, FastAPI, status
7+
from fastapi.responses import JSONResponse
8+
9+
10+
def build_healthcheck_server(log_level: str = "error") -> uvicorn.Server:
11+
"""
12+
Build a healthcheck FastAPI server for services.
13+
14+
Unlike the worker healthcheck, this is a simple liveness check that
15+
returns 200 OK if the services are running.
16+
17+
Args:
18+
log_level (str): the log level to use for the server
19+
20+
Returns:
21+
uvicorn.Server: the configured server ready to run
22+
"""
23+
app = FastAPI()
24+
router = APIRouter()
25+
26+
def perform_health_check():
27+
# Simple liveness check - if we can respond, services are running
28+
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "OK"})
29+
30+
router.add_api_route("/health", perform_health_check, methods=["GET"])
31+
app.include_router(router)
32+
33+
config = uvicorn.Config(
34+
app=app,
35+
host="0.0.0.0",
36+
port=8080,
37+
log_level=log_level,
38+
)
39+
return uvicorn.Server(config=config)

tests/cli/test_server_services.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def test_start_services_with_healthcheck(self, pid_file: Path):
157157
try:
158158
response = requests.get("http://localhost:8080/health", timeout=5)
159159
assert response.status_code == 200
160-
assert response.text == "OK"
160+
assert response.json() == {"message": "OK"}
161161
finally:
162162
# Always clean up
163163
invoke_and_assert(

0 commit comments

Comments
 (0)