Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions weather-mcp-server/.actor/pay_per_event.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
{
"actor-start": {
"eventTitle": "Weather MCP server startup",
"eventDescription": "Initial fee for starting the Weather MCP Server Actor",
"eventPriceUsd": 0.01
},
"get_current_weather": {
"eventTitle": "Current weather lookup",
"eventDescription": "Fee for getting current weather information for a specific city using the get_current_weather tool.",
Expand Down
6 changes: 4 additions & 2 deletions weather-mcp-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
# You can also use any other image from Docker Hub.
FROM apify/actor-python:3.13

USER myuser

# Second, copy just requirements.txt into the Actor image,
# since it should be the only file that affects the dependency installation in the next step,
# in order to speed up the build.
COPY requirements.txt ./
COPY --chown=myuser:myuser requirements.txt ./

# Install the packages specified in requirements.txt,
# print the installed Python version, pip version,
Expand All @@ -23,7 +25,7 @@ RUN echo "Python version:" \
# Next, copy the remaining files and directories with the source code.
# Since we do this after installing the dependencies, quick builds will be really fast
# for most source file changes.
COPY . ./
COPY --chown=myuser:myuser . ./

# Use compileall to ensure the runnability of the Actor Python code.
RUN python3 -m compileall -q src/
Expand Down
14 changes: 8 additions & 6 deletions weather-mcp-server/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
apify<3.0.0
apify_client<2.0.0
fastapi==0.116.1
# Feel free to add your Python dependencies below. For formatting guidelines, see:
# https://pip.pypa.io/en/latest/reference/requirements-file-format/

apify < 4.0.0
apify-client < 3.0.0
fastapi==0.118.2
httpx>=0.24.0
mcp==1.13.1
mcp==1.16.0
mcp_weather_server==0.2.1
pydantic>=2.0.0
python-dateutil>=2.9.0.post0
sse-starlette>=3.0.2
uv>=0.7.8
uvicorn>=0.27.0
uvicorn>=0.27.0
2 changes: 2 additions & 0 deletions weather-mcp-server/src/const.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from enum import Enum

SESSION_TIMEOUT_SECS = 60


class ChargeEvents(str, Enum):
"""Event types for charging MCP operations.
Expand Down
9 changes: 4 additions & 5 deletions weather-mcp-server/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from apify import Actor

from .const import TOOL_WHITELIST, ChargeEvents
from .const import TOOL_WHITELIST
from .models import ServerType
from .server import ProxyServer

Expand Down Expand Up @@ -58,9 +58,8 @@ async def main() -> None:
Charging events are defined in .actor/pay_per_event.json
"""
async with Actor:
# Initialize and charge for Actor startup
# Initialize Actor
Actor.log.info('Starting MCP Server Actor')
await Actor.charge(ChargeEvents.ACTOR_START.value)

url = os.environ.get('ACTOR_STANDBY_URL', HOST)
if not STANDBY_MODE:
Expand All @@ -69,7 +68,7 @@ async def main() -> None:
f'Connect to {url}/mcp to establish a connection.\n'
'Learn more at https://mcp.apify.com/'
)
Actor.log.warning(msg)
Actor.log.info(msg)
await Actor.exit(status_message=msg)
return

Expand All @@ -81,7 +80,7 @@ async def main() -> None:
f"""
{{
"mcpServers": {{
"weather-mcp-server": {{
"{SERVER_NAME}": {{
"url": "{url}/mcp",
}}
}}
Expand Down
150 changes: 133 additions & 17 deletions weather-mcp-server/src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

from __future__ import annotations

import asyncio
import contextlib
import logging
import time
from typing import TYPE_CHECKING, Any

import httpx
Expand All @@ -24,6 +26,7 @@
from starlette.responses import JSONResponse, RedirectResponse, Response
from starlette.routing import Mount, Route

from .const import SESSION_TIMEOUT_SECS
from .event_store import InMemoryEventStore
from .mcp_gateway import create_gateway
from .models import RemoteServerParameters, ServerParameters, ServerType
Expand Down Expand Up @@ -146,6 +149,88 @@ def __init__( # noqa: PLR0913
self.port: int = port
self.actor_charge_function = actor_charge_function
self.tool_whitelist = tool_whitelist
# Track per-session activity and timers for inactivity shutdown
self._session_last_activity: dict[str, float] = {}
self._session_timers: dict[str, asyncio.Task] = {}
# Inactivity window (seconds) before we terminate a session (DELETE)
self._session_timeout_secs: int = SESSION_TIMEOUT_SECS

@staticmethod
def _log_request(request: Request) -> None:
"""Log incoming MCP transport request for diagnostics."""
logger.info(
'MCP transport request',
extra={
'method': request.method,
'path': str(request.url.path),
'mcp_session_id': request.headers.get('mcp-session-id'),
},
)

def _touch_session(self, session_id: str, session_manager: StreamableHTTPSessionManager) -> None:
"""Record activity for a session and (re)schedule inactivity termination."""
self._session_last_activity[session_id] = time.time()

# Cancel existing timer if present
timer = self._session_timers.get(session_id)
if timer and not timer.done():
timer.cancel()

async def _idle_close() -> None:
try:
await asyncio.sleep(self._session_timeout_secs)
# Verify that no new activity occurred during wait
last = self._session_last_activity.get(session_id, 0)
if time.time() - last < self._session_timeout_secs * 0.9:
return # Activity happened; skip
logger.info(f'Terminating idle MCP session {session_id}')

# Craft an internal ASGI DELETE request to close the session
scope = {
'type': 'http',
'http_version': '1.1',
'method': 'DELETE',
'scheme': 'http',
'path': '/mcp/',
'raw_path': b'/mcp/',
'query_string': b'',
'headers': [
(b'mcp-session-id', session_id.encode('utf-8')),
(b'accept', b'application/json, text/event-stream'),
],
'server': (self.host, self.port),
'client': ('127.0.0.1', 0),
}

async def _receive() -> dict[str, Any]:
return {'type': 'http.request', 'body': b'', 'more_body': False}

async def _send(_message: dict[str, Any]) -> None:
# Ignore internal response
return

await session_manager.handle_request(scope, _receive, _send) # type: ignore[arg-type]
self._cleanup_session_last_activity(session_id)
self._cleanup_session_timer(session_id)
except asyncio.CancelledError:
return
except Exception:
logger.exception('Failed to terminate idle session')

self._session_timers[session_id] = asyncio.create_task(_idle_close())

def _cleanup_session_last_activity(self, session_id: str) -> None:
"""Cleanup session tracking data."""
if session_id in self._session_last_activity:
del self._session_last_activity[session_id]

def _cleanup_session_timer(self, session_id: str) -> None:
"""Cancel and cleanup session timer."""
if session_id in self._session_timers:
timer = self._session_timers[session_id]
if not timer.done():
timer.cancel()
del self._session_timers[session_id]

@staticmethod
def _validate_config(client_type: ServerType, config: ServerParameters) -> ServerParameters | None:
Expand All @@ -162,7 +247,21 @@ def _validate_config(client_type: ServerType, config: ServerParameters) -> Serve
raise ValueError(f'Invalid server configuration: {e}') from e

@staticmethod
async def create_starlette_app(server_name: str, mcp_server: Server) -> Starlette:
def _create_capturing_send(
send: Send, session_id_from_resp: dict[str, str | None]
) -> Callable[[dict[str, Any]], Awaitable[None]]:
"""Create a send wrapper that captures session ID from response headers."""

async def capturing_send(message: dict[str, Any]) -> None:
if message.get('type') == 'http.response.start':
headers = {k.decode('latin-1').lower(): v.decode('latin-1') for k, v in message.get('headers', [])}
if sid := headers.get('mcp-session-id'):
session_id_from_resp['sid'] = sid
await send(message)

return capturing_send

async def create_starlette_app(self, mcp_server: Server) -> Starlette:
"""Create a Starlette app that exposes /mcp endpoint for Streamable HTTP transport."""
event_store = InMemoryEventStore()
session_manager = StreamableHTTPSessionManager(
Expand Down Expand Up @@ -195,7 +294,7 @@ async def handle_root(request: Request) -> st.Response:
if is_html_browser(request):
server_url = f'https://{request.headers.get("host", "localhost")}'
mcp_url = f'{server_url}/mcp'
return serve_html_page(server_name, mcp_url)
return serve_html_page(self.server_name, mcp_url)

return JSONResponse(
{
Expand Down Expand Up @@ -228,20 +327,37 @@ async def handle_oauth_authorization_server(_request: Request) -> st.Response:
# ASGI handler for Streamable HTTP connections
async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
# Check if this is a GET request from a browser
if scope['method'] == 'GET':
# Create a Request object to check headers
request = Request(scope, receive)
# Check if the request is from an HTML browser
if is_html_browser(request):
server_url = f'https://{request.headers.get("host", "localhost")}'
mcp_url = f'{server_url}/mcp'
response = serve_html_page(server_name, mcp_url)
# Send the HTML response
await response(scope, receive, send)
return
request = Request(scope, receive)
self._log_request(request)
if scope['method'] == 'GET' and is_html_browser(request):
server_url = f'https://{request.headers.get("host", "localhost")}'
mcp_url = f'{server_url}/mcp'
response = serve_html_page(self.server_name, mcp_url)
# Send the HTML response
await response(scope, receive, send)
return

if scope['method'] == 'DELETE':
await session_manager.handle_request(scope, receive, send)
if req_sid := request.headers.get('mcp-session-id'):
self._cleanup_session_last_activity(req_sid)
self._cleanup_session_timer(req_sid)
return

# For non-browser requests or non-GET requests, delegate to session manager
await session_manager.handle_request(scope, receive, send)
# Wrap `send` to capture the session ID from response headers of initialization
session_id_from_resp: dict[str, str | None] = {'sid': None}
capturing_send = self._create_capturing_send(send, session_id_from_resp)

# Log and touch existing session if present on request
if req_sid := request.headers.get('mcp-session-id'):
self._touch_session(req_sid, session_manager)

await session_manager.handle_request(scope, receive, capturing_send) # type: ignore[arg-type]

# If this was an initialization (no session id in request), capture from response and touch
if not req_sid and session_id_from_resp['sid']:
self._touch_session(session_id_from_resp['sid'], session_manager)

return Starlette(
debug=True,
Expand Down Expand Up @@ -278,7 +394,7 @@ async def start(self) -> None:
ClientSession(read_stream, write_stream) as session,
):
mcp_server = await create_gateway(session, self.actor_charge_function, self.tool_whitelist)
app = await self.create_starlette_app(self.server_name, mcp_server)
app = await self.create_starlette_app(mcp_server)
await self._run_server(app)

elif self.server_type == ServerType.SSE:
Expand All @@ -287,7 +403,7 @@ async def start(self) -> None:
ClientSession(read_stream, write_stream) as session,
):
mcp_server = await create_gateway(session, self.actor_charge_function, self.tool_whitelist)
app = await self.create_starlette_app(self.server_name, mcp_server)
app = await self.create_starlette_app(mcp_server)
await self._run_server(app)

elif self.server_type == ServerType.HTTP:
Expand All @@ -297,7 +413,7 @@ async def start(self) -> None:
ClientSession(read_stream, write_stream) as session,
):
mcp_server = await create_gateway(session, self.actor_charge_function, self.tool_whitelist)
app = await self.create_starlette_app(self.server_name, mcp_server)
app = await self.create_starlette_app(mcp_server)
await self._run_server(app)
else:
raise ValueError(f'Unknown server type: {self.server_type}')
Loading