Skip to content

Commit 7b71af7

Browse files
feat: make pre, post, and deploy trigger flows idempotent (#4486)
* update common-lib * added idempotency code and common-lib version bump * added the optional validations pubsub msg pre-processing-hooks logic and added duplicate trigger check as a validation for pre, post, deploy trigger flows * bump common-lib * added callback logger func * sql scripts added * query fix * query fix * fix * refactoring * remove dag executor dependency from CiEventHandler service * bump common-lib * move context to trigger context * add logs * migration version fix * remove test data * update script number --------- Co-authored-by: Kripansh <[email protected]>
1 parent 83889d6 commit 7b71af7

25 files changed

+752
-316
lines changed

api/restHandler/PipelineTriggerRestHandler.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ func (handler PipelineTriggerRestHandlerImpl) OverrideConfig(w http.ResponseWrit
129129
}
130130
ctx := context.WithValue(r.Context(), "token", acdToken)
131131
_, span := otel.Tracer("orchestrator").Start(ctx, "workflowDagExecutor.ManualCdTrigger")
132-
mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(&overrideRequest, ctx)
132+
triggerContext := pipeline.TriggerContext{
133+
Context: ctx,
134+
}
135+
mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(triggerContext, &overrideRequest)
133136
span.End()
134137
if err != nil {
135138
handler.logger.Errorw("request err, OverrideConfig", "err", err, "payload", overrideRequest)
@@ -224,7 +227,10 @@ func (handler PipelineTriggerRestHandlerImpl) StartStopApp(w http.ResponseWriter
224227
return
225228
}
226229
ctx := context.WithValue(r.Context(), "token", acdToken)
227-
mergeResp, err := handler.workflowDagExecutor.StopStartApp(&overrideRequest, ctx)
230+
triggerContext := pipeline.TriggerContext{
231+
Context: ctx,
232+
}
233+
mergeResp, err := handler.workflowDagExecutor.StopStartApp(triggerContext, &overrideRequest)
228234
if err != nil {
229235
handler.logger.Errorw("service err, StartStopApp", "err", err, "payload", overrideRequest)
230236
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)

api/router/pubsub/ApplicationStatusHandler.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"github.com/devtron-labs/common-lib/pubsub-lib/model"
2626
"github.com/devtron-labs/devtron/pkg/app"
27+
"k8s.io/utils/pointer"
2728
"time"
2829

2930
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
@@ -74,7 +75,7 @@ func NewApplicationStatusHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pu
7475
}
7576
err := appStatusUpdateHandlerImpl.Subscribe()
7677
if err != nil {
77-
//logger.Error("err", err)
78+
// logger.Error("err", err)
7879
return nil
7980
}
8081
err = appStatusUpdateHandlerImpl.SubscribeDeleteStatus()
@@ -91,7 +92,6 @@ type ApplicationDetail struct {
9192

9293
func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
9394
callback := func(msg *model.PubSubMsg) {
94-
impl.logger.Debugw("APP_STATUS_UPDATE_REQ", "stage", "raw", "data", msg.Data)
9595
applicationDetail := ApplicationDetail{}
9696
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
9797
if err != nil {
@@ -109,10 +109,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
109109
_, err = impl.pipelineRepository.GetArgoPipelineByArgoAppName(app.ObjectMeta.Name)
110110
if err != nil && err == pg.ErrNoRows {
111111
impl.logger.Infow("this app not found in pipeline table looking in installed_apps table", "appName", app.ObjectMeta.Name)
112-
//if not found in pipeline table then search in installed_apps table
112+
// if not found in pipeline table then search in installed_apps table
113113
gitOpsDeployedAppNames, err := impl.installedAppRepository.GetAllGitOpsDeploymentAppName()
114114
if err != nil && err == pg.ErrNoRows {
115-
//no installed_apps found
115+
// no installed_apps found
116116
impl.logger.Errorw("no installed apps found", "err", err)
117117
return
118118
} else if err != nil {
@@ -127,17 +127,17 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
127127
devtronGitOpsAppName = app.ObjectMeta.Name
128128
}
129129
if slices.Contains(gitOpsDeployedAppNames, devtronGitOpsAppName) {
130-
//app found in installed_apps table hence setting flag to true
130+
// app found in installed_apps table hence setting flag to true
131131
isAppStoreApplication = true
132132
} else {
133-
//app neither found in installed_apps nor in pipeline table hence returning
133+
// app neither found in installed_apps nor in pipeline table hence returning
134134
return
135135
}
136136
}
137137
isSucceeded, pipelineOverride, err := impl.appService.UpdateDeploymentStatusAndCheckIsSucceeded(app, applicationDetail.StatusTime, isAppStoreApplication)
138138
if err != nil {
139139
impl.logger.Errorw("error on application status update", "err", err, "msg", string(msg.Data))
140-
//TODO - check update for charts - fix this call
140+
// TODO - check update for charts - fix this call
141141
if err == pg.ErrNoRows {
142142
// if not found in charts (which is for devtron apps) try to find in installed app (which is for devtron charts)
143143
_, err := impl.installedAppService.UpdateInstalledAppVersionStatus(app)
@@ -153,7 +153,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
153153
// invoke DagExecutor, for cd success which will trigger post stage if exist.
154154
if isSucceeded {
155155
impl.logger.Debugw("git hash history", "list", app.Status.History)
156-
err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(pipelineOverride)
156+
triggerContext := pipeline.TriggerContext{
157+
ReferenceId: pointer.String(msg.MsgId),
158+
}
159+
err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(triggerContext, pipelineOverride)
157160
if err != nil {
158161
impl.logger.Errorw("deployment success event error", "pipelineOverride", pipelineOverride, "err", err)
159162
return
@@ -162,7 +165,13 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
162165
impl.logger.Debugw("application status update completed", "app", app.Name)
163166
}
164167

165-
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback)
168+
// add required logging here
169+
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
170+
return "", nil
171+
}
172+
173+
validations := impl.workflowDagExecutor.GetTriggerValidateFuncs()
174+
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback, loggerFunc, validations...)
166175
if err != nil {
167176
impl.logger.Error(err)
168177
return err
@@ -173,7 +182,6 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
173182
func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error {
174183
callback := func(msg *model.PubSubMsg) {
175184

176-
impl.logger.Debugw("APP_STATUS_DELETE_REQ", "stage", "raw", "data", msg.Data)
177185
applicationDetail := ApplicationDetail{}
178186
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
179187
if err != nil {
@@ -191,7 +199,18 @@ func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error {
191199
impl.logger.Errorw("error in updating pipeline delete status", "err", err, "appName", app.Name)
192200
}
193201
}
194-
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback)
202+
203+
// add required logging here
204+
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
205+
applicationDetail := ApplicationDetail{}
206+
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
207+
if err != nil {
208+
return "unmarshal error on app delete status", []interface{}{"err", err}
209+
}
210+
return "got message for application status delete", []interface{}{"appName", applicationDetail.Application.Name, "namespace", applicationDetail.Application.Namespace, "deleteTimestamp", applicationDetail.Application.DeletionTimestamp}
211+
}
212+
213+
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback, loggerFunc)
195214
if err != nil {
196215
impl.logger.Errorw("error in subscribing to argo application status delete topic", "err", err)
197216
return err
@@ -210,7 +229,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha
210229
return errors.New("invalid nats message, pipeline already deleted")
211230
}
212231
if err == pg.ErrNoRows {
213-
//Helm app deployed using argocd
232+
// Helm app deployed using argocd
214233
var gitHash string
215234
if app.Operation != nil && app.Operation.Sync != nil {
216235
gitHash = app.Operation.Sync.Revision
@@ -229,7 +248,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha
229248
impl.logger.Errorw("App not found in database", "installedAppId", model.InstalledAppId, "err", err)
230249
return fmt.Errorf("app not found in database %s", err)
231250
} else if installedApp.DeploymentAppDeleteRequest == false {
232-
//TODO 4465 remove app from log after final RCA
251+
// TODO 4465 remove app from log after final RCA
233252
impl.logger.Infow("Deployment delete not requested for app, not deleting app from DB", "appName", app.Name, "app", app)
234253
return nil
235254
}

api/router/pubsub/CiEventHandler.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/devtron-labs/devtron/util"
3232
"go.uber.org/zap"
3333
"gopkg.in/go-playground/validator.v9"
34+
"k8s.io/utils/pointer"
3435
"time"
3536
)
3637

@@ -100,8 +101,6 @@ func NewCiEventHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pubsub.PubSu
100101

101102
func (impl *CiEventHandlerImpl) Subscribe() error {
102103
callback := func(msg *model.PubSubMsg) {
103-
impl.logger.Debugw("ci complete event received")
104-
//defer msg.Ack()
105104
ciCompleteEvent := CiCompleteEvent{}
106105
err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent)
107106
if err != nil {
@@ -114,6 +113,10 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
114113
return
115114
}
116115

116+
triggerContext := pipeline.TriggerContext{
117+
ReferenceId: pointer.String(msg.MsgId),
118+
}
119+
117120
if ciCompleteEvent.FailureReason != "" {
118121
req.FailureReason = ciCompleteEvent.FailureReason
119122
err := impl.webhookService.HandleCiStepFailedEvent(ciCompleteEvent.PipelineId, req)
@@ -136,7 +139,7 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
136139
impl.logger.Error("Error while creating request for pipelineID", "pipelineId", ciCompleteEvent.PipelineId, "err", err)
137140
return
138141
}
139-
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
142+
resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
140143
if err != nil {
141144
return
142145
}
@@ -146,28 +149,40 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
146149

147150
} else {
148151
util.TriggerCIMetrics(ciCompleteEvent.Metrics, impl.ciEventConfig.ExposeCiMetrics, ciCompleteEvent.PipelineName, ciCompleteEvent.AppName)
149-
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, req, &time.Time{})
152+
resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, req, &time.Time{})
150153
if err != nil {
151154
return
152155
}
153156
impl.logger.Debug(resp)
154157
}
155158
}
156-
err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback)
159+
160+
// add required logging here
161+
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
162+
ciCompleteEvent := CiCompleteEvent{}
163+
err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent)
164+
if err != nil {
165+
return "error while unmarshalling json data", []interface{}{"error", err}
166+
}
167+
return "got message for ci-completion", []interface{}{"ciPipelineId", ciCompleteEvent.PipelineId, "workflowId", ciCompleteEvent.WorkflowId}
168+
}
169+
170+
validations := impl.webhookService.GetTriggerValidateFuncs()
171+
err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback, loggerFunc, validations...)
157172
if err != nil {
158173
impl.logger.Error(err)
159174
return err
160175
}
161176
return nil
162177
}
163178

164-
func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) {
179+
func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(triggerContext pipeline.TriggerContext, ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) {
165180
validationErr := impl.validator.Struct(request)
166181
if validationErr != nil {
167182
impl.logger.Errorw("validation err, HandleCiSuccessEvent", "err", validationErr, "payload", request)
168183
return 0, validationErr
169184
}
170-
buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(ciPipelineId, request, imagePushedAt)
185+
buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(triggerContext, ciPipelineId, request, imagePushedAt)
171186
if err != nil {
172187
impl.logger.Error("Error while sending event for CI success for pipelineID",
173188
ciPipelineId, "request", request, "error", err)

api/router/pubsub/GitWebhookHandler.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,18 @@ func (impl *GitWebhookHandlerImpl) Subscribe() error {
6868
return
6969
}
7070
}
71-
err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback)
71+
72+
// add required logging here
73+
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
74+
ciPipelineMaterial := gitSensor.CiPipelineMaterial{}
75+
err := json.Unmarshal([]byte(string(msg.Data)), &ciPipelineMaterial)
76+
if err != nil {
77+
return "error while unmarshalling json response", []interface{}{"error", err}
78+
}
79+
return "got message for about new ci material", []interface{}{"ciPipelineMaterialId", ciPipelineMaterial.Id, "gitMaterialId", ciPipelineMaterial.GitMaterialId, "type", ciPipelineMaterial.Type}
80+
}
81+
82+
err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback, loggerFunc)
7283
if err != nil {
7384
impl.logger.Error("err", err)
7485
return err

api/router/pubsub/WorkflowStatusUpdateHandler.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ func NewWorkflowStatusUpdateHandlerImpl(logger *zap.SugaredLogger, pubsubClient
7171

7272
func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {
7373
callback := func(msg *model.PubSubMsg) {
74-
impl.logger.Debug("received wf update request")
75-
//defer msg.Ack()
7674
wfStatus := v1alpha1.WorkflowStatus{}
7775
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
7876
if err != nil {
@@ -93,7 +91,20 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {
9391
}
9492

9593
}
96-
err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback)
94+
95+
// add required logging here
96+
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
97+
wfStatus := v1alpha1.WorkflowStatus{}
98+
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
99+
if err != nil {
100+
return "error while unmarshalling wf status update", []interface{}{"err", err, "msg", string(msg.Data)}
101+
}
102+
103+
workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus)
104+
return "got message for ci workflow status update ", []interface{}{"workflowName", workflowName, "status", status, "message", message}
105+
}
106+
107+
err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback, loggerFunc)
97108

98109
if err != nil {
99110
impl.logger.Error("err", err)
@@ -104,16 +115,13 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {
104115

105116
func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error {
106117
callback := func(msg *model.PubSubMsg) {
107-
impl.logger.Debug("received cd wf update request")
108-
//defer msg.Ack()
109118
wfStatus := v1alpha1.WorkflowStatus{}
110119
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
111120
if err != nil {
112121
impl.logger.Error("Error while unmarshalling wfStatus json object", "error", err)
113122
return
114123
}
115124

116-
impl.logger.Debugw("received cd wf update request body", "body", wfStatus)
117125
wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus)
118126
impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus)
119127
if err != nil {
@@ -170,7 +178,19 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error {
170178
}
171179
}
172180
}
173-
err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback)
181+
182+
// add required logging here
183+
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
184+
wfStatus := v1alpha1.WorkflowStatus{}
185+
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
186+
if err != nil {
187+
return "error while unmarshalling wfStatus json object", []interface{}{"error", err}
188+
}
189+
workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus)
190+
return "got message for cd workflow status", []interface{}{"workflowName", workflowName, "status", status, "message", message}
191+
}
192+
193+
err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback, loggerFunc)
174194
if err != nil {
175195
impl.logger.Error("err", err)
176196
return err

0 commit comments

Comments
 (0)