Skip to content

batch output insert #3167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open

batch output insert #3167

wants to merge 7 commits into from

Conversation

mgukov
Copy link
Collaborator

@mgukov mgukov commented Jul 26, 2025

  • feat(be): batch task output insert
  • feat(be): batch task output insert, boltDB
  • feat(be): batch task output insert, boltDB
  • feat(be): batch task output insert, boltDB

@fiftin fiftin requested a review from Copilot July 26, 2025 18:52
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements batch processing for task output inserts to improve performance by reducing database operations. Instead of inserting task outputs one by one, the system now collects outputs in batches and flushes them either when reaching a batch size limit or at regular intervals.

  • Introduces batching mechanism with configurable batch size and flush interval
  • Adds new database methods for batch insertion in both SQL and BoltDB implementations
  • Updates task output structure to include stage ID for better tracking

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
services/tasks/TaskPool.go Implements batch collection and flushing logic with ticker-based intervals
db/sql/task.go Adds SQL batch insert method using bulk insert query
db/bolt/task.go Adds BoltDB batch insert method using transaction
db/Task.go Adds StageID field to TaskOutput struct
db/Store.go Adds batch insert method to TaskManager interface
db/sql/migrations/v2.16.2.sql Adds stage_id column to task__output table

Comment on lines 240 to 249
newOutput := db.TaskOutput{
TaskID: record.task.Task.ID,
Output: record.output,
Time: record.time,
}
taskOutput = append(taskOutput, newOutput)

currentOutput := record.task.currentOutput
record.task.currentOutput = &newOutput

Copy link
Preview

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newOutput variable is being modified after assignment to record.task.currentOutput. The StageID is set on line 275, but this modification won't be reflected in the task's currentOutput since it's a copy, not a reference to the same object.

Copilot uses AI. Check for mistakes.

Comment on lines 209 to 214
func (p *TaskPool) handleLogs() {
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond)
logs := make([]logRecord, 0)

for record := range p.logger {
db.StoreSession(p.store, "logger", func() {
for {

Copy link
Preview

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This infinite loop has no exit condition and will prevent the goroutine from terminating gracefully. Consider adding a context or done channel to allow proper shutdown.

Copilot uses AI. Check for mistakes.

@@ -201,23 +207,47 @@ func (p *TaskPool) handleQueue() {
}

func (p *TaskPool) handleLogs() {
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond)
Copy link
Preview

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ticker is never stopped, which can lead to a goroutine leak. Consider adding defer logTicker.Stop() or stopping it when the loop exits.

Suggested change
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond)
logTicker := time.NewTicker(TaskOutputInsertIntervalMs * time.Millisecond)
defer logTicker.Stop()

Copilot uses AI. Check for mistakes.

func (p *TaskPool) flushLogs(logs *[]logRecord) {
if len(*logs) > 0 {
p.writeLogs(*logs)
*logs = make([]logRecord, 0)
Copy link
Preview

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a new slice instead of clearing the existing one may cause unnecessary allocations. Consider using *logs = (*logs)[:0] to reuse the underlying array.

Suggested change
*logs = make([]logRecord, 0)
*logs = (*logs)[:0]

Copilot uses AI. Check for mistakes.

@fiftin
Copy link
Collaborator

fiftin commented Jul 27, 2025

@cursor review

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugbot free trial expires on August 10, 2025
Learn more in the Cursor dashboard.

log.Error(err)
return
}
})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug

The new batching logic for task output introduces several critical issues:

  • Missing Database IDs: Stage parsing via MoveToNextStage now receives TaskOutput objects without database IDs. Previously, CreateTaskOutput would assign an ID before stage processing. The new batch insert (InsertTaskOutputBatch) occurs later and does not update the original objects with their generated IDs, breaking logic dependent on valid output IDs.
  • Incorrect Previous Output: Within the batch processing loop, record.task.currentOutput is incorrectly updated. Since record.task is the same TaskRunner instance for all records from a task, currentOutput := record.task.currentOutput retrieves the output from the previous iteration within the batch, not the actual previous output for the current log record. This breaks the stage progression logic that relies on the correct preceding output.
  • Inconsistent State on Failure: Task state (task.currentState, task.currentStage) is updated by MoveToNextStage before the batch of outputs is persisted. If the batch insert fails, the in-memory task state becomes inconsistent with the database.
Locations (3)

Fix in CursorFix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants