Skip to content

Commit 841b20f

Browse files
Nickcw6jibsen-vh
authored and committed
feat(circleci-plugin): incremental data collection (apache#7986)
* feat(api_collector_stateful): handle case where response has records from both before & after createdAfter of last collection
* feat(circleci-plugin): incremental collection for pipelines
* feat(api_collector_stateful): expose Input collector arg for StatefulFinalizableEntity to collect data based on previous collection
* feat(circleci-plugin): incremental data collection for workflows
* feat(circleci-plugin): incremental data collection for jobs
* refactor(circleci-plugin): use common query param function
* refactor(circleci-plugin): use BuildQueryParamsWithPageToken func when collecting unfinished job details
1 parent 8240c8d commit 841b20f

File tree

5 files changed

+219
-57
lines changed

5 files changed

+219
-57
lines changed

backend/helpers/pluginhelper/api/api_collector_stateful.go

Lines changed: 34 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -139,10 +139,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
139139
createdAfter := manager.CollectorStateManager.GetSince()
140140
isIncremental := manager.CollectorStateManager.IsIncremental()
141141

142+
var inputIterator Iterator
143+
if args.CollectNewRecordsByList.BuildInputIterator != nil {
144+
inputIterator, err = args.CollectNewRecordsByList.BuildInputIterator(isIncremental, createdAfter)
145+
if err != nil {
146+
return nil, err
147+
}
148+
}
149+
142150
// step 1: create a collector to collect newly added records
143151
err = manager.InitCollector(ApiCollectorArgs{
144152
ApiClient: args.ApiClient,
145153
// common
154+
Input: inputIterator,
146155
Incremental: isIncremental,
147156
UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
148157
Query: func(reqData *RequestData) (url.Values, errors.Error) {
@@ -169,21 +178,41 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
169178

170179
// time filter or diff sync
171180
if createdAfter != nil && args.CollectNewRecordsByList.GetCreated != nil {
172-
// if the first record of the page was created before createdAfter, return emtpy set and stop
181+
// if the first record of the page was created before createdAfter and not a zero value, return empty set and stop
173182
firstCreated, err := args.CollectNewRecordsByList.GetCreated(items[0])
174183
if err != nil {
175184
return nil, err
176185
}
177-
if firstCreated.Before(*createdAfter) {
186+
if firstCreated.Before(*createdAfter) && !firstCreated.IsZero() {
178187
return nil, ErrFinishCollect
179188
}
180-
// if the last record was created before createdAfter, return records and stop
189+
190+
// If last record was created before CreatedAfter, including a zero value, check each record individually
181191
lastCreated, err := args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
182192
if err != nil {
183193
return nil, err
184194
}
185195
if lastCreated.Before(*createdAfter) {
186-
return items, ErrFinishCollect
196+
var validItems []json.RawMessage
197+
// Only collect items that were created after the last successful collection to prevent duplicates
198+
for _, item := range items {
199+
itemCreatedAt, err := args.CollectNewRecordsByList.GetCreated(item)
200+
if err != nil {
201+
return nil, err
202+
}
203+
204+
if itemCreatedAt.IsZero() {
205+
// If zero then timestamp is null on the response - accept as valid for downstream processing
206+
validItems = append(validItems, item)
207+
continue
208+
}
209+
210+
if itemCreatedAt.Before(*createdAfter) {
211+
// Once we reach an item that was created before the last successful collection, stop & return
212+
return validItems, ErrFinishCollect
213+
}
214+
validItems = append(validItems, item)
215+
}
187216
}
188217
}
189218
return items, err
@@ -267,6 +296,7 @@ type FinalizableApiCollectorListArgs struct {
267296
Concurrency int // required for Undetermined Strategy, number of concurrent requests
268297
GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) // required for Sequential Strategy, to extract the next page cursor from the given response
269298
GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error) // required for Determined Strategy, to extract the total number of pages from the given response
299+
BuildInputIterator func(isIncremental bool, createdAfter *time.Time) (Iterator, errors.Error)
270300
}
271301

272302
// FinalizableApiCollectorDetailArgs is the arguments for the detail collector

backend/plugins/circleci/tasks/job_collector.go

Lines changed: 64 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,10 @@ limitations under the License.
1818
package tasks
1919

2020
import (
21+
"encoding/json"
22+
"reflect"
23+
"time"
24+
2125
"github.com/apache/incubator-devlake/core/dal"
2226
"github.com/apache/incubator-devlake/core/errors"
2327
"github.com/apache/incubator-devlake/core/plugin"
@@ -43,30 +47,68 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
4347
logger := taskCtx.GetLogger()
4448
logger.Info("collect jobs")
4549

46-
clauses := []dal.Clause{
47-
dal.Select("id, pipeline_id"),
48-
dal.From(&models.CircleciWorkflow{}),
49-
dal.Where("_tool_circleci_workflows.connection_id = ? and _tool_circleci_workflows.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
50-
}
50+
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
51+
RawDataSubTaskArgs: *rawDataSubTaskArgs,
52+
ApiClient: data.ApiClient,
53+
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
54+
PageSize: int(data.Options.PageSize),
55+
GetNextPageCustomData: ExtractNextPageToken,
56+
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
57+
clauses := []dal.Clause{
58+
dal.Select("id, pipeline_id"), // pipeline_id not on individual job response but required for result
59+
dal.From(&models.CircleciWorkflow{}),
60+
dal.Where("connection_id = ? and project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
61+
}
5162

52-
db := taskCtx.GetDal()
53-
cursor, err := db.Cursor(clauses...)
54-
if err != nil {
55-
return err
56-
}
57-
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
58-
if err != nil {
59-
return err
60-
}
63+
if isIncremental {
64+
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
65+
}
66+
67+
db := taskCtx.GetDal()
68+
cursor, err := db.Cursor(clauses...)
69+
if err != nil {
70+
return nil, err
71+
}
72+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
73+
},
74+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
75+
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
76+
Query: BuildQueryParamsWithPageToken,
77+
ResponseParser: ParseCircleciPageTokenResp,
78+
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
79+
},
80+
GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
81+
var job struct { // Individual job response lacks created_at field, so have to use started_at
82+
CreatedAt time.Time `json:"started_at"` // This will be null in some cases (e.g. queued, not_running, blocked)
83+
}
84+
if err := json.Unmarshal(item, &job); err != nil {
85+
return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal job")
86+
}
87+
return job.CreatedAt, nil
88+
},
89+
},
90+
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
91+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
92+
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", // The individual job endpoint has different fields so need to recollect all jobs for a workflow
93+
Query: BuildQueryParamsWithPageToken,
94+
ResponseParser: ParseCircleciPageTokenResp,
95+
AfterResponse: ignoreDeletedBuilds,
96+
},
97+
BuildInputIterator: func() (api.Iterator, errors.Error) {
98+
clauses := []dal.Clause{
99+
dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once
100+
dal.From(&models.CircleciJob{}),
101+
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug),
102+
}
61103

62-
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
63-
RawDataSubTaskArgs: *rawDataSubTaskArgs,
64-
ApiClient: data.ApiClient,
65-
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
66-
Input: iterator,
67-
GetNextPageCustomData: ExtractNextPageToken,
68-
Query: BuildQueryParamsWithPageToken,
69-
ResponseParser: ParseCircleciPageTokenResp,
104+
db := taskCtx.GetDal()
105+
cursor, err := db.Cursor(clauses...)
106+
if err != nil {
107+
return nil, err
108+
}
109+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciJob{}))
110+
},
111+
},
70112
})
71113
if err != nil {
72114
logger.Error(err, "collect jobs error")

backend/plugins/circleci/tasks/pipeline_collector.go

Lines changed: 36 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,9 @@ limitations under the License.
1818
package tasks
1919

2020
import (
21+
"encoding/json"
22+
"net/http"
23+
2124
"github.com/apache/incubator-devlake/core/errors"
2225
"github.com/apache/incubator-devlake/core/plugin"
2326
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -39,14 +42,39 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
3942
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
4043
logger := taskCtx.GetLogger()
4144
logger.Info("collect pipelines")
42-
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
43-
RawDataSubTaskArgs: *rawDataSubTaskArgs,
44-
ApiClient: data.ApiClient,
45-
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
46-
PageSize: int(data.Options.PageSize),
47-
GetNextPageCustomData: ExtractNextPageToken,
48-
Query: BuildQueryParamsWithPageToken,
49-
ResponseParser: ParseCircleciPageTokenResp,
45+
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
46+
RawDataSubTaskArgs: *rawDataSubTaskArgs,
47+
ApiClient: data.ApiClient,
48+
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
49+
PageSize: int(data.Options.PageSize),
50+
GetNextPageCustomData: ExtractNextPageToken,
51+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
52+
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
53+
Query: BuildQueryParamsWithPageToken,
54+
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
55+
data := CircleciPageTokenResp[[]json.RawMessage]{}
56+
err := api.UnmarshalResponse(res, &data)
57+
58+
if err != nil {
59+
return nil, err
60+
}
61+
filteredItems := []json.RawMessage{}
62+
for _, item := range data.Items {
63+
pipelineCreatedAt, err := extractCreatedAt(item)
64+
65+
if err != nil {
66+
return nil, err
67+
}
68+
if pipelineCreatedAt.Before(*timeAfter) {
69+
return filteredItems, api.ErrFinishCollect
70+
}
71+
filteredItems = append(filteredItems, item)
72+
}
73+
return filteredItems, nil
74+
},
75+
},
76+
GetCreated: extractCreatedAt,
77+
},
5078
})
5179
if err != nil {
5280
logger.Error(err, "collect pipelines error")

backend/plugins/circleci/tasks/shared.go

Lines changed: 24 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,10 @@ package tasks
1919

2020
import (
2121
"encoding/json"
22+
"net/http"
23+
"net/url"
24+
"time"
25+
2226
"github.com/apache/incubator-devlake/core/dal"
2327
"github.com/apache/incubator-devlake/core/errors"
2428
"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
@@ -107,7 +111,7 @@ func ExtractNextPageToken(prevReqData *api.RequestData, prevPageResponse *http.R
107111
return res.NextPageToken, nil
108112
}
109113

110-
func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, errors.Error) {
114+
func BuildQueryParamsWithPageToken(reqData *api.RequestData, _ *time.Time) (url.Values, errors.Error) {
111115
query := url.Values{}
112116
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
113117
query.Set("page-token", pageToken)
@@ -120,3 +124,22 @@ func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.E
120124
err := api.UnmarshalResponse(res, &data)
121125
return data.Items, err
122126
}
127+
128+
func ignoreDeletedBuilds(res *http.Response) errors.Error {
129+
// CircleCI API will return a 404 response for a workflow/job that has been deleted
130+
// due to their data retention policy. We should ignore these errors.
131+
if res.StatusCode == http.StatusNotFound {
132+
return api.ErrIgnoreAndContinue
133+
}
134+
return nil
135+
}
136+
137+
func extractCreatedAt(item json.RawMessage) (time.Time, errors.Error) {
138+
var entity struct {
139+
CreatedAt time.Time `json:"created_at"`
140+
}
141+
if err := json.Unmarshal(item, &entity); err != nil {
142+
return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal item")
143+
}
144+
return entity.CreatedAt, nil
145+
}

backend/plugins/circleci/tasks/workflow_collector.go

Lines changed: 61 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,11 @@ limitations under the License.
1818
package tasks
1919

2020
import (
21+
"encoding/json"
22+
"net/http"
23+
"reflect"
24+
"time"
25+
2126
"github.com/apache/incubator-devlake/core/dal"
2227
"github.com/apache/incubator-devlake/core/errors"
2328
"github.com/apache/incubator-devlake/core/plugin"
@@ -43,30 +48,64 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
4348
logger := taskCtx.GetLogger()
4449
logger.Info("collect workflows")
4550

46-
clauses := []dal.Clause{
47-
dal.Select("id"),
48-
dal.From(&models.CircleciPipeline{}),
49-
dal.Where("_tool_circleci_pipelines.connection_id = ? and _tool_circleci_pipelines.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
50-
}
51+
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
52+
RawDataSubTaskArgs: *rawDataSubTaskArgs,
53+
ApiClient: data.ApiClient,
54+
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
55+
PageSize: int(data.Options.PageSize),
56+
GetNextPageCustomData: ExtractNextPageToken,
57+
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
58+
clauses := []dal.Clause{
59+
dal.Select("id"),
60+
dal.From(&models.CircleciPipeline{}),
61+
dal.Where("connection_id = ? AND project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
62+
}
5163

52-
db := taskCtx.GetDal()
53-
cursor, err := db.Cursor(clauses...)
54-
if err != nil {
55-
return err
56-
}
57-
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
58-
if err != nil {
59-
return err
60-
}
64+
if isIncremental {
65+
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
66+
}
67+
68+
db := taskCtx.GetDal()
69+
cursor, err := db.Cursor(clauses...)
70+
if err != nil {
71+
return nil, err
72+
}
73+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
74+
},
75+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
76+
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
77+
Query: BuildQueryParamsWithPageToken,
78+
ResponseParser: ParseCircleciPageTokenResp,
79+
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
80+
},
81+
GetCreated: extractCreatedAt,
82+
},
83+
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
84+
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
85+
UrlTemplate: "/v2/workflow/{{ .Input.Id }}",
86+
Query: nil,
87+
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
88+
var data json.RawMessage
89+
err := api.UnmarshalResponse(res, &data)
90+
return []json.RawMessage{data}, err
91+
},
92+
AfterResponse: ignoreDeletedBuilds,
93+
},
94+
BuildInputIterator: func() (api.Iterator, errors.Error) {
95+
clauses := []dal.Clause{
96+
dal.Select("id"),
97+
dal.From(&models.CircleciWorkflow{}),
98+
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'on_hold', 'failing')", data.Options.ConnectionId, data.Options.ProjectSlug),
99+
}
61100

62-
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
63-
RawDataSubTaskArgs: *rawDataSubTaskArgs,
64-
ApiClient: data.ApiClient,
65-
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
66-
Input: iterator,
67-
GetNextPageCustomData: ExtractNextPageToken,
68-
Query: BuildQueryParamsWithPageToken,
69-
ResponseParser: ParseCircleciPageTokenResp,
101+
db := taskCtx.GetDal()
102+
cursor, err := db.Cursor(clauses...)
103+
if err != nil {
104+
return nil, err
105+
}
106+
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
107+
},
108+
},
70109
})
71110
if err != nil {
72111
logger.Error(err, "collect workflows error")

0 commit comments

Comments
 (0)