Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/sql/repository/CiArtifactRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,9 @@ func (impl CiArtifactRepositoryImpl) GetArtifactsByDataSourceAndComponentId(data

func (impl CiArtifactRepositoryImpl) FindCiArtifactByImagePaths(images []string) ([]CiArtifact, error) {
var ciArtifacts []CiArtifact
if len(images) == 0 {
return nil, nil
}
err := impl.dbConnection.
Model(&ciArtifacts).
Where(" image in (?) ", pg.In(images)).
Expand Down
14 changes: 14 additions & 0 deletions internal/sql/repository/CustomTagRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ImageTagRepository interface {
DeactivateImagePathReservationByImagePaths(tx *pg.Tx, imagePaths []string) error
DeactivateImagePathReservationByImagePathReservationIds(tx *pg.Tx, imagePathReservationIds []int) error
DisableCustomTag(entityKey int, entityValue string) error
GetImagePathsByIds(ids []int) ([]*ImagePathReservation, error)
}

type ImageTagRepositoryImpl struct {
Expand Down Expand Up @@ -139,6 +140,9 @@ func (impl *ImageTagRepositoryImpl) InsertImagePath(tx *pg.Tx, reservation *Imag
}

func (impl *ImageTagRepositoryImpl) DeactivateImagePathReservationByImagePaths(tx *pg.Tx, imagePaths []string) error {
if len(imagePaths) == 0 {
return nil
}
query := `UPDATE image_path_reservation set active=false where image_path in (?)`
_, err := tx.Exec(query, pg.In(imagePaths))
if err != nil && err != pg.ErrNoRows {
Expand All @@ -161,3 +165,13 @@ func (impl *ImageTagRepositoryImpl) DisableCustomTag(entityKey int, entityValue
_, err := impl.dbConnection.Exec(query, entityKey, entityValue)
return err
}
func (impl *ImageTagRepositoryImpl) GetImagePathsByIds(ids []int) ([]*ImagePathReservation, error) {
var imagePaths []*ImagePathReservation
if len(ids) == 0 {
return imagePaths, nil
}
err := impl.dbConnection.Model(&imagePaths).
Where("id in (?) ", pg.In(ids)).
Where("active = ?", true).Select()
return imagePaths, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (impl *TriggerServiceImpl) TriggerPostStage(request bean.TriggerRequest) er
cdStageWorkflowRequest.Type = bean3.CD_WORKFLOW_PIPELINE_TYPE
// handling plugin specific logic

pluginImagePathReservationIds, err := impl.SetCopyContainerImagePluginDataInWorkflowRequest(cdStageWorkflowRequest, pipeline.Id, types.POST, cdWf.CiArtifact)
pluginImagePathReservationIds, err := impl.setCopyContainerImagePluginDataAndReserveImages(cdStageWorkflowRequest, pipeline.Id, types.POST, cdWf.CiArtifact)
if err != nil {
runner.Status = pipelineConfig.WorkflowFailed
runner.Message = err.Error()
Expand Down
195 changes: 111 additions & 84 deletions pkg/deployment/trigger/devtronApps/PreStageTriggerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
repository3 "github.com/devtron-labs/devtron/pkg/pipeline/history/repository"
"github.com/devtron-labs/devtron/pkg/pipeline/types"
"github.com/devtron-labs/devtron/pkg/plugin"
bean3 "github.com/devtron-labs/devtron/pkg/plugin/bean"
"github.com/devtron-labs/devtron/pkg/resourceQualifiers"
"github.com/devtron-labs/devtron/pkg/sql"
util3 "github.com/devtron-labs/devtron/pkg/util"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (impl *TriggerServiceImpl) TriggerPreStage(request bean.TriggerRequest) err
}
cdStageWorkflowRequest.StageType = types.PRE
// handling copyContainerImage plugin specific logic
imagePathReservationIds, err := impl.SetCopyContainerImagePluginDataInWorkflowRequest(cdStageWorkflowRequest, pipeline.Id, types.PRE, artifact)
imagePathReservationIds, err := impl.setCopyContainerImagePluginDataAndReserveImages(cdStageWorkflowRequest, pipeline.Id, types.PRE, artifact)
if err != nil {
runner.Status = pipelineConfig.WorkflowFailed
runner.Message = err.Error()
Expand Down Expand Up @@ -236,95 +237,121 @@ func (impl *TriggerServiceImpl) checkVulnerabilityStatusAndFailWfIfNeeded(ctx co
return nil
}

func (impl *TriggerServiceImpl) SetCopyContainerImagePluginDataInWorkflowRequest(cdStageWorkflowRequest *types.WorkflowRequest, pipelineId int, pipelineStage string, artifact *repository.CiArtifact) ([]int, error) {
copyContainerImagePluginId, err := impl.globalPluginService.GetRefPluginIdByRefPluginName(pipeline.COPY_CONTAINER_IMAGE)
var imagePathReservationIds []int
// setCopyContainerImagePluginDataAndReserveImages sets required fields in cdStageWorkflowRequest and reserve images generated by plugin
func (impl *TriggerServiceImpl) setCopyContainerImagePluginDataAndReserveImages(cdStageWorkflowRequest *types.WorkflowRequest, pipelineId int, pipelineStage string, artifact *repository.CiArtifact) ([]int, error) {

copyContainerImagePluginDetail, err := impl.globalPluginService.GetRefPluginIdByRefPluginName(pipeline.COPY_CONTAINER_IMAGE)
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in getting copyContainerImage plugin id", "err", err)
return imagePathReservationIds, err
return nil, err
}
for _, step := range cdStageWorkflowRequest.PrePostDeploySteps {
if copyContainerImagePluginId != 0 && step.RefPluginId == copyContainerImagePluginId {
var pipelineStageEntityType int
if pipelineStage == types.PRE {
pipelineStageEntityType = pipelineConfigBean.EntityTypePreCD
} else {
pipelineStageEntityType = pipelineConfigBean.EntityTypePostCD
}
customTagId := -1
var DockerImageTag string

customTag, err := impl.customTagService.GetActiveCustomTagByEntityKeyAndValue(pipelineStageEntityType, strconv.Itoa(pipelineId))
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in fetching custom tag data", "err", err)
return imagePathReservationIds, err
}
pluginIdToVersionMap := make(map[int]string)
for _, p := range copyContainerImagePluginDetail {
pluginIdToVersionMap[p.Id] = p.Version
}

if !customTag.Enabled {
// case when custom tag is not configured - source image tag will be taken as docker image tag
pluginTriggerImageSplit := strings.Split(artifact.Image, ":")
DockerImageTag = pluginTriggerImageSplit[len(pluginTriggerImageSplit)-1]
} else {
// for copyContainerImage plugin parse destination images and save its data in image path reservation table
customTagDbObject, customDockerImageTag, err := impl.customTagService.GetCustomTag(pipelineStageEntityType, strconv.Itoa(pipelineId))
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in fetching custom tag by entity key and value for CD", "err", err)
return imagePathReservationIds, err
}
if customTagDbObject != nil && customTagDbObject.Id > 0 {
customTagId = customTagDbObject.Id
}
DockerImageTag = customDockerImageTag
}
dockerImageTag, customTagId, err := impl.getDockerTagAndCustomTagIdForPlugin(pipelineStage, pipelineId, artifact)
if err != nil {
impl.logger.Errorw("error in getting docker tag", "err", err)
return nil, err
}

var sourceDockerRegistryId string
if artifact.DataSource == repository.PRE_CD || artifact.DataSource == repository.POST_CD || artifact.DataSource == repository.POST_CI {
if artifact.CredentialsSourceType == repository.GLOBAL_CONTAINER_REGISTRY {
sourceDockerRegistryId = artifact.CredentialSourceValue
}
} else {
sourceDockerRegistryId = cdStageWorkflowRequest.DockerRegistryId
}
registryDestinationImageMap, registryCredentialMap, err := impl.pluginInputVariableParser.HandleCopyContainerImagePluginInputVariables(step.InputVars, DockerImageTag, cdStageWorkflowRequest.CiArtifactDTO.Image, sourceDockerRegistryId)
var sourceDockerRegistryId string
if artifact.DataSource == repository.PRE_CD || artifact.DataSource == repository.POST_CD || artifact.DataSource == repository.POST_CI {
if artifact.CredentialsSourceType == repository.GLOBAL_CONTAINER_REGISTRY {
sourceDockerRegistryId = artifact.CredentialSourceValue
}
} else {
sourceDockerRegistryId = cdStageWorkflowRequest.DockerRegistryId
}

registryCredentialMap := make(map[string]bean3.RegistryCredentials)
var allDestinationImages []string //saving all images to be reserved in this array

for _, step := range cdStageWorkflowRequest.PrePostDeploySteps {
if version, ok := pluginIdToVersionMap[step.RefPluginId]; ok {
registryDestinationImageMap, credentialMap, err := impl.pluginInputVariableParser.HandleCopyContainerImagePluginInputVariables(step.InputVars, dockerImageTag, cdStageWorkflowRequest.CiArtifactDTO.Image, sourceDockerRegistryId)
if err != nil {
impl.logger.Errorw("error in parsing copyContainerImage input variable", "err", err)
return imagePathReservationIds, err
}
var destinationImages []string
for _, images := range registryDestinationImageMap {
for _, image := range images {
destinationImages = append(destinationImages, image)
}
}
// fetch already saved artifacts to check if they are already present
savedCIArtifacts, err := impl.ciArtifactRepository.FindCiArtifactByImagePaths(destinationImages)
if err != nil {
impl.logger.Errorw("error in fetching artifacts by image path", "err", err)
return imagePathReservationIds, err
return nil, err
}
if len(savedCIArtifacts) > 0 {
// if already present in ci artifact, return "image path already in use error"
return imagePathReservationIds, pipelineConfigBean.ErrImagePathInUse
if version == pipeline.COPY_CONTAINER_IMAGE_VERSION_V1 {
// this is needed in ci runner only for v1
cdStageWorkflowRequest.RegistryDestinationImageMap = registryDestinationImageMap
}
imagePathReservationIds, err = impl.ReserveImagesGeneratedAtPlugin(customTagId, registryDestinationImageMap)
if err != nil {
impl.logger.Errorw("error in reserving image", "err", err)
return imagePathReservationIds, err
for _, images := range registryDestinationImageMap {
allDestinationImages = append(allDestinationImages, images...)
}
cdStageWorkflowRequest.RegistryDestinationImageMap = registryDestinationImageMap
cdStageWorkflowRequest.RegistryCredentialMap = registryCredentialMap
var pluginArtifactStage string
if pipelineStage == types.PRE {
pluginArtifactStage = repository.PRE_CD
} else {
pluginArtifactStage = repository.POST_CD
for k, v := range credentialMap {
registryCredentialMap[k] = v
}
cdStageWorkflowRequest.PluginArtifactStage = pluginArtifactStage
}
}

// set data in cdStageWorkflowRequest needed for copy container image plugin

cdStageWorkflowRequest.RegistryCredentialMap = registryCredentialMap
cdStageWorkflowRequest.DockerImageTag = dockerImageTag
if pipelineStage == types.PRE {
cdStageWorkflowRequest.PluginArtifactStage = repository.PRE_CD
} else {
cdStageWorkflowRequest.PluginArtifactStage = repository.POST_CD
}

// fetch already saved artifacts to check if they are already present

savedCIArtifacts, err := impl.ciArtifactRepository.FindCiArtifactByImagePaths(allDestinationImages)
if err != nil {
impl.logger.Errorw("error in fetching artifacts by image path", "err", err)
return nil, err
}
if len(savedCIArtifacts) > 0 {
// if already present in ci artifact, return "image path already in use error"
return nil, pipelineConfigBean.ErrImagePathInUse
}
// reserve all images where data will be
imagePathReservationIds, err := impl.ReserveImagesGeneratedAtPlugin(customTagId, allDestinationImages)
if err != nil {
impl.logger.Errorw("error in reserving image", "err", err)
return imagePathReservationIds, err
}
return imagePathReservationIds, nil
}

func (impl *TriggerServiceImpl) getDockerTagAndCustomTagIdForPlugin(pipelineStage string, pipelineId int, artifact *repository.CiArtifact) (string, int, error) {
var pipelineStageEntityType int
if pipelineStage == types.PRE {
pipelineStageEntityType = pipelineConfigBean.EntityTypePreCD
} else {
pipelineStageEntityType = pipelineConfigBean.EntityTypePostCD
}
customTag, err := impl.customTagService.GetActiveCustomTagByEntityKeyAndValue(pipelineStageEntityType, strconv.Itoa(pipelineId))
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in fetching custom tag data", "err", err)
return "", 0, err
}
var DockerImageTag string
customTagId := -1 // if customTag is not configured id=-1 will be saved in image_path_reservation table for image reservation
if !customTag.Enabled {
// case when custom tag is not configured - source image tag will be taken as docker image tag
pluginTriggerImageSplit := strings.Split(artifact.Image, ":")
DockerImageTag = pluginTriggerImageSplit[len(pluginTriggerImageSplit)-1]
} else {
// for copyContainerImage plugin parse destination images and save its data in image path reservation table
customTagDbObject, customDockerImageTag, err := impl.customTagService.GetCustomTag(pipelineStageEntityType, strconv.Itoa(pipelineId))
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in fetching custom tag by entity key and value for CD", "err", err)
return "", 0, err
}
if customTagDbObject != nil && customTagDbObject.Id > 0 {
customTagId = customTagDbObject.Id
}
DockerImageTag = customDockerImageTag
}
return DockerImageTag, customTagId, nil
}

func (impl *TriggerServiceImpl) buildWFRequest(runner *pipelineConfig.CdWorkflowRunner, cdWf *pipelineConfig.CdWorkflow, cdPipeline *pipelineConfig.Pipeline, envDeploymentConfig *bean5.DeploymentConfig, triggeredBy int32) (*types.WorkflowRequest, error) {
if cdPipeline.App.Id == 0 {
appModel, err := impl.appRepository.FindById(cdPipeline.AppId)
Expand Down Expand Up @@ -843,20 +870,20 @@ func (impl *TriggerServiceImpl) getSourceCiPipelineForArtifact(ciPipeline pipeli
return sourceCiPipeline, nil
}

func (impl *TriggerServiceImpl) ReserveImagesGeneratedAtPlugin(customTagId int, registryImageMap map[string][]string) ([]int, error) {
func (impl *TriggerServiceImpl) ReserveImagesGeneratedAtPlugin(customTagId int, destinationImages []string) ([]int, error) {
var imagePathReservationIds []int
for _, images := range registryImageMap {
for _, image := range images {
imagePathReservationData, err := impl.customTagService.ReserveImagePath(image, customTagId)
if err != nil {
impl.logger.Errorw("Error in marking custom tag reserved", "err", err)
return imagePathReservationIds, err
}
if imagePathReservationData != nil {
imagePathReservationIds = append(imagePathReservationIds, imagePathReservationData.Id)
}

for _, image := range destinationImages {
imagePathReservationData, err := impl.customTagService.ReserveImagePath(image, customTagId)
if err != nil {
impl.logger.Errorw("Error in marking custom tag reserved", "err", err)
return imagePathReservationIds, err
}
if imagePathReservationData != nil {
imagePathReservationIds = append(imagePathReservationIds, imagePathReservationData.Id)
}
}

return imagePathReservationIds, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/eventProcessor/in/WorkflowEventProcessorService.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDStageCompleteEvent() error {
}
} else if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST {
impl.logger.Debugw("received post stage success event for workflow runner ", "wfId", strconv.Itoa(wfr.Id))
err = impl.workflowDagExecutor.HandlePostStageSuccessEvent(triggerContext, wfr.CdWorkflowId, cdStageCompleteEvent.CdPipelineId, cdStageCompleteEvent.TriggeredBy, cdStageCompleteEvent.PluginRegistryArtifactDetails)
err = impl.workflowDagExecutor.HandlePostStageSuccessEvent(triggerContext, wfr, wfr.CdWorkflowId, cdStageCompleteEvent.CdPipelineId, cdStageCompleteEvent.TriggeredBy, cdStageCompleteEvent.PluginRegistryArtifactDetails)
if err != nil {
impl.logger.Errorw("deployment success event error", "err", err)
return
Expand Down
Loading