Skip to content

Commit 0c4b620

Browse files
authored
feat(engine): add region requirement on integration (#5932)
1 parent 48da39b commit 0c4b620

File tree

4 files changed

+66
-26
lines changed

4 files changed

+66
-26
lines changed

engine/api/workflow/execute_node_run.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func releaseMutex(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store ca
333333
ORDER BY workflow_run.num ASC
334334
LIMIT 1
335335
`
336-
waitingRunID, err := db.SelectInt(mutexQuery, workflowID, nodeName, string(sdk.StatusWaiting))
336+
waitingRunID, err := db.SelectInt(mutexQuery, workflowID, nodeName, sdk.StatusWaiting)
337337
if err != nil && err != sql.ErrNoRows {
338338
err = sdk.WrapError(err, "unable to load mutex-locked workflow node run id")
339339
ctx = sdk.ContextWithStacktrace(ctx, err)
@@ -427,7 +427,7 @@ func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage,
427427
}
428428

429429
_, next = telemetry.Span(ctx, "workflow.getIntegrationPlugins")
430-
integrationPlugins, err := getIntegrationPlugins(db, wr, nr)
430+
integrationConfigs, integrationPlugins, err := getIntegrationPlugins(db, wr, nr)
431431
if err != nil {
432432
return report, sdk.WrapError(err, "unable to get integration plugins requirement")
433433
}
@@ -468,7 +468,7 @@ jobLoop:
468468
}
469469

470470
_, next = telemetry.Span(ctx, "workflow.processNodeJobRunRequirements")
471-
jobRequirements, containsService, wm, err := processNodeJobRunRequirements(ctx, db, *job, nr, sdk.Groups(groups).ToIDs(), integrationPlugins)
471+
jobRequirements, containsService, wm, err := processNodeJobRunRequirements(ctx, db, *job, nr, sdk.Groups(groups).ToIDs(), integrationPlugins, integrationConfigs)
472472
next()
473473
if err != nil {
474474
spawnErrs.Join(*err)
@@ -577,25 +577,28 @@ jobLoop:
577577
return report, nil
578578
}
579579

580-
func getIntegrationPlugins(db gorp.SqlExecutor, wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun) ([]sdk.GRPCPlugin, error) {
580+
func getIntegrationPlugins(db gorp.SqlExecutor, wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun) ([]sdk.IntegrationConfig, []sdk.GRPCPlugin, error) {
581581
plugins := make([]sdk.GRPCPlugin, 0)
582-
var projectIntegrationModelID int64
582+
mapConfig := make([]sdk.IntegrationConfig, 0)
583+
584+
var projectIntegration *sdk.ProjectIntegration
583585
node := wr.Workflow.WorkflowData.NodeByID(nr.WorkflowNodeID)
584586
if node != nil && node.Context != nil {
585587
if node.Context.ProjectIntegrationID != 0 {
586588
pp, has := wr.Workflow.ProjectIntegrations[node.Context.ProjectIntegrationID]
587589
if has {
588-
projectIntegrationModelID = pp.Model.ID
590+
projectIntegration = &pp
589591
}
590592
}
591593
}
592594

593-
if projectIntegrationModelID > 0 {
594-
plugin, err := plugin.LoadByIntegrationModelIDAndType(db, projectIntegrationModelID, sdk.GRPCPluginDeploymentIntegration)
595+
if projectIntegration != nil && projectIntegration.Model.ID > 0 {
596+
mapConfig = append(mapConfig, projectIntegration.Config)
597+
plg, err := plugin.LoadByIntegrationModelIDAndType(db, projectIntegration.Model.ID, sdk.GRPCPluginDeploymentIntegration)
595598
if err != nil {
596-
return nil, sdk.NewErrorFrom(sdk.ErrNotFound, "Cannot find plugin for integration model id %d, %v", projectIntegrationModelID, err)
599+
return nil, nil, sdk.NewErrorFrom(sdk.ErrNotFound, "Cannot find plugin for integration model id %d, %v", projectIntegration.Model.ID, err)
597600
}
598-
plugins = append(plugins, *plugin)
601+
plugins = append(plugins, *plg)
599602
}
600603

601604
var artifactManagerInteg *sdk.WorkflowProjectIntegration
@@ -605,9 +608,10 @@ func getIntegrationPlugins(db gorp.SqlExecutor, wr *sdk.WorkflowRun, nr *sdk.Wor
605608
}
606609
}
607610
if artifactManagerInteg != nil {
611+
mapConfig = append(mapConfig, artifactManagerInteg.Config)
608612
plgs, err := plugin.LoadAllByIntegrationModelID(db, artifactManagerInteg.ProjectIntegration.Model.ID)
609613
if err != nil {
610-
return nil, sdk.NewErrorFrom(sdk.ErrNotFound, "Cannot find plugin for integration model id %d, %v", projectIntegrationModelID, err)
614+
return nil, nil, sdk.NewErrorFrom(sdk.ErrNotFound, "Cannot find plugin for integration model id %d, %v", artifactManagerInteg.ProjectIntegration.Model.ID, err)
611615
}
612616
platform := artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactManagerConfigPlatform]
613617
for _, plg := range plgs {
@@ -617,7 +621,7 @@ func getIntegrationPlugins(db gorp.SqlExecutor, wr *sdk.WorkflowRun, nr *sdk.Wor
617621
}
618622
}
619623

620-
return plugins, nil
624+
return mapConfig, plugins, nil
621625
}
622626

623627
func getExecutablesGroups(wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun) ([]sdk.Group, error) {
@@ -721,7 +725,7 @@ func syncStage(ctx context.Context, db gorp.SqlExecutor, store cache.Store, stag
721725

722726
// NodeBuildParametersFromRun return build parameters from previous workflow run
723727
func NodeBuildParametersFromRun(wr sdk.WorkflowRun, id int64) ([]sdk.Parameter, error) {
724-
params := []sdk.Parameter{}
728+
params := make([]sdk.Parameter, 0)
725729

726730
nodesRun, ok := wr.WorkflowNodeRuns[id]
727731
if !ok || len(nodesRun) == 0 {

engine/api/workflow/process_requirements.go

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,40 @@ import (
1616

1717
// processNodeJobRunRequirements returns requirements list interpolated, and true or false if at least
1818
// one requirement is of type "Service"
19-
func processNodeJobRunRequirements(ctx context.Context, db gorp.SqlExecutor, j sdk.Job, run *sdk.WorkflowNodeRun, execsGroupIDs []int64, integrationPlugins []sdk.GRPCPlugin) (sdk.RequirementList, bool, *sdk.Model, *sdk.MultiError) {
19+
func processNodeJobRunRequirements(ctx context.Context, db gorp.SqlExecutor, j sdk.Job, run *sdk.WorkflowNodeRun, execsGroupIDs []int64, integrationPlugins []sdk.GRPCPlugin, integrationsConfigs []sdk.IntegrationConfig) (sdk.RequirementList, bool, *sdk.Model, *sdk.MultiError) {
2020
var requirements sdk.RequirementList
2121
var errm sdk.MultiError
2222
var containsService bool
2323
var model string
2424
var tmp = sdk.ParametersToMap(run.BuildParameters)
2525

26-
pluginsRequirements := []sdk.Requirement{}
26+
pluginsRequirements := make([]sdk.Requirement, 0)
2727
for _, p := range integrationPlugins {
2828
for _, b := range p.Binaries {
2929
pluginsRequirements = append(pluginsRequirements, b.Requirements...)
3030
}
3131
}
3232

33-
// as some plugin binaries can have same requirement, we deduplicate them
34-
pluginsRequirements = sdk.RequirementListDeduplicate(pluginsRequirements)
35-
3633
// then add plugins requirement to the action requirement
3734
j.Action.Requirements = append(j.Action.Requirements, pluginsRequirements...)
3835

36+
integrationRequirements := make([]sdk.Requirement, 0)
37+
for _, c := range integrationsConfigs {
38+
for k, v := range c {
39+
if v.Type != sdk.IntegrationConfigTypeRegion {
40+
continue
41+
}
42+
integrationRequirements = append(integrationRequirements, sdk.Requirement{
43+
Name: k,
44+
Type: sdk.RegionRequirement,
45+
Value: v.Value,
46+
})
47+
}
48+
}
49+
j.Action.Requirements = append(j.Action.Requirements, integrationRequirements...)
50+
51+
j.Action.Requirements = sdk.RequirementListDeduplicate(j.Action.Requirements)
52+
3953
for _, v := range j.Action.Requirements {
4054
name, errName := interpolate.Do(v.Name, tmp)
4155
if errName != nil {
@@ -75,8 +89,8 @@ func processNodeJobRunRequirements(ctx context.Context, db gorp.SqlExecutor, j s
7589
for _, req := range requirements {
7690
if req.Type == sdk.BinaryRequirement {
7791
var hasCapa bool
78-
for _, cap := range wm.RegisteredCapabilities {
79-
if cap.Value == req.Value {
92+
for _, capa := range wm.RegisteredCapabilities {
93+
if capa.Value == req.Value {
8094
hasCapa = true
8195
break
8296
}
@@ -90,14 +104,27 @@ func processNodeJobRunRequirements(ctx context.Context, db gorp.SqlExecutor, j s
90104
}
91105
}
92106

107+
regionRequirementMap := make(map[string]struct{})
108+
for _, r := range requirements {
109+
if r.Type != sdk.RegionRequirement {
110+
continue
111+
}
112+
if _, has := regionRequirementMap[r.Value]; !has {
113+
regionRequirementMap[r.Value] = struct{}{}
114+
}
115+
}
116+
if len(regionRequirementMap) > 1 {
117+
errm.Append(sdk.NewErrorFrom(sdk.ErrInvalidJobRequirement, "Cannot have multiple region requirements %v", regionRequirementMap))
118+
}
119+
93120
if errm.IsEmpty() {
94121
return requirements, containsService, wm, nil
95122
}
96123
return requirements, containsService, wm, &errm
97124
}
98125

99126
func prepareRequirementsToNodeJobRunParameters(reqs sdk.RequirementList) []sdk.Parameter {
100-
params := []sdk.Parameter{}
127+
params := make([]sdk.Parameter, 0)
101128
for _, r := range reqs {
102129
if r.Type == sdk.ServiceRequirement {
103130
k := fmt.Sprintf("job.requirement.%s.%s", strings.ToLower(r.Type), strings.ToLower(r.Name))

sdk/integration.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ const (
305305
IntegrationConfigTypePassword = "password"
306306
// IntegrationConfigTypeBoolean represents a password configuration value
307307
IntegrationConfigTypeBoolean = "boolean"
308+
// IntegrationConfigTypeRegion represents a region requirement
309+
IntegrationConfigTypeRegion = "region"
308310

309311
IntegrationVariablePrefixDeployment = "deployment"
310312
IntegrationVariablePrefixArtifactManager = "artifact_manager"
@@ -315,6 +317,7 @@ type IntegrationConfigValue struct {
315317
Value string `json:"value" yaml:"value"`
316318
Type string `json:"type" yaml:"type"`
317319
Description string `json:"description,omitempty" yaml:"description,omitempty"`
320+
Static bool `json:"static,omitempty" yaml:"static,omitempty"`
318321
}
319322

320323
type IntegrationConfigMap map[string]IntegrationConfig

ui/src/app/views/project/show/integrations/list/project.integration.list.html

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,18 @@
2929
</label>
3030
</div>
3131
<div class="ten wide field">
32-
<input type="checkbox" [(ngModel)]="p.config[k].value" *ngIf="p.config[k].type === 'boolean'" (keydown)="p.hasChanged = true" [readonly]="p.model.public"/>
33-
<input type="text" [(ngModel)]="p.config[k].value" *ngIf="p.config[k].type === 'string'" (keydown)="p.hasChanged = true" [readonly]="p.model.public">
34-
<ng-container *ngIf="p.config[k].type === 'text'">
35-
<codemirror [(ngModel)]="p.config[k].value" [config]="codeMirrorConfig" #codeMirror (keydown)="p.hasChanged = true"></codemirror>
32+
<ng-container *ngIf="p.config[k].static">
33+
<input type="text" *ngIf="p.config[k].type !== 'password'" [(ngModel)]="p.config[k].value" readonly>
34+
<input type="password" *ngIf="p.config[k].type === 'password'" [(ngModel)]="p.config[k].value" readonly>
35+
</ng-container>
36+
<ng-container *ngIf="!p.config[k].static">
37+
<input type="checkbox" [(ngModel)]="p.config[k].value" *ngIf="p.config[k].type === 'boolean'" (keydown)="p.hasChanged = true" [readonly]="p.model.public"/>
38+
<input type="text" [(ngModel)]="p.config[k].value" *ngIf="p.config[k].type === 'string'" (keydown)="p.hasChanged = true" [readonly]="p.model.public">
39+
<ng-container *ngIf="p.config[k].type === 'text'">
40+
<codemirror [(ngModel)]="p.config[k].value" [config]="codeMirrorConfig" #codeMirror (keydown)="p.hasChanged = true"></codemirror>
41+
</ng-container>
42+
<input type="password" name="integrationpassword" [(ngModel)]="p.config[k].value" *ngIf="p.config[k].type === 'password'" (keydown)="p.hasChanged = true">
3643
</ng-container>
37-
<input type="password" name="integrationpassword" [(ngModel)]="p.config[k].value" *ngIf="p.config[k].type === 'password'" (keydown)="p.hasChanged = true">
3844
</div>
3945
</div>
4046
</div>

0 commit comments

Comments
 (0)