Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/basic/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (p *basicMessageProcessor) ProcessMessage(
log.Infof("Created streaming task %s for processing", taskID)

// Subscribe to the task for streaming events
subscriber, err := handle.SubScribeTask(&taskID)
subscriber, err := handle.SubscribeTask(&taskID)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to task: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/jwks/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (p *pushNotificationMessageProcessor) ProcessMessage(
}

// Subscribe to the task for streaming events
subscriber, err := handle.SubScribeTask(&taskID)
subscriber, err := handle.SubscribeTask(&taskID)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to task: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/simple/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (p *simpleMessageProcessor) ProcessMessage(
}

// Subscribe to the task for streaming events
subscriber, err := handler.SubScribeTask(&taskID)
subscriber, err := handler.SubscribeTask(&taskID)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to task: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/streaming/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *streamingMessageProcessor) ProcessMessage(
}

// Subscribe to the task for streaming events
subscriber, err := handle.SubScribeTask(&taskID)
subscriber, err := handle.SubscribeTask(&taskID)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to task: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions taskmanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ type TaskHandler interface {
// AddArtifact adds an artifact to the specified task.
AddArtifact(taskID *string, artifact protocol.Artifact, isFinal bool, needMoreData bool) error

// SubScribeTask subscribes to the task and returns the task subscriber.
SubScribeTask(taskID *string) (TaskSubscriber, error)
// SubscribeTask subscribes to the task and returns the task subscriber.
SubscribeTask(taskID *string) (TaskSubscriber, error)

// GetTask returns the task by taskID. Returns an error if the task cannot be found.
GetTask(taskID *string) (CancellableTask, error)
Expand Down
4 changes: 2 additions & 2 deletions taskmanager/memory_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (h *memoryTaskHandler) UpdateTaskState(
return nil
}

// SubScribeTask subscribes to the task
func (h *memoryTaskHandler) SubScribeTask(taskID *string) (TaskSubscriber, error) {
// SubscribeTask subscribes to the task
func (h *memoryTaskHandler) SubscribeTask(taskID *string) (TaskSubscriber, error) {
if taskID == nil || *taskID == "" {
return nil, fmt.Errorf("taskID cannot be nil or empty")
}
Expand Down
10 changes: 5 additions & 5 deletions taskmanager/memory_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestMemoryTaskHandler_AddArtifact(t *testing.T) {
}
}

func TestMemoryTaskHandler_SubScribeTask(t *testing.T) {
func TestMemoryTaskHandler_SubscribeTask(t *testing.T) {
handler, _ := setupTestHandler(t)

// First create a task
Expand All @@ -138,7 +138,7 @@ func TestMemoryTaskHandler_SubScribeTask(t *testing.T) {
}

// Subscribe to task
subscriber, err := handler.SubScribeTask(&taskID)
subscriber, err := handler.SubscribeTask(&taskID)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -304,9 +304,9 @@ func TestTaskHandlerErrors(t *testing.T) {
}
})

t.Run("SubScribeTask_NonExistentTask", func(t *testing.T) {
t.Run("SubscribeTask_NonExistentTask", func(t *testing.T) {
nonExistentTaskID := "non-existent-task"
_, err := handler.SubScribeTask(&nonExistentTaskID)
_, err := handler.SubscribeTask(&nonExistentTaskID)
if err == nil {
t.Error("Expected error for non-existent task")
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestTaskHandlerErrors(t *testing.T) {
t.Error("Expected error for nil task ID")
}

_, err = handler.SubScribeTask(nil)
_, err = handler.SubscribeTask(nil)
if err == nil {
t.Error("Expected error for nil task ID")
}
Expand Down
2 changes: 1 addition & 1 deletion taskmanager/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestMemoryTaskManager_OnSendMessageStream(t *testing.T) {
return nil, err
}

subscriber, err := handle.SubScribeTask(&taskID)
subscriber, err := handle.SubscribeTask(&taskID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion taskmanager/redis/example/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (p *ToLowerProcessor) processStreamingMode(
}

// Subscribe to the task
subscriber, err := handle.SubScribeTask(&taskID)
subscriber, err := handle.SubscribeTask(&taskID)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to task: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions taskmanager/redis/redis_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ func (h *taskHandler) AddArtifact(taskID *string, artifact protocol.Artifact, is
return nil
}

// SubScribeTask subscribes to the task and returns a TaskSubscriber.
func (h *taskHandler) SubScribeTask(taskID *string) (taskmanager.TaskSubscriber, error) {
// SubscribeTask subscribes to the task and returns a TaskSubscriber.
func (h *taskHandler) SubscribeTask(taskID *string) (taskmanager.TaskSubscriber, error) {
if taskID == nil || *taskID == "" {
return nil, fmt.Errorf("taskID cannot be nil or empty")
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (p *testProcessor) ProcessMessage(

if options.Streaming {
// For streaming requests, process in background and return StreamingEvents
subscriber, err := handle.SubScribeTask(stringPtr(taskID))
subscriber, err := handle.SubscribeTask(stringPtr(taskID))
if err != nil {
return nil, fmt.Errorf("failed to subscribe to task: %w", err)
}
Expand Down
Loading