Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datacatalog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.opentelemetry.io/otel v1.24.0
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.34.1
gorm.io/driver/postgres v1.5.3
gorm.io/driver/sqlite v1.5.4
gorm.io/gorm v1.25.4
gorm.io/plugin/opentelemetry v0.1.4
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
)

require (
Expand Down Expand Up @@ -131,7 +133,6 @@ require (
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand All @@ -141,7 +142,6 @@ require (
k8s.io/client-go v0.28.1 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
Expand Down
29 changes: 26 additions & 3 deletions datacatalog/pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"time"

"google.golang.org/grpc/codes"
"k8s.io/utils/clock"

"github.com/flyteorg/flyte/datacatalog/pkg/common"
"github.com/flyteorg/flyte/datacatalog/pkg/errors"
Expand Down Expand Up @@ -51,6 +52,7 @@
repo repositories.RepositoryInterface
artifactStore ArtifactDataStore
systemMetrics artifactMetrics
clock clock.Clock
}

// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location.
Expand Down Expand Up @@ -111,6 +113,12 @@
return nil, err
}

// Set expiration
if request.GetArtifact().GetTtl() != nil {
expiration := m.clock.Now().Add(request.GetArtifact().GetTtl().AsDuration())
artifactModel.ExpiresAt = &expiration
}

err = m.repo.ArtifactRepo().Create(ctx, artifactModel)
if err != nil {
if errors.IsAlreadyExistsError(err) {
Expand Down Expand Up @@ -184,7 +192,7 @@
logger.Debugf(ctx, "Get artifact by id %v", key)
artifactKey := transformers.ToArtifactKey(datasetID, key)
var err error
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
artifactModel, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey)

if err != nil {
if errors.IsDoesNotExistError(err) {
Expand Down Expand Up @@ -215,6 +223,11 @@
artifactModel = tag.Artifact
}

// If the artifact is expired consider this tag expired too
if artifactModel.ExpiresAt != nil && artifactModel.ExpiresAt.Before(m.clock.Now()) {
return models.Artifact{}, errors.NewDataCatalogErrorf(codes.NotFound, "entry not found")
}

Check warning on line 229 in datacatalog/pkg/manager/impl/artifact_manager.go

View check run for this annotation

Codecov / codecov/patch

datacatalog/pkg/manager/impl/artifact_manager.go#L228-L229

Added lines #L228 - L229 were not covered by tests

if len(artifactModel.ArtifactData) == 0 {
return models.Artifact{}, errors.NewDataCatalogErrorf(codes.Internal, "artifact [%+v] with key %v does not have artifact data associated", artifactModel, key)
}
Expand Down Expand Up @@ -273,7 +286,7 @@
}

// Perform the list with the dataset and listInput filters
artifactModels, err := m.repo.ArtifactRepo().List(ctx, dataset.DatasetKey, listInput)
artifactModels, err := m.repo.ArtifactRepo().ListAndFilterExpired(ctx, dataset.DatasetKey, listInput)
if err != nil {
logger.Errorf(ctx, "Unable to list Artifacts err: %v", err)
m.systemMetrics.listFailureCounter.Inc(ctx)
Expand Down Expand Up @@ -379,6 +392,15 @@

// update artifact in DB, also replaces/upserts associated artifact data
artifactModel.ArtifactData = artifactDataModels

// Reset TTL on the artifact since all the data is fresh.
if request.GetTtl() != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be problematic depending on how the object storage engine implements the lifecycle rule for the object. It isn't required that the TTL be reset if a /PUT request is issued for a given key. For example, S3 triggers off of the creationTimestamp.

expiration := m.clock.Now().Add(request.GetTtl().AsDuration())
artifactModel.ExpiresAt = &expiration
} else {
artifactModel.ExpiresAt = nil
}

logger.Debugf(ctx, "Updating ArtifactModel with %+v", artifactModel)

err = m.repo.ArtifactRepo().Update(ctx, artifactModel)
Expand Down Expand Up @@ -416,7 +438,7 @@
}, nil
}

func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager {
func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope, clock clock.Clock) interfaces.ArtifactManager {
artifactMetrics := artifactMetrics{
scope: artifactScope,
createResponseTime: labeled.NewStopWatch("create_duration", "The duration of the create artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
Expand Down Expand Up @@ -446,5 +468,6 @@
repo: repo,
artifactStore: NewArtifactDataStore(store, storagePrefix),
systemMetrics: artifactMetrics,
clock: clock,
}
}
Loading
Loading