Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,27 +378,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}
}

req.Status = "" // We are not expecting a filter here but clear just in case
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the console never passes a status, I previously just dismissed any status filter received on the queues endpoint. I think this is wrong, so this PR only sets enqueued/pending if no status is present in the request.

filters := req.toListWorkflowsOptions()
filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}))
if len(req.Status) == 0 {
filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}))
}
filters = append(filters, WithQueuesOnly())
workflows, err := ListWorkflows(ctx, filters...)
if err != nil {
ctx.logger.Error("Failed to list queued workflows", "error", err)
http.Error(w, fmt.Sprintf("Failed to list queued workflows: %v", err), http.StatusInternalServerError)
return
}

// If not queue was specified, filter out non-queued workflows
if req.QueueName == nil {
filtered := make([]WorkflowStatus, 0, len(workflows))
for _, wf := range workflows {
if len(wf.QueueName) > 0 {
filtered = append(filtered, wf)
}
}
workflows = filtered
}

// Transform to UNIX timestamps before encoding
responseWorkflows := make([]map[string]any, len(workflows))
for i, wf := range workflows {
Expand Down
304 changes: 243 additions & 61 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func TestAdminServer(t *testing.T) {
// Should have exactly 3 workflows
assert.Equal(t, 3, len(workflows), "Expected exactly 3 workflows")

// Verify each workflow's input/output marshaling
// Verify each workflow's input/output marshalling
for _, wf := range workflows {
wfID := wf["WorkflowUUID"].(string)

Expand Down Expand Up @@ -530,74 +530,256 @@ func TestAdminServer(t *testing.T) {
assert.True(t, foundIDs4[workflowID2], "Expected to find second workflow ID in empty body results")
})

t.Run("TestDeactivate", func(t *testing.T) {
t.Run("Deactivate stops workflow scheduler", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
})
require.NoError(t, err)
t.Run("ListQueuedWorkflows", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
})
require.NoError(t, err)

// Track scheduled workflow executions
var executionCount atomic.Int32
// Create a workflow queue with limited concurrency to keep workflows enqueued
queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1))

// Register a scheduled workflow that runs every second
RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) {
executionCount.Add(1)
return fmt.Sprintf("executed at %v", scheduledTime), nil
}, WithSchedule("* * * * * *")) // Every second
// Define a blocking workflow that will hold up the queue
startEvent := NewEvent()
blockingChan := make(chan struct{})
blockingWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
startEvent.Set()
<-blockingChan // Block until channel is closed
return "blocked-" + input, nil
}
RegisterWorkflow(ctx, blockingWorkflow)

err = ctx.Launch()
// Define a regular non-blocking workflow
regularWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
return "regular-" + input, nil
}
RegisterWorkflow(ctx, regularWorkflow)

err = ctx.Launch()
require.NoError(t, err)

// Ensure cleanup
defer func() {
close(blockingChan) // Unblock any blocked workflows
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
}
}()

client := &http.Client{Timeout: 5 * time.Second}
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_QUEUED_WORKFLOWS_PATTERN, "POST /"))

/// Create a workflow that will not block the queue
h1, err := RunWorkflow(ctx, regularWorkflow, "regular", WithQueue(queue.Name))
require.NoError(t, err)
_, err = h1.GetResult()
require.NoError(t, err)

// Create the first queued workflow that will start processing and block
firstQueueHandle, err := RunWorkflow(ctx, blockingWorkflow, "blocking", WithQueue(queue.Name))
require.NoError(t, err)

startEvent.Wait()

// Create additional queued workflows that will remain in ENQUEUED status
var enqueuedHandles []WorkflowHandle[string]
for i := range 3 {
handle, err := RunWorkflow(ctx, blockingWorkflow, fmt.Sprintf("queued-%d", i), WithQueue(queue.Name))
require.NoError(t, err)
enqueuedHandles = append(enqueuedHandles, handle)
}

client := &http.Client{Timeout: 5 * time.Second}
// Create non-queued workflows that should NOT appear in queues-only results
var regularHandles []WorkflowHandle[string]
for i := range 2 {
handle, err := RunWorkflow(ctx, regularWorkflow, fmt.Sprintf("regular-%d", i))
require.NoError(t, err)
regularHandles = append(regularHandles, handle)
}

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
}
if client.Transport != nil {
client.Transport.(*http.Transport).CloseIdleConnections()
}
}()

