Skip to content

batch output insert #3167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/Store.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ type TaskManager interface {
DeleteTaskWithOutputs(projectID int, taskID int) error
GetTaskOutputs(projectID int, taskID int, params RetrieveQueryParams) ([]TaskOutput, error)
CreateTaskOutput(output TaskOutput) (TaskOutput, error)
InsertTaskOutputBatch(output []TaskOutput) error
CreateTaskStage(stage TaskStage) (TaskStage, error)
EndTaskStage(taskID int, stageID int, end time.Time, endOutputID int) error
CreateTaskStageResult(taskID int, stageID int, result map[string]any) error
Expand Down
9 changes: 5 additions & 4 deletions db/Task.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,11 @@ type TaskWithTpl struct {

// TaskOutput is the ansible log output from the task
type TaskOutput struct {
ID int `db:"id" json:"id"`
TaskID int `db:"task_id" json:"task_id"`
Time time.Time `db:"time" json:"time"`
Output string `db:"output" json:"output"`
ID int `db:"id" json:"id"`
TaskID int `db:"task_id" json:"task_id"`
Time time.Time `db:"time" json:"time"`
Output string `db:"output" json:"output"`
StageID *int `db:"stage_id" json:"stage_id"`
}

type TaskStageType string
Expand Down
16 changes: 16 additions & 0 deletions db/bolt/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ func (d *BoltDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) {
return newOutput.(db.TaskOutput), nil
}

// InsertTaskOutputBatch stores every element of output through
// createObjectTx inside a single d.db.Update call, keyed by each element's
// TaskID. An empty or nil slice is a no-op. The first error reported by
// createObjectTx is returned from the update callback and stops the loop.
func (d *BoltDb) InsertTaskOutputBatch(output []db.TaskOutput) error {
	if len(output) == 0 {
		return nil
	}

	insertAll := func(tx *bbolt.Tx) error {
		for i := range output {
			if _, err := d.createObjectTx(tx, output[i].TaskID, db.TaskOutputProps, output[i]); err != nil {
				return err
			}
		}
		return nil
	}

	return d.db.Update(insertAll)
}

func (d *BoltDb) getTasks(projectID int, templateID *int, params db.RetrieveQueryParams) (tasksWithTpl []db.TaskWithTpl, err error) {
var tasks []db.Task

Expand Down
4 changes: 3 additions & 1 deletion db/sql/migrations/v2.16.2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ create table project__task_params

alter table project__integration drop task_params;
alter table project__schedule add task_params_id int references `project__task_params`(`id`);
alter table project__integration add task_params_id int references `project__task_params`(`id`);
alter table project__integration add task_params_id int references `project__task_params`(`id`);

alter table `task__output` add `stage_id` int null references `task__stage`(`id`);
22 changes: 22 additions & 0 deletions db/sql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,28 @@ func (d *SqlDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) {
return output, err
}

// InsertTaskOutputBatch writes all given task outputs with one INSERT
// statement into task__output, adding one value row (task_id, output, time,
// stage_id) per element. Each timestamp is converted to UTC before it is
// bound. An empty or nil slice is a no-op.
func (d *SqlDb) InsertTaskOutputBatch(output []db.TaskOutput) error {
	if len(output) == 0 {
		return nil
	}

	insert := squirrel.Insert("task__output").Columns("task_id", "output", "time", "stage_id")
	for i := range output {
		row := &output[i]
		insert = insert.Values(row.TaskID, row.Output, row.Time.UTC(), row.StageID)
	}

	stmt, params, err := insert.ToSql()
	if err != nil {
		return err
	}

	if _, err = d.exec(stmt, params...); err != nil {
		return err
	}
	return nil
}

func (d *SqlDb) getTasks(projectID int, templateID *int, taskIDs []int, params db.RetrieveQueryParams, tasks *[]db.TaskWithTpl) (err error) {
fields := "task.*"
fields += ", tpl.playbook as tpl_playbook" +
Expand Down
79 changes: 61 additions & 18 deletions services/tasks/TaskPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
)

type logRecord struct {
task *TaskRunner
output string
time time.Time
task *TaskRunner
output string
time time.Time
currentStage *db.TaskStage
}

type EventType uint
Expand All @@ -36,6 +37,11 @@ const (
EventTypeEmpty EventType = 3
)

const (
// TaskOutputBatchSize is the number of buffered log records at which
// handleLogs flushes its buffer without waiting for the next tick.
TaskOutputBatchSize = 500
// TaskOutputInsertIntervalMs is the period, in milliseconds, of the ticker
// that handleLogs uses to flush buffered log records.
TaskOutputInsertIntervalMs = 500
)

type PoolEvent struct {
eventType EventType
task *TaskRunner
Expand Down Expand Up @@ -138,9 +144,7 @@ func (p *TaskPool) GetTaskByAlias(alias string) (task *TaskRunner) {
func (p *TaskPool) Run() {
ticker := time.NewTicker(5 * time.Second)

defer func() {
ticker.Stop()
}()
defer ticker.Stop()

go p.handleQueue()
go p.handleLogs()
Expand Down Expand Up @@ -201,23 +205,49 @@ func (p *TaskPool) handleQueue() {
}

func (p *TaskPool) handleLogs() {
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond)
Copy link
Preview

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ticker is never stopped, so its underlying timer is never released — a resource leak (before Go 1.23, an unstopped ticker is not garbage-collected). Consider adding defer logTicker.Stop() or stopping it when the loop exits.

Suggested change
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond)
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond)
defer logTicker.Stop()

Copilot uses AI. Check for mistakes.


for record := range p.logger {
db.StoreSession(p.store, "logger", func() {
defer logTicker.Stop()

newOutput, err := p.store.CreateTaskOutput(db.TaskOutput{
TaskID: record.task.Task.ID,
Output: record.output,
Time: record.time,
})
logs := make([]logRecord, 0)

if err != nil {
log.Error(err)
return
for {

select {
case record := <-p.logger:
logs = append(logs, record)

if len(logs) >= TaskOutputBatchSize {
p.flushLogs(&logs)
}
case <-logTicker.C:
p.flushLogs(&logs)
}
}
}

currentOutput := record.task.currentOutput
record.task.currentOutput = &newOutput
// flushLogs hands the buffered records to writeLogs and then truncates the
// caller's buffer to length zero, keeping its backing array for reuse.
// It does nothing when the buffer is empty.
func (p *TaskPool) flushLogs(logs *[]logRecord) {
	pending := *logs
	if len(pending) == 0 {
		return
	}

	p.writeLogs(pending)
	*logs = pending[:0]
}

func (p *TaskPool) writeLogs(logs []logRecord) {

taskOutput := make([]db.TaskOutput, 0)

for _, record := range logs {
newOutput := db.TaskOutput{
TaskID: record.task.Task.ID,
Output: record.output,
Time: record.time,
}

currentOutput := record.task.currentOutput
record.task.currentOutput = &newOutput

db.StoreSession(p.store, "logger", func() {

newStage, newState, err := stage_parsers.MoveToNextStage(
p.store,
Expand All @@ -240,8 +270,21 @@ func (p *TaskPool) handleLogs() {
if newStage != nil {
record.task.currentStage = newStage
}

if record.task.currentStage != nil {
newOutput.StageID = &record.task.currentStage.ID
}
})
taskOutput = append(taskOutput, newOutput)
}

db.StoreSession(p.store, "logger", func() {
err := p.store.InsertTaskOutputBatch(taskOutput)
if err != nil {
log.Error(err)
return
}
})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug

The new batching logic for task output introduces several critical issues:

  • Missing Database IDs: Stage parsing via MoveToNextStage now receives TaskOutput objects without database IDs. Previously, CreateTaskOutput would assign an ID before stage processing. The new batch insert (InsertTaskOutputBatch) occurs later and does not update the original objects with their generated IDs, breaking logic dependent on valid output IDs.
  • Incorrect Previous Output: Within the batch processing loop, record.task.currentOutput is incorrectly updated. Since record.task is the same TaskRunner instance for all records from a task, currentOutput := record.task.currentOutput retrieves the output from the previous iteration within the batch, not the actual previous output for the current log record. This breaks the stage progression logic that relies on the correct preceding output.
  • Inconsistent State on Failure: Task state (task.currentState, task.currentStage) is updated by MoveToNextStage before the batch of outputs is persisted. If the batch insert fails, the in-memory task state becomes inconsistent with the database.
Locations (3)

Fix in CursorFix in Web

}

func runTask(task *TaskRunner, p *TaskPool) {
Expand Down
Loading