Streaming

Python A2A provides comprehensive support for real-time streaming, enabling responsive user experiences and immediate feedback for long-running operations.

Streaming Basics

Streaming in A2A allows for:

  • Token-by-token delivery of LLM outputs
  • Progress updates for long-running tasks
  • Real-time data transmission
  • More responsive user experiences
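
Throughout this page, a streamed chunk is a plain dictionary with a content string plus an optional meta dictionary. A minimal sketch of the shape these examples use (only content is relied on by the clients below; the meta fields are a convention of this page, not a protocol requirement):

chunk = {
    "content": "partial text to append",          # incremental text
    "meta": {"type": "data", "item_number": 1},   # optional metadata
}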

Server-Side Streaming

To implement streaming on the server side, add a stream_response method to your agent:

from python_a2a import A2AServer, agent, run_server
import asyncio

@agent(
    name="Streaming Demo Agent",
    description="Demonstrates streaming capabilities"
)
class StreamingAgent(A2AServer):
    def handle_task(self, task):
        """Regular synchronous response for non-streaming requests."""
        return {"output": f"Non-streaming response for: {task.input}"}
    
    async def stream_response(self, message):
        """Stream a response token by token."""
        words = f"Streaming response for: {message}".split()
        
        for word in words:
            yield {"content": word + " "}
            await asyncio.sleep(0.2)  # Simulate thinking time

# Create and run the agent
if __name__ == "__main__":
    streaming_agent = StreamingAgent()
    run_server(streaming_agent, port=5000)

Client-Side Streaming

To consume a streaming response:

import asyncio
from python_a2a import HTTPClient

async def stream_example():
    client = HTTPClient("http://localhost:5000")
    
    print("Streaming response:")
    async for chunk in client.stream_response("Tell me a story about AI"):
        print(chunk.content, end="", flush=True)
    print("\nStreaming complete!")

if __name__ == "__main__":
    asyncio.run(stream_example())
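
If you also need the complete text after streaming finishes, accumulate the chunks as they arrive. A minimal sketch, assuming the same HTTPClient interface used above:

import asyncio
from python_a2a import HTTPClient

async def collect_stream(prompt):
    """Print chunks incrementally while accumulating the full text."""
    client = HTTPClient("http://localhost:5000")
    parts = []
    async for chunk in client.stream_response(prompt):
        print(chunk.content, end="", flush=True)
        parts.append(chunk.content)
    return "".join(parts)

if __name__ == "__main__":
    full_text = asyncio.run(collect_stream("Tell me a story about AI"))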

Advanced Streaming Features

Progress Updates

Stream progress updates for long-running operations:

from python_a2a import A2AServer, agent, run_server
import asyncio

@agent(
    name="Research Agent",
    description="Performs research with progress updates"
)
class ResearchAgent(A2AServer):
    async def stream_response(self, message):
        """Stream progress updates during research."""
        total_steps = 5
        
        yield {"content": f"Starting research on: {message}\n"}
        await asyncio.sleep(0.5)
        
        for step in range(1, total_steps + 1):
            progress = step / total_steps * 100
            yield {"content": f"Progress: {progress:.0f}% - Step {step}/{total_steps}\n"}
            
            # Simulate work
            await asyncio.sleep(1)
        
        yield {"content": f"\nResearch complete on: {message}\n\nFindings:\n"}
        await asyncio.sleep(0.5)
        
        findings = [
            "Discovery 1: Lorem ipsum dolor sit amet",
            "Discovery 2: Consectetur adipiscing elit",
            "Discovery 3: Sed do eiusmod tempor incididunt"
        ]
        
        for finding in findings:
            yield {"content": f"- {finding}\n"}
            await asyncio.sleep(0.3)

# Create and run the agent
if __name__ == "__main__":
    research_agent = ResearchAgent()
    run_server(research_agent, port=5000)

Structured Streaming

Stream structured data with metadata:

from python_a2a import A2AServer, agent, run_server
import asyncio

@agent(
    name="Structured Streaming Agent",
    description="Streams structured data with metadata"
)
class StructuredStreamingAgent(A2AServer):
    async def stream_response(self, message):
        """Stream structured data with metadata."""
        # Header information
        yield {
            "content": "",
            "meta": {
                "type": "header",
                "total_items": 5
            }
        }
        await asyncio.sleep(0.2)
        
        # Stream data items
        for i in range(1, 6):
            yield {
                "content": f"Item {i} content\n",
                "meta": {
                    "type": "data",
                    "item_number": i,
                    "timestamp": asyncio.get_event_loop().time()
                }
            }
            await asyncio.sleep(0.5)
        
        # Footer information
        yield {
            "content": "\nAll items delivered successfully",
            "meta": {
                "type": "footer",
                "status": "complete",
                "processing_time": 2.5
            }
        }

# Create and run the agent
if __name__ == "__main__":
    structured_agent = StructuredStreamingAgent()
    run_server(structured_agent, port=5000)

To consume the structured stream:

import asyncio
from python_a2a import HTTPClient

async def structured_stream_example():
    client = HTTPClient("http://localhost:5000")
    
    print("Structured streaming:")
    async for chunk in client.stream_response("Send me structured data"):
        # Process content
        if chunk.content:
            print(chunk.content, end="", flush=True)
        
        # Process metadata
        if hasattr(chunk, 'meta') and chunk.meta:
            meta = chunk.meta
            if meta.get("type") == "header":
                print(f"\n[Header: {meta.get('total_items')} items expected]")
            elif meta.get("type") == "footer":
                print(f"\n[Footer: Status {meta.get('status')}, Processing time: {meta.get('processing_time')}s]")
            elif meta.get("type") == "data":
                print(f"\n[Data item {meta.get('item_number')}, Time: {meta.get('timestamp')}]")
    
    print("\nStructured streaming complete!")

if __name__ == "__main__":
    asyncio.run(structured_stream_example())

Integrating with LLM Services

Stream responses directly from LLM services:

from python_a2a import A2AServer, agent, run_server
from python_a2a.client.llm.openai import OpenAILLMClient
import os
import asyncio

@agent(
    name="OpenAI Streaming Agent",
    description="Streams responses directly from OpenAI"
)
class OpenAIStreamingAgent(A2AServer):
    def __init__(self):
        super().__init__()
        self.llm_client = OpenAILLMClient(
            api_key=os.environ.get("OPENAI_API_KEY"),
            model="gpt-4"
        )
    
    async def stream_response(self, message):
        """Stream a response directly from OpenAI."""
        system_message = "You are a helpful assistant that provides concise information."
        
        # Stream the completion from OpenAI
        async for chunk in self.llm_client.stream_complete(
            system=system_message,
            user=message
        ):
            yield {"content": chunk.content}

# Create and run the agent
if __name__ == "__main__":
    openai_agent = OpenAIStreamingAgent()
    run_server(openai_agent, port=5000)

Task-Based Streaming

Stream updates for long-running tasks:

from python_a2a import A2AServer, agent, run_server
from python_a2a.models import Task, TaskStatus
import asyncio
import uuid
import threading
import time

@agent(
    name="Task Streaming Agent",
    description="Streams updates for long-running tasks"
)
class TaskStreamingAgent(A2AServer):
    def __init__(self):
        super().__init__()
        # self.tasks is shared with the worker thread below; fine for a
        # demo, but guard it with a lock in production code
        self.tasks = {}
        self.task_worker_thread = threading.Thread(target=self.task_worker, daemon=True)
        self.task_worker_thread.start()
    
    def create_task(self, task_input):
        """Create a new task and return it."""
        task_id = str(uuid.uuid4())
        self.tasks[task_id] = {
            "status": TaskStatus.PENDING,
            "input": task_input,
            "progress": 0,
            "output": None,
            "created_at": time.time(),
            "updates": []
        }
        return Task(task_id=task_id, input=task_input, status=TaskStatus.PENDING)
    
    def get_task_status(self, task_id):
        """Get the current status of a task."""
        if task_id in self.tasks:
            return self.tasks[task_id]["status"]
        return None
    
    def get_task_output(self, task_id):
        """Get the output of a completed task."""
        if task_id in self.tasks and self.tasks[task_id]["status"] == TaskStatus.COMPLETED:
            return {"output": self.tasks[task_id]["output"]}
        return None
    
    async def stream_task_updates(self, task_id):
        """Stream updates for a specific task."""
        if task_id not in self.tasks:
            yield {"content": f"Error: Task {task_id} not found"}
            return
        
        # Send initial status
        task = self.tasks[task_id]
        yield {"content": f"Task {task_id} is {task['status']}. Progress: {task['progress']}%\n"}
        
        # Send existing updates
        for update in task["updates"]:
            yield {"content": f"Update: {update}\n"}
        
        # Stream new updates as they come in
        last_update_index = len(task["updates"])
        last_progress = task["progress"]
        while task["status"] not in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
            # Check for new updates
            if len(task["updates"]) > last_update_index:
                for update in task["updates"][last_update_index:]:
                    yield {"content": f"Update: {update}\n"}
                last_update_index = len(task["updates"])
            
            # Send a progress update only when the value has changed
            if task["progress"] != last_progress:
                last_progress = task["progress"]
                yield {"content": f"Progress: {last_progress}%\n"}
            
            # Wait before checking again
            await asyncio.sleep(0.5)
        
        # Send final status
        if task["status"] == TaskStatus.COMPLETED:
            yield {"content": f"\nTask completed! Result: {task['output']}"}
        else:
            yield {"content": f"\nTask failed!"}
    
    def task_worker(self):
        """Background worker that processes tasks."""
        while True:
            # Find pending tasks
            pending_tasks = [
                task_id for task_id, task in self.tasks.items()
                if task["status"] == TaskStatus.PENDING
            ]
            
            # Process each pending task
            for task_id in pending_tasks:
                # Start processing
                self.tasks[task_id]["status"] = TaskStatus.RUNNING
                self.tasks[task_id]["updates"].append("Starting task processing")
                
                # Simulate work
                for progress in range(0, 101, 10):
                    self.tasks[task_id]["progress"] = progress
                    
                    if progress % 30 == 0:
                        self.tasks[task_id]["updates"].append(f"Reached {progress}% completion")
                    
                    time.sleep(0.5)  # Simulate work
                
                # Complete the task
                self.tasks[task_id]["status"] = TaskStatus.COMPLETED
                self.tasks[task_id]["output"] = f"Processed input: {self.tasks[task_id]['input']}"
                self.tasks[task_id]["updates"].append("Task processing complete")
            
            # Check for new tasks every second
            time.sleep(1)

# Create and run the agent
if __name__ == "__main__":
    task_agent = TaskStreamingAgent()
    run_server(task_agent, port=5000)

A client for task-based streaming:

import asyncio
from python_a2a import HTTPClient

async def task_streaming_example():
    client = HTTPClient("http://localhost:5000")
    
    # Create a task
    task = client.create_task("Process this data")
    print(f"Created task with ID: {task.task_id}")
    
    # Stream updates for the task
    print("Streaming task updates:")
    async for chunk in client.stream_task_updates(task.task_id):
        print(chunk.content, end="", flush=True)
    
    # Get the final output
    output = client.get_task_output(task.task_id)
    print(f"\nFinal output: {output}")

if __name__ == "__main__":
    asyncio.run(task_streaming_example())

Streaming UI Integration

Here is how to integrate streaming with different UI frameworks:

Web UI (JavaScript)

// Assuming a server endpoint at http://localhost:5000
async function streamResponse() {
  const userInput = document.getElementById('user-input').value;
  const outputElement = document.getElementById('output');
  
  // Clear previous output
  outputElement.innerHTML = '';
  
  try {
    // Create a fetch request with the streaming flag
    const response = await fetch('http://localhost:5000/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ message: userInput })
    });
    
    // Create a reader from the response body
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    // Read chunks as they arrive
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      // Decode and display the chunk. Note: this assumes each network
      // chunk is exactly one JSON object; for production, buffer the
      // text and split on a delimiter (such as newlines) before parsing
      const chunk = decoder.decode(value, { stream: true });
      const data = JSON.parse(chunk);
      
      // Append content to the output element
      outputElement.innerHTML += data.content;
      
      // Auto-scroll to the bottom
      outputElement.scrollTop = outputElement.scrollHeight;
    }
  } catch (error) {
    outputElement.innerHTML += `<div class="error">Error: ${error.message}</div>`;
  }
}

Command Line UI (Python)

import asyncio
import sys
from python_a2a import HTTPClient

async def cli_streaming():
    client = HTTPClient("http://localhost:5000")
    
    while True:
        # Get user input (input() blocks the event loop, which is fine
        # here because nothing else runs concurrently)
        sys.stdout.write("\nEnter a message (or 'quit' to exit): ")
        sys.stdout.flush()
        user_input = input()
        
        if user_input.lower() in ['quit', 'exit', 'q']:
            break
        
        # Stream the response
        sys.stdout.write("Response: ")
        sys.stdout.flush()
        
        try:
            async for chunk in client.stream_response(user_input):
                sys.stdout.write(chunk.content)
                sys.stdout.flush()
            
            # End with a newline
            sys.stdout.write("\n")
        except Exception as e:
            sys.stdout.write(f"\nError: {e}\n")

if __name__ == "__main__":
    asyncio.run(cli_streaming())

Best Practices for Streaming

  1. Error Handling (see the sketch after this list)

    • Implement proper error handling in both server and client
    • Return error information in the stream format
    • Handle reconnection gracefully
  2. Performance

    • Use asynchronous code for streaming
    • Yield chunks at appropriate intervals
    • Monitor memory usage for long-running streams
  3. User Experience

    • Provide progress indicators
    • Show typing indicators for LLM outputs
    • Enable cancellation of streaming operations
  4. Structured Data

    • Include metadata with streamed content
    • Use consistent data structure for all chunks
    • Document the streaming format for consumers
  5. Testing

    • Test with different network conditions
    • Verify streaming behavior with large outputs
    • Check reconnection behavior after disconnects
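
As a concrete illustration of the error-handling points above, here is a minimal sketch of a server that reports failures as ordinary chunks and a client generator that retries once after a disconnect. The generate helper, the error chunk shape, and the retry policy are illustrative assumptions, not part of the A2A protocol or the python_a2a API:

import asyncio
from python_a2a import A2AServer

class RobustAgent(A2AServer):
    async def stream_response(self, message):
        try:
            # self.generate is a hypothetical async generator that
            # produces this agent's actual output
            async for piece in self.generate(message):
                yield {"content": piece}
        except Exception as e:
            # Surface the failure in the stream format instead of
            # dropping the connection silently
            yield {"content": "", "meta": {"type": "error", "detail": str(e)}}

async def stream_with_retry(client, prompt, retries=1):
    """Retry the whole stream after a connection error."""
    for attempt in range(retries + 1):
        try:
            async for chunk in client.stream_response(prompt):
                yield chunk
            return
        except ConnectionError:
            if attempt == retries:
                raise
            await asyncio.sleep(1)  # brief backoff before retrying

For cancellation (point 3), one simple approach is to wrap the consuming coroutine in asyncio.create_task and call cancel() on it when the user aborts.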

By implementing streaming in your A2A agents, you can create more responsive and user-friendly experiences that provide immediate feedback and progress updates.