// Wait for 2-3 executions to verify scheduler is running
require.Eventually(t, func() bool {
return executionCount.Load() >= 2
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")

// Call deactivate endpoint
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
req, err := http.NewRequest("GET", endpoint, nil)
require.NoError(t, err, "Failed to create deactivate request")

resp, err := client.Do(req)
require.NoError(t, err, "Failed to call deactivate endpoint")
defer resp.Body.Close()

// Verify endpoint returned 200 OK
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint")

// Verify response body
body, err := io.ReadAll(resp.Body)
require.NoError(t, err, "Failed to read response body")
assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body")

// Record count after deactivate and wait
countAfterDeactivate := executionCount.Load()
time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running

// Verify no new executions occurred
finalCount := executionCount.Load()
assert.LessOrEqual(t, finalCount, countAfterDeactivate+1,
"Expected no new scheduled workflows after deactivate (had %d before, %d after)",
countAfterDeactivate, finalCount)
})
// Wait for regular workflows to complete
for _, h := range regularHandles {
_, err := h.GetResult()
require.NoError(t, err)
}

// Test 1: Query with empty body (should get all enqueued/pending queue workflows)
reqQueuesOnly, err := http.NewRequest(http.MethodPost, endpoint, nil)
require.NoError(t, err, "Failed to create queues_only request")
reqQueuesOnly.Header.Set("Content-Type", "application/json")

respQueuesOnly, err := client.Do(reqQueuesOnly)
require.NoError(t, err, "Failed to make queues_only request")
defer respQueuesOnly.Body.Close()

assert.Equal(t, http.StatusOK, respQueuesOnly.StatusCode)

var queuesOnlyWorkflows []map[string]any
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
require.NoError(t, err, "Failed to decode queues_only workflows response")

// Should have exactly 3 enqueued workflows and 1 pending workflow
assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows")

// Verify all returned workflows are from the queue and have ENQUEUED/PENDING status
for _, wf := range queuesOnlyWorkflows {
status, ok := wf["Status"].(string)
require.True(t, ok, "Status should be a string")
assert.True(t, status == "ENQUEUED" || status == "PENDING",
"Expected status to be ENQUEUED or PENDING, got %s", status)

queueName, ok := wf["QueueName"].(string)
require.True(t, ok, "QueueName should be a string")
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
}

// Verify that the enqueued workflow IDs match
enqueuedIDs := make(map[string]bool)
enqueuedIDs[firstQueueHandle.GetWorkflowID()] = true
for _, h := range enqueuedHandles {
enqueuedIDs[h.GetWorkflowID()] = true
}

for _, wf := range queuesOnlyWorkflows {
id, ok := wf["WorkflowUUID"].(string)
require.True(t, ok, "WorkflowUUID should be a string")
assert.True(t, enqueuedIDs[id], "Expected workflow ID %s to be in enqueued list", id)
}

// Test 2: Query with queue_name filter (should get only workflows from specific queue)
reqBodyQueueName := map[string]any{
"queue_name": queue.Name,
}
reqQueueName, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBodyQueueName)))
require.NoError(t, err, "Failed to create queue_name request")
reqQueueName.Header.Set("Content-Type", "application/json")

respQueueName, err := client.Do(reqQueueName)
require.NoError(t, err, "Failed to make queue_name request")
defer respQueueName.Body.Close()

assert.Equal(t, http.StatusOK, respQueueName.StatusCode)

var queueNameWorkflows []map[string]any
err = json.NewDecoder(respQueueName.Body).Decode(&queueNameWorkflows)
require.NoError(t, err, "Failed to decode queue_name workflows response")

// Should have 4 workflows from the queue (1 blocking running, 3 enqueued)
assert.Equal(t, 4, len(queueNameWorkflows), "Expected exactly 4 workflows from test-queue")

// All should have the queue name set
for _, wf := range queueNameWorkflows {
queueName, ok := wf["QueueName"].(string)
require.True(t, ok, "QueueName should be a string")
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
id, ok := wf["WorkflowUUID"].(string)
require.True(t, ok, "WorkflowUUID should be a string")
assert.True(t, enqueuedIDs[id], "Expected workflow ID %s to be in enqueued list", id)
}

// Test 3: Query with status filter for PENDING (should get only the running workflow)
reqBodyPending := map[string]any{
"status": "PENDING",
}
reqPending, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBodyPending)))
require.NoError(t, err, "Failed to create pending status request")
reqPending.Header.Set("Content-Type", "application/json")

respPending, err := client.Do(reqPending)
require.NoError(t, err, "Failed to make pending status request")
defer respPending.Body.Close()

assert.Equal(t, http.StatusOK, respPending.StatusCode)

var pendingWorkflows []map[string]any
err = json.NewDecoder(respPending.Body).Decode(&pendingWorkflows)
require.NoError(t, err, "Failed to decode pending workflows response")

// Should have exactly 1 PENDING workflow (the first blocking workflow that's running)
assert.Equal(t, 1, len(pendingWorkflows), "Expected exactly 1 PENDING workflow")

// Verify it's the first workflow with PENDING status
status, ok := pendingWorkflows[0]["Status"].(string)
require.True(t, ok, "Status should be a string")
assert.Equal(t, "PENDING", status, "Expected status to be PENDING")

id, ok := pendingWorkflows[0]["WorkflowUUID"].(string)
require.True(t, ok, "WorkflowUUID should be a string")
assert.Equal(t, firstQueueHandle.GetWorkflowID(), id, "Expected the PENDING workflow to be the first blocking workflow")

queueName, ok := pendingWorkflows[0]["QueueName"].(string)
require.True(t, ok, "QueueName should be a string")
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
})

t.Run("TestDeactivate", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
})
require.NoError(t, err)

// Track scheduled workflow executions
var executionCount atomic.Int32

// Register a scheduled workflow that runs every second
RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) {
executionCount.Add(1)
return fmt.Sprintf("executed at %v", scheduledTime), nil
}, WithSchedule("* * * * * *")) // Every second

err = ctx.Launch()
require.NoError(t, err)

client := &http.Client{Timeout: 5 * time.Second}

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
}
if client.Transport != nil {
client.Transport.(*http.Transport).CloseIdleConnections()
}
}()

// Wait for 2-3 executions to verify scheduler is running
require.Eventually(t, func() bool {
return executionCount.Load() >= 2
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")

// Call deactivate endpoint
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
req, err := http.NewRequest("GET", endpoint, nil)
require.NoError(t, err, "Failed to create deactivate request")

resp, err := client.Do(req)
require.NoError(t, err, "Failed to call deactivate endpoint")
defer resp.Body.Close()

// Verify endpoint returned 200 OK
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint")

// Verify response body
body, err := io.ReadAll(resp.Body)
require.NoError(t, err, "Failed to read response body")
assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body")

// Record count after deactivate and wait
countAfterDeactivate := executionCount.Load()
time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running

// Verify no new executions occurred
finalCount := executionCount.Load()
assert.LessOrEqual(t, finalCount, countAfterDeactivate+1,
"Expected no new scheduled workflows after deactivate (had %d before, %d after)",
countAfterDeactivate, finalCount)
})
}

func mustMarshal(v any) []byte {
Expand Down
17 changes: 3 additions & 14 deletions dbos/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
opts = append(opts, WithLoadInput(req.Body.LoadInput))
opts = append(opts, WithLoadOutput(false)) // Don't load output for queued workflows
opts = append(opts, WithSortDesc(req.Body.SortDesc))
opts = append(opts, WithQueuesOnly()) // Only include workflows that are in queues

// Add status filter for queued workflows
queuedStatuses := make([]WorkflowStatusType, 0)
Expand Down Expand Up @@ -691,21 +692,9 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
return c.sendResponse(response, string(listQueuedWorkflowsMessage))
}

// If no queue name was specified, only include workflows that have a queue name
var filteredWorkflows []WorkflowStatus
if req.Body.QueueName == nil {
for _, wf := range workflows {
if wf.QueueName != "" {
filteredWorkflows = append(filteredWorkflows, wf)
}
}
} else {
filteredWorkflows = workflows
}

// Prepare response payload
formattedWorkflows := make([]listWorkflowsConductorResponseBody, len(filteredWorkflows))
for i, wf := range filteredWorkflows {
formattedWorkflows := make([]listWorkflowsConductorResponseBody, len(workflows))
for i, wf := range workflows {
formattedWorkflows[i] = formatListWorkflowsResponseBody(wf)
}

Expand Down
Loading
Loading