Skip to content

Releases: sparckles/Robyn

v0.72.2 - fix breaking on unsupported methods

20 Sep 15:10

Choose a tag to compare

What's Changed

New Contributors

Full Changelog: v0.72.1...v0.72.2

v0.72.1 - update orjson to support every python version

30 Aug 00:20

Choose a tag to compare

What's Changed

Full Changelog: v0.72.0...v0.72.1

v0.72.0 - add configurable timeout and keep alive

23 Jul 12:20

Choose a tag to compare

What's Changed

Full Changelog: v0.71.4...v0.72.0

v0.71.4 - remove dead code and duplication

19 Jul 19:42

Choose a tag to compare

What's Changed

Full Changelog: v0.71.3...v0.71.4

v0.71.3 - fix duplicate routing in robyn app

17 Jul 17:06

Choose a tag to compare

What's Changed

Full Changelog: v0.71.2...v0.71.3

v0.71.2 - fix auth route conflict

09 Jul 18:26

Choose a tag to compare

What's Changed

Full Changelog: v0.71.1...v0.71.2

v0.71.1 - make SSE PEP compliant and improve implementation

07 Jul 15:13

Choose a tag to compare

What's Changed

  • fix: make SSE PEP compliant and improve implementation by @sansyrox in #1202
"""
Server-Sent Events (SSE) Example for Robyn

This example demonstrates how to implement Server-Sent Events using Robyn. SSE allows real-time server-to-client
communication over a single HTTP connection.
"""

import asyncio
import time

from robyn import Robyn, SSEMessage, SSEResponse

# Application instance; the route decorators below register their handlers on it.
app = Robyn(__file__)


@app.get("/events")
def stream_events(request):
    """Stream ten timestamped messages, pausing one second after each, then an "end" event."""

    def ticks():
        """Yield one SSEMessage per tick, followed by a closing message."""
        seq = 0
        while seq < 10:
            stamp = time.strftime("%H:%M:%S")
            yield SSEMessage(f"Message {seq} - {stamp}", id=str(seq))
            time.sleep(1)
            seq += 1

        # Tell the client that nothing more is coming.
        yield SSEMessage("Stream ended", event="end")

    stream = ticks()
    return SSEResponse(stream)


@app.get("/events/json")
def stream_json_events(request):
    """Stream five JSON-encoded "notification" events, pausing two seconds after each."""
    import json

    def notifications():
        """Yield each notification payload serialized with json.dumps."""
        seq = 0
        while seq < 5:
            payload = {
                "id": seq,
                "message": f"JSON message {seq}",
                "timestamp": time.time(),
                "type": "notification",
            }
            yield SSEMessage(json.dumps(payload), event="notification", id=str(seq))
            time.sleep(2)
            seq += 1

    return SSEResponse(notifications())


@app.get("/events/named")
def stream_named_events(request):
    """Stream a fixed chat script in which every message carries its own event name."""

    def transcript():
        """Yield the scripted events in order, sleeping 1.5 seconds after each."""
        script = (
            ("user_joined", "Alice joined the chat"),
            ("message", "Hello everyone!"),
            ("user_left", "Bob left the chat"),
            ("message", "How is everyone doing?"),
            ("user_joined", "Charlie joined the chat"),
        )

        for seq, entry in enumerate(script):
            kind, text = entry
            yield SSEMessage(text, event=kind, id=str(seq))
            time.sleep(1.5)

    return SSEResponse(transcript())


@app.get("/events/async")
async def stream_async_events(request):
    """Stream eight "async" events produced by an async generator."""

    async def produce():
        """Await half a second, then yield the next timestamped event."""
        seq = 0
        while seq < 8:
            await asyncio.sleep(0.5)  # stand-in for real async work
            stamp = time.strftime("%H:%M:%S")
            yield SSEMessage(f"Async message {seq} - {stamp}", event="async", id=str(seq))
            seq += 1

    return SSEResponse(produce())


@app.get("/events/heartbeat")
def stream_heartbeat(request):
    """SSE endpoint that sends 20 heartbeat events followed by an "end" event.

    Args:
        request: Incoming request object; not read by this handler.

    Returns:
        An SSEResponse wrapping the heartbeat generator.
    """

    def heartbeat_generator():
        """Yield 20 heartbeat messages, sleeping 0.5s after each, then a closing message."""
        for counter in range(20):
            # Use SSEMessage, the name this example imports; the old
            # SSE_Message spelling is not in scope and raised NameError here.
            yield SSEMessage(f"heartbeat {counter}", event="heartbeat", id=str(counter))
            time.sleep(0.5)

        yield SSEMessage("heartbeat ended", event="end")

    return SSEResponse(heartbeat_generator())

Full Changelog: v0.71.0...v0.71.1

v0.71.0 - add support for server sent events

05 Jul 11:58

Choose a tag to compare

What's Changed

"""
Server-Sent Events (SSE) Example for Robyn

This example demonstrates how to implement Server-Sent Events using Robyn,
similar to FastAPI's implementation. SSE allows real-time server-to-client
communication over a single HTTP connection.
"""

import asyncio
import time

from robyn import Robyn, SSE_Message, SSE_Response

# Application instance; the route decorators below register their handlers on it.
app = Robyn(__file__)


@app.get("/events")
def stream_events(request):
    """Stream ten timestamped messages, pausing one second after each, then an "end" event."""

    def ticks():
        """Yield one SSE_Message per tick, followed by a closing message."""
        seq = 0
        while seq < 10:
            stamp = time.strftime("%H:%M:%S")
            yield SSE_Message(f"Message {seq} - {stamp}", id=str(seq))
            time.sleep(1)
            seq += 1

        # Tell the client that nothing more is coming.
        yield SSE_Message("Stream ended", event="end")

    stream = ticks()
    return SSE_Response(stream)


@app.get("/events/json")
def stream_json_events(request):
    """Stream five JSON-encoded "notification" events, pausing two seconds after each."""
    import json

    def notifications():
        """Yield each notification payload serialized with json.dumps."""
        seq = 0
        while seq < 5:
            payload = {
                "id": seq,
                "message": f"JSON message {seq}",
                "timestamp": time.time(),
                "type": "notification",
            }
            yield SSE_Message(json.dumps(payload), event="notification", id=str(seq))
            time.sleep(2)
            seq += 1

    return SSE_Response(notifications())


@app.get("/events/named")
def stream_named_events(request):
    """Stream a fixed chat script in which every message carries its own event name."""

    def transcript():
        """Yield the scripted events in order, sleeping 1.5 seconds after each."""
        script = (
            ("user_joined", "Alice joined the chat"),
            ("message", "Hello everyone!"),
            ("user_left", "Bob left the chat"),
            ("message", "How is everyone doing?"),
            ("user_joined", "Charlie joined the chat"),
        )

        for seq, entry in enumerate(script):
            kind, text = entry
            yield SSE_Message(text, event=kind, id=str(seq))
            time.sleep(1.5)

    return SSE_Response(transcript())


@app.get("/events/async")
async def stream_async_events(request):
    """Stream eight "async" events produced by an async generator."""

    async def produce():
        """Await half a second, then yield the next timestamped event."""
        seq = 0
        while seq < 8:
            await asyncio.sleep(0.5)  # stand-in for real async work
            stamp = time.strftime("%H:%M:%S")
            yield SSE_Message(f"Async message {seq} - {stamp}", event="async", id=str(seq))
            seq += 1

    return SSE_Response(produce())


@app.get("/events/heartbeat")
def stream_heartbeat(request):
    """Stream 20 heartbeat events and then a final "end" event."""

    def pings():
        """Yield each heartbeat, sleeping half a second after it, then the closing message."""
        for beat in range(20):
            ping = SSE_Message(f"heartbeat {beat}", event="heartbeat", id=str(beat))
            yield ping
            time.sleep(0.5)

        yield SSE_Message("heartbeat ended", event="end")

    stream = pings()
    return SSE_Response(stream)

Full Changelog: v0.70.0...v0.71.0

v0.70.0 - Experimental AI Features (agents and mcp server)

25 Jun 10:11

Choose a tag to compare

What's Changed

Full write-up: https://sanskar.wtf/posts/the-future-of-robyn

Experimental AI Features

First web framework with built-in AI agents and memory.

from robyn import Robyn
from robyn.ai import agent, memory

# Application instance; the route decorator below registers its handler on it.
app = Robyn(__file__)

@app.post("/chat")
async def chat(request):
    """Run the "query" field of the JSON request body through an agent and return its response."""
    store = memory(provider="inmemory", user_id="user")
    assistant = agent(runner="simple", memory=store)

    query = request.json().get("query")
    outcome = await assistant.run(query)
    reply = outcome["response"]
    return {"response": reply}

MCP Support

Native Model Context Protocol integration.

# MCP Resources
@app.mcp.resource("time://current")
def current_time() -> str:
    """Return the current time as an ISO 8601 string prefixed with "Current time: "."""
    timestamp = datetime.now().isoformat()
    return f"Current time: {timestamp}"


@app.mcp.tool(
    name="echo",
    description="Echo back text",
    input_schema={
        "type": "object",
        "properties": {
            "text": {"type": "string", "description": "Text to echo"},
        },
        "required": ["text"],
    },
)
def echo_tool(args):
    """Return the "text" entry of ``args``, or an empty string when it is absent."""
    text = args.get("text", "")
    return text


# MCP Prompts
@app.mcp.prompt(
    name="explain_code",
    description="Generate code explanation prompt",
    arguments=[
        {"name": "code", "description": "Code to explain", "required": True},
        {"name": "language", "description": "Programming language", "required": False},
    ],
)
def explain_code_prompt(args):
    """Build a prompt asking for an explanation of a code snippet.

    Args:
        args: Mapping of prompt arguments. Reads "code" (default "") and
            "language" (default "unknown").

    Returns:
        The prompt text, with the snippet inside a Markdown code fence
        tagged with the language.
    """
    code = args.get("code", "")
    language = args.get("language", "unknown")

    # Fence the snippet so it is clearly delimited from the instructions;
    # without the fence the language name sat alone on a line and the code
    # ran straight into the "Include:" list.
    return f"""Please explain this {language} code:

```{language}
{code}
```
Include:
1. What it does
2. How it works
3. Key concepts used
"""

Note

These features are very much experimental. Check out the examples/ folder to see the new features in action.

Install: pip install robyn==0.70.0

Full Changelog: v0.69.0...v0.70.0

v0.69.0 - make robyn ~80% faster

18 Jun 12:26

Choose a tag to compare

What's Changed

Full Changelog: v0.68.0...v0.69.0