Skip to content

Commit fdc2597

Browse files
committed
Fix(datapipe): Do not set the last_run field as part of status update but
rather explicitly at start of analysis.
1 parent 84f3e15 commit fdc2597

File tree

4 files changed

+30
-13
lines changed

4 files changed

+30
-13
lines changed

cmd/api/src/daemons/datapipe/pipeline.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ func (s *BHCEPipeline) Analyze(ctx context.Context) error {
207207

208208
defer measure.LogAndMeasure(slog.LevelInfo, "Graph Analysis")()
209209

210+
// Record the last time we started an analysis run
211+
if err := s.db.SetLastAnalysisStartTime(ctx); err != nil {
212+
return fmt.Errorf("update last analysis start time: %v", err)
213+
}
214+
210215
if err := RunAnalysisOperations(ctx, s.db, s.graphdb, s.cfg); err != nil {
211216
if errors.Is(err, ErrAnalysisFailed) {
212217
s.jobService.FailAnalyzedIngestJobs()

cmd/api/src/database/datapipestatus.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,26 @@ import (
2525

2626
type DatapipeStatusData interface {
2727
UpdateLastAnalysisCompleteTime(ctx context.Context) error
28+
SetLastAnalysisStartTime(ctx context.Context) error
2829
SetDatapipeStatus(ctx context.Context, status model.DatapipeStatus) error
2930
GetDatapipeStatus(ctx context.Context) (model.DatapipeStatusWrapper, error)
3031
}
3132

33+
// This should be called at the end of a successful analysis run (not always every analysis)
3234
func (s *BloodhoundDB) UpdateLastAnalysisCompleteTime(ctx context.Context) error {
3335
now := time.Now().UTC()
3436
return s.db.WithContext(ctx).Exec("UPDATE datapipe_status SET updated_at = ?, last_complete_analysis_at = ?", now, now).Error
3537
}
3638

39+
// This should be called at the start of analysis processing (not every datapipe tick, but start of real work)
40+
func (s *BloodhoundDB) SetLastAnalysisStartTime(ctx context.Context) error {
41+
now := time.Now().UTC()
42+
return s.db.WithContext(ctx).Exec("UPDATE datapipe_status SET updated_at = ?, last_analysis_run_at = ?", now, now).Error
43+
}
44+
3745
func (s *BloodhoundDB) SetDatapipeStatus(ctx context.Context, status model.DatapipeStatus) error {
3846
now := time.Now().UTC()
39-
// All queries will update the status and table update time
40-
updateSql := "UPDATE datapipe_status SET status = ?, updated_at = ?"
41-
42-
if status == model.DatapipeStatusAnalyzing {
43-
// Updates last run anytime we start analysis
44-
updateSql += ", last_analysis_run_at = ?;"
45-
return s.db.WithContext(ctx).Exec(updateSql, status, now, now).Error
46-
} else {
47-
// Otherwise, only update status and last update to the table
48-
updateSql += ";"
49-
return s.db.WithContext(ctx).Exec(updateSql, status, now).Error
50-
}
47+
return s.db.WithContext(ctx).Exec("UPDATE datapipe_status SET status = ?, updated_at = ?;", status, now).Error
5148
}
5249

5350
func (s *BloodhoundDB) GetDatapipeStatus(ctx context.Context) (model.DatapipeStatusWrapper, error) {

cmd/api/src/database/datapipestatus_integration_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ func TestDatapipeStatus(t *testing.T) {
4242
oldAnalysisTime := status.LastAnalysisRunAt
4343
err = db.SetDatapipeStatus(testCtx, model.DatapipeStatusAnalyzing)
4444
require.Nil(t, err)
45-
status, err = db.GetDatapipeStatus(testCtx)
45+
err = db.SetLastAnalysisStartTime(testCtx)
4646
require.Nil(t, err)
47+
status, err = db.GetDatapipeStatus(testCtx)
4748
assert.Equal(t, model.DatapipeStatusAnalyzing, status.Status)
4849
assert.True(t, oldAnalysisTime.Before(status.LastAnalysisRunAt))
4950
assert.True(t, status.LastCompleteAnalysisAt.IsZero())

cmd/api/src/database/mocks/db.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)