-
Notifications
You must be signed in to change notification settings - Fork 638
Description
Search before asking
- I had searched in the issues and found no similar issues.
Feature Request
Overview
A2A (Agent-to-Agent Communication Protocol) is a high-performance protocol plugin for EventMesh, specifically designed to support asynchronous communication, collaboration, and task coordination between intelligent agents. The protocol is based on a protocol delegation pattern, reusing EventMesh's existing CloudEvents and HTTP protocol infrastructure, providing complete agent lifecycle management, message routing, state synchronization, and collaboration workflow functionality.
Core Features
1. Protocol Delegation Architecture
- Protocol Reuse: Delegation pattern based on CloudEvents and HTTP protocols, avoiding duplicate implementation
- Intelligent Routing: EnhancedProtocolPluginFactory provides high-performance caching and routing
- Performance Monitoring: ProtocolMetrics provides detailed operation statistics and error tracking
- Graceful Degradation: Supports independent operation mode when dependencies are missing
2. High-Performance Optimization
- Caching Mechanism: Protocol adapter preloading and caching, improving lookup performance
- Intelligent Routing: ProtocolRouter supports capability and priority-based message routing
- Batch Processing: Supports batch CloudEvent conversion and processing
- Thread Safety: Read-write locks ensure thread safety in high-concurrency scenarios
3. CloudEvents Integration
- Standards Compliance: Strictly follows CloudEvents extension naming conventions (lowercase)
- Extension Attributes: Supports A2A-specific extensions like protocol, protocolversion, messagetype
- Bidirectional Conversion: Lossless bidirectional conversion between A2A messages and CloudEvent
- Multi-Protocol Compatibility: Fully compatible with existing HTTP, gRPC, TCP protocols
4. Protocol Features
- Asynchronous Communication: Based on EventMesh's asynchronous event-driven architecture
- Scalability: Supports dynamic addition of new agent types and capabilities
- Fault Tolerance: Built-in fault detection and recovery mechanisms
- Java 8 Compatibility: Ensures full compatibility with Java 8 runtime environment
Architecture Design
Core Components
┌─────────────────────────────────────────────────────────────┐
│ EventMesh A2A Protocol v2.0 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Enhanced │ │ Protocol │ │ Protocol │ │
│ │ Protocol │ │ Router │ │ Metrics │ │
│ │ Factory │ │ (Routing) │ │(Monitoring) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ A2A │ │ Enhanced │ │ A2A │ │
│ │ Protocol │ │ A2A │ │ Protocol │ │
│ │ Adaptor │ │ Adaptor │ │ Transport │ │
│ │ (Basic) │ │(Delegation) │ │ Objects │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ EventMesh Protocol Infrastructure │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ CloudEvents │ │ HTTP │ │ gRPC │ │
│ │ Protocol │ │ Protocol │ │ Protocol │ │
│ │ (Delegated) │ │ (Delegated) │ │ (Delegated) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
Protocol Delegation Pattern
The A2A protocol adopts a delegation pattern to achieve high performance and high compatibility by reusing existing protocol infrastructure:
- A2AProtocolAdaptor: Basic A2A protocol adapter, focused on core A2A message processing
- EnhancedA2AProtocolAdaptor: Enhanced adapter that reuses CloudEvents and HTTP protocols through delegation pattern
- EnhancedProtocolPluginFactory: High-performance protocol factory providing caching, routing, and lifecycle management
- ProtocolRouter: Intelligent protocol router for protocol selection based on message characteristics
- ProtocolMetrics: Protocol performance monitoring providing detailed operation statistics and error tracking
Message Flow
- Agent Registration: Agents register with EventMesh, providing capabilities and metadata
- Message Sending: Agents send A2A messages to EventMesh
- Message Routing: EventMesh forwards messages to target agents based on routing rules
- Message Processing: Target agents process messages and return responses
- State Synchronization: Agents periodically synchronize state information
Protocol Message Format
CloudEvent Extension Format
The A2A protocol is based on CloudEvents standard, using standard CloudEvent format with A2A-specific extension attributes:
{
"specversion": "1.0",
"id": "a2a-1708293600-0.123456",
"source": "eventmesh-a2a",
"type": "org.apache.eventmesh.protocol.a2a.register",
"datacontenttype": "application/json",
"time": "2024-01-01T00:00:00Z",
"data": "{\"protocol\":\"A2A\",\"messageType\":\"REGISTER\"}",
"protocol": "A2A",
"protocolversion": "2.0",
"messagetype": "REGISTER",
"sourceagent": "agent-001",
"targetagent": "agent-002"
}
Extension Attribute Description
According to CloudEvents specification, all extension attribute names must use lowercase letters:
- protocol: Protocol type, fixed as "A2A"
- protocolversion: Protocol version, currently "2.0"
- messagetype: Message type (REGISTER, TASK_REQUEST, HEARTBEAT, etc.)
- sourceagent: Source agent ID
- targetagent: Target agent ID (optional)
- agentcapabilities: Agent capability list (comma-separated)
- collaborationid: Collaboration session ID (optional)
Basic Message Structure
The A2A protocol supports traditional JSON message format for compatibility with non-CloudEvents systems:
{
"protocol": "A2A",
"version": "2.0",
"messageId": "uuid",
"timestamp": "2024-01-01T00:00:00Z",
"sourceAgent": {
"agentId": "agent-001",
"agentType": "task-executor",
"capabilities": ["task-execution", "data-processing"]
},
"targetAgent": {
"agentId": "agent-002",
"agentType": "data-provider"
},
"messageType": "REQUEST|RESPONSE|NOTIFICATION|SYNC",
"payload": {},
"metadata": {
"priority": "HIGH|NORMAL|LOW",
"ttl": 300,
"correlationId": "correlation-uuid"
}
}
Message Types
1. Registration Message (REGISTER)
{
"messageType": "REGISTER",
"payload": {
"agentInfo": {
"agentId": "agent-001",
"agentType": "task-executor",
"version": "1.0.0",
"capabilities": ["task-execution", "data-processing"],
"endpoints": {
"grpc": "localhost:9090",
"http": "http://localhost:8080"
},
"resources": {
"cpu": "4 cores",
"memory": "8GB",
"storage": "100GB"
}
}
}
}
2. Task Request Message (TASK_REQUEST)
{
"messageType": "TASK_REQUEST",
"payload": {
"taskId": "task-001",
"taskType": "data-processing",
"parameters": {
"inputData": "data-source-url",
"processingRules": ["filter", "transform", "aggregate"],
"outputFormat": "json"
},
"constraints": {
"timeout": 300,
"priority": "HIGH",
"retryCount": 3
}
}
}
3. State Synchronization Message (STATE_SYNC)
{
"messageType": "STATE_SYNC",
"payload": {
"agentState": {
"status": "BUSY|IDLE|ERROR",
"currentTask": "task-001",
"progress": 75,
"metrics": {
"cpuUsage": 65.5,
"memoryUsage": 45.2,
"activeConnections": 10
}
}
}
}
Usage Guide
1. Configure EventMesh to Support A2A Protocol
The A2A protocol is automatically loaded as a plugin, requiring no additional configuration. Advanced features can be optionally enabled in EventMesh configuration files:
# eventmesh.properties (optional configuration)
eventmesh.protocol.a2a.enabled=true
eventmesh.protocol.a2a.config.path=conf/a2a-protocol-config.yaml
2. Using A2A Protocol Adapters
import org.apache.eventmesh.protocol.a2a.A2AProtocolAdaptor;
import org.apache.eventmesh.protocol.a2a.EnhancedA2AProtocolAdaptor;
// Using basic A2A protocol adapter
A2AProtocolAdaptor basicAdaptor = new A2AProtocolAdaptor();
basicAdaptor.initialize();
// Validate message
ProtocolTransportObject message = new TestProtocolTransportObject(
"{\"protocol\":\"A2A\",\"messageType\":\"REGISTER\"}"
);
boolean isValid = basicAdaptor.isValid(message);
// Convert to CloudEvent
CloudEvent cloudEvent = basicAdaptor.toCloudEvent(message);
// Using enhanced A2A protocol adapter (delegation pattern)
EnhancedA2AProtocolAdaptor enhancedAdaptor = new EnhancedA2AProtocolAdaptor();
enhancedAdaptor.initialize(); // Will attempt to load CloudEvents and HTTP protocol adapters
// Enhanced adapter supports more complex protocol delegation and routing
CloudEvent enhancedEvent = enhancedAdaptor.toCloudEvent(message);
3. Protocol Factory and Router Usage
import org.apache.eventmesh.protocol.api.EnhancedProtocolPluginFactory;
import org.apache.eventmesh.protocol.api.ProtocolRouter;
import org.apache.eventmesh.protocol.api.ProtocolMetrics;
// Get A2A protocol adapter (through factory)
ProtocolAdaptor<ProtocolTransportObject> adaptor =
EnhancedProtocolPluginFactory.getProtocolAdaptor("A2A");
// Check if protocol is supported
boolean supported = EnhancedProtocolPluginFactory.isProtocolSupported("A2A");
// Get all available protocols
List<String> protocols = EnhancedProtocolPluginFactory.getAvailableProtocolTypes();
// Use protocol router
ProtocolRouter router = ProtocolRouter.getInstance();
router.addRoutingRule("a2a-messages",
msg -> msg.toString().contains("A2A"),
"A2A");
// Monitor protocol performance
ProtocolMetrics metrics = ProtocolMetrics.getInstance();
var stats = metrics.getStats("A2A");
if (stats != null) {
System.out.println("A2A protocol total operations: " + stats.getTotalOperations());
System.out.println("A2A protocol errors: " + stats.getTotalErrors());
}
4. Defining Collaboration Workflows
import org.apache.eventmesh.runtime.core.protocol.a2a.CollaborationManager;
// Create workflow definition
List<WorkflowStep> steps = Arrays.asList(
new WorkflowStep(
"data-collection",
"Collect data from sources",
Arrays.asList("data-collection"),
Map.of("sources", Arrays.asList("source1", "source2")),
true, 30000, 3
),
new WorkflowStep(
"data-processing",
"Process collected data",
Arrays.asList("data-processing"),
Map.of("algorithm", "ml-pipeline"),
true, 60000, 3
)
);
WorkflowDefinition workflow = new WorkflowDefinition(
"data-pipeline",
"Data Processing Pipeline",
"End-to-end data processing workflow",
steps
);
// Register workflow
CollaborationManager.getInstance().registerWorkflow(workflow);
// Start collaboration session
String sessionId = CollaborationManager.getInstance().startCollaboration(
"data-pipeline",
Arrays.asList("agent-001", "agent-002"),
Map.of("batchSize", 1000)
);
5. Monitoring and Debugging
// Get all registered agents
List<AgentInfo> agents = A2AMessageHandler.getInstance().getAllAgents();
// Find agents with specific capabilities
List<AgentInfo> dataProcessors = A2AMessageHandler.getInstance()
.findAgentsByCapability("data-processing");
// Check agent status
boolean isAlive = A2AMessageHandler.getInstance().isAgentAlive("agent-001");
// Get collaboration status
CollaborationStatus status = A2AMessageHandler.getInstance()
.getCollaborationStatus(sessionId);
API Reference
A2AProtocolAdaptor
Basic A2A protocol adapter implementing the ProtocolAdaptor interface.
Main Methods
initialize()
: Initialize adapterdestroy()
: Destroy adaptergetProtocolType()
: Returns "A2A"getVersion()
: Returns "2.0"getPriority()
: Returns 80 (high priority)supportsBatchProcessing()
: Returns truegetCapabilities()
: Returns supported capability setisValid(ProtocolTransportObject)
: Validate if message is valid A2A messagetoCloudEvent(ProtocolTransportObject)
: Convert to CloudEventtoBatchCloudEvent(ProtocolTransportObject)
: Batch convert to CloudEventfromCloudEvent(CloudEvent)
: Convert from CloudEvent to A2A message
EnhancedA2AProtocolAdaptor
Enhanced A2A protocol adapter supporting protocol delegation pattern.
Features
- Protocol Delegation: Automatically delegate to CloudEvents and HTTP protocol adapters
- Graceful Degradation: Independent operation when dependent protocols are unavailable
- Intelligent Routing: Automatically select processing methods based on message type
- Fault Tolerance: Comprehensive error handling and recovery mechanisms
Main Methods
Same interface as A2AProtocolAdaptor, with additional support for:
- Automatic protocol delegation
- Fallback handling when dependencies fail
- Enhanced error recovery mechanisms
EnhancedProtocolPluginFactory
High-performance protocol plugin factory providing caching and lifecycle management.
Main Methods
getProtocolAdaptor(String)
: Get protocol adapter (supports caching)getProtocolAdaptorWithFallback(String, String)
: Get protocol adapter (supports fallback)getAvailableProtocolTypes()
: Get all available protocol typesgetProtocolAdaptorsByPriority()
: Get adapters sorted by prioritygetProtocolMetadata(String)
: Get protocol metadataisProtocolSupported(String)
: Check if protocol is supportedgetProtocolAdaptorsByCapability(String)
: Find adapters by capabilityshutdown()
: Shutdown all protocol adapters
ProtocolRouter
Intelligent protocol router supporting rule-based message routing.
Main Methods
getInstance()
: Get singleton instanceaddRoutingRule(String, Predicate, String)
: Add routing ruleremoveRoutingRule(String)
: Remove routing rulerouteMessage(ProtocolTransportObject)
: Route messagegetAllRoutingRules()
: Get all routing rules
ProtocolMetrics
Protocol performance monitoring providing detailed statistics.
Main Methods
getInstance()
: Get singleton instancerecordSuccess(String, String, long)
: Record successful operationrecordFailure(String, String, String)
: Record failed operationgetStats(String)
: Get protocol statisticsresetAllStats()
: Reset all statisticsgetOperationStats(String, String)
: Get specific operation statistics
Protocol Transport Objects
A2AProtocolTransportObject
Basic A2A protocol transport object wrapping CloudEvent and content.
SimpleA2AProtocolTransportObject
Simplified A2A protocol transport object for enhanced adapter fallback scenarios.
Configuration
A2A Protocol Configuration
Configuration file location: eventmesh-runtime/conf/a2a-protocol-config.yaml
Main configuration items:
a2a:
# Protocol version
version: "1.0"
# Message settings
message:
default-ttl: 300
default-priority: "NORMAL"
max-size: 1048576
# Registry settings
registry:
heartbeat-timeout: 30000
heartbeat-interval: 30000
max-agents: 1000
# Routing settings
routing:
intelligent-routing: true
load-balancing: true
strategy: "capability-based"
# Collaboration settings
collaboration:
workflow-enabled: true
max-concurrent-sessions: 100
default-workflow-timeout: 300000
Technical Features
Performance Metrics
- Message Throughput: Supports 10,000+ messages/second processing capability
- Latency: Local protocol conversion latency < 1ms, network latency < 10ms
- Concurrent Processing: Supports 1,000+ concurrent protocol adapter instances
- Memory Efficiency: Protocol caching and object pools reduce GC pressure
Compatibility
- Java Version: Fully compatible with Java 8 and above
- EventMesh Version: Compatible with EventMesh 1.11.0 and above
- CloudEvents: Follows CloudEvents 1.0 specification
- Protocol Standards: Compatible with HTTP/1.1, gRPC, TCP protocols
Scalability Features
- Horizontal Scaling: Supports load balancing across multiple EventMesh instances
- Protocol Pluggable: Dynamic loading of protocol adapters through SPI mechanism
- Routing Rules: Supports complex message routing and forwarding rules
- Monitoring Integration: Provides detailed performance metrics and health checks
Best Practices
1. Protocol Selection
- Basic Scenarios: Use A2AProtocolAdaptor for simple A2A message processing
- Complex Scenarios: Use EnhancedA2AProtocolAdaptor for protocol delegation and routing capabilities
- High-Performance Scenarios: Get caching and batch processing advantages through EnhancedProtocolPluginFactory
- Monitoring Scenarios: Integrate ProtocolMetrics for performance monitoring and tuning
2. Message Design
- CloudEvents First: Prioritize CloudEvents format for best compatibility
- Extension Naming: Strictly follow CloudEvents extension naming conventions (lowercase letters)
- Idempotency: Design idempotent message processing logic
- Error Handling: Implement comprehensive error handling and recovery mechanisms
3. Performance Optimization
- Caching Strategy: Utilize protocol factory's caching mechanism to reduce repeated loading
- Batch Processing: Use toBatchCloudEvent for batch message processing
- Asynchronous Processing: Leverage EventMesh's asynchronous architecture to improve concurrency performance
- Connection Reuse: Reuse existing protocol's network connection pools
4. Monitoring and Debugging
- Performance Monitoring: Use ProtocolMetrics to monitor protocol performance metrics
- Routing Tracking: Track message routing paths through ProtocolRouter
- Error Analysis: Analyze errors during protocol conversion and delegation processes
- Capacity Planning: Perform capacity planning and performance tuning based on monitoring data
Troubleshooting
Common Issues
-
Agent Registration Failure
- Check network connectivity
- Verify agent ID uniqueness
- Confirm EventMesh service status
-
Message Routing Failure
- Check if target agent is online
- Verify agent capability matching
- Review routing logs
-
Collaboration Workflow Timeout
- Check step timeout settings
- Verify agent response time
- Review workflow execution logs
-
Performance Issues
- Adjust thread pool configuration
- Optimize message size
- Check network latency
Log Analysis
A2A protocol log location: logs/a2a-protocol.log
Key log levels:
DEBUG
: Detailed protocol interaction informationINFO
: Important status changes and operationsWARN
: Potential issues and warningsERROR
: Errors and exception information
Extension Development
Custom Agent Types
public class CustomAgent extends SimpleA2AAgent {
public CustomAgent(String agentId, String agentType, String[] capabilities) {
super(agentId, agentType, capabilities);
}
@Override
protected Object processTask(String taskType, Map<String, Object> parameters) {
// Implement custom task processing logic
switch (taskType) {
case "custom-task":
return processCustomTask(parameters);
default:
return super.processTask(taskType, parameters);
}
}
private Object processCustomTask(Map<String, Object> parameters) {
// Custom task processing implementation
return Map.of("status", "completed", "customResult", "success");
}
}
Custom Message Types
// Define custom message type
public class CustomMessage extends A2AMessage {
private String customField;
public CustomMessage() {
super();
setMessageType("CUSTOM_MESSAGE");
}
public String getCustomField() {
return customField;
}
public void setCustomField(String customField) {
this.customField = customField;
}
}
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct *