Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/api/src/daemons/datapipe/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ func (s *BHCEPipeline) Analyze(ctx context.Context) error {

defer measure.LogAndMeasure(slog.LevelInfo, "Graph Analysis")()

// Record the last time we started an analysis run
if err := s.db.SetLastAnalysisStartTime(ctx); err != nil {
return fmt.Errorf("update last analysis start time: %v", err)
}

if err := RunAnalysisOperations(ctx, s.db, s.graphdb, s.cfg); err != nil {
if errors.Is(err, ErrAnalysisFailed) {
s.jobService.FailAnalyzedIngestJobs()
Expand Down
21 changes: 9 additions & 12 deletions cmd/api/src/database/datapipestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,26 @@ import (

type DatapipeStatusData interface {
UpdateLastAnalysisCompleteTime(ctx context.Context) error
SetLastAnalysisStartTime(ctx context.Context) error
SetDatapipeStatus(ctx context.Context, status model.DatapipeStatus) error
GetDatapipeStatus(ctx context.Context) (model.DatapipeStatusWrapper, error)
}

// This should be called at the end of a successful analysis run (not always every analysis)
func (s *BloodhoundDB) UpdateLastAnalysisCompleteTime(ctx context.Context) error {
now := time.Now().UTC()
return s.db.WithContext(ctx).Exec("UPDATE datapipe_status SET updated_at = ?, last_complete_analysis_at = ?", now, now).Error
}

// This should be called at the start of analysis processing (not every datapipe tick, but start of real work)
func (s *BloodhoundDB) SetLastAnalysisStartTime(ctx context.Context) error {
now := time.Now().UTC()
return s.db.WithContext(ctx).Exec("UPDATE datapipe_status SET updated_at = ?, last_analysis_run_at = ?", now, now).Error
}

func (s *BloodhoundDB) SetDatapipeStatus(ctx context.Context, status model.DatapipeStatus) error {
now := time.Now().UTC()
// All queries will update the status and table update time
updateSql := "UPDATE datapipe_status SET status = ?, updated_at = ?"

if status == model.DatapipeStatusAnalyzing {
// Updates last run anytime we start analysis
updateSql += ", last_analysis_run_at = ?;"
return s.db.WithContext(ctx).Exec(updateSql, status, now, now).Error
} else {
// Otherwise, only update status and last update to the table
updateSql += ";"
return s.db.WithContext(ctx).Exec(updateSql, status, now).Error
}
return s.db.WithContext(ctx).Exec("UPDATE datapipe_status SET status = ?, updated_at = ?;", status, now).Error
}

func (s *BloodhoundDB) GetDatapipeStatus(ctx context.Context) (model.DatapipeStatusWrapper, error) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/api/src/database/datapipestatus_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func TestDatapipeStatus(t *testing.T) {
oldAnalysisTime := status.LastAnalysisRunAt
err = db.SetDatapipeStatus(testCtx, model.DatapipeStatusAnalyzing)
require.Nil(t, err)
err = db.SetLastAnalysisStartTime(testCtx)
require.Nil(t, err)
status, err = db.GetDatapipeStatus(testCtx)
require.Nil(t, err)
assert.Equal(t, model.DatapipeStatusAnalyzing, status.Status)
Expand Down
14 changes: 14 additions & 0 deletions cmd/api/src/database/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.