Skip to content

Commit a0132f2

Browse files
committed
Add support for cache TTL
Signed-off-by: Jason Parraga <[email protected]>
1 parent a75cea0 commit a0132f2

File tree

39 files changed

+1279
-815
lines changed

39 files changed

+1279
-815
lines changed

datacatalog/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ require (
1717
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
1818
go.opentelemetry.io/otel v1.24.0
1919
google.golang.org/grpc v1.62.1
20+
google.golang.org/protobuf v1.34.1
2021
gorm.io/driver/postgres v1.5.3
2122
gorm.io/driver/sqlite v1.5.4
2223
gorm.io/gorm v1.25.4
2324
gorm.io/plugin/opentelemetry v0.1.4
25+
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
2426
)
2527

2628
require (
@@ -131,7 +133,6 @@ require (
131133
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
132134
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
133135
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
134-
google.golang.org/protobuf v1.34.1 // indirect
135136
gopkg.in/inf.v0 v0.9.1 // indirect
136137
gopkg.in/ini.v1 v1.66.4 // indirect
137138
gopkg.in/yaml.v2 v2.4.0 // indirect
@@ -141,7 +142,6 @@ require (
141142
k8s.io/client-go v0.28.1 // indirect
142143
k8s.io/klog/v2 v2.100.1 // indirect
143144
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
144-
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
145145
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000 // indirect
146146
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
147147
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect

datacatalog/pkg/manager/impl/artifact_manager.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"google.golang.org/grpc/codes"
9+
"k8s.io/utils/clock"
910

1011
"github.com/flyteorg/flyte/datacatalog/pkg/common"
1112
"github.com/flyteorg/flyte/datacatalog/pkg/errors"
@@ -51,6 +52,7 @@ type artifactManager struct {
5152
repo repositories.RepositoryInterface
5253
artifactStore ArtifactDataStore
5354
systemMetrics artifactMetrics
55+
clock clock.Clock
5456
}
5557

5658
// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location.
@@ -111,6 +113,12 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal
111113
return nil, err
112114
}
113115

116+
// Set expiration
117+
if request.GetArtifact().GetTtl() != nil {
118+
expiration := m.clock.Now().Add(request.GetArtifact().GetTtl().AsDuration())
119+
artifactModel.ExpiresAt = &expiration
120+
}
121+
114122
err = m.repo.ArtifactRepo().Create(ctx, artifactModel)
115123
if err != nil {
116124
if errors.IsAlreadyExistsError(err) {
@@ -184,7 +192,7 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal
184192
logger.Debugf(ctx, "Get artifact by id %v", key)
185193
artifactKey := transformers.ToArtifactKey(datasetID, key)
186194
var err error
187-
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
195+
artifactModel, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey)
188196

189197
if err != nil {
190198
if errors.IsDoesNotExistError(err) {
@@ -215,6 +223,11 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal
215223
artifactModel = tag.Artifact
216224
}
217225

226+
// If the artifact is expired consider this tag expired too
227+
if artifactModel.ExpiresAt != nil && artifactModel.ExpiresAt.Before(m.clock.Now()) {
228+
return models.Artifact{}, errors.NewDataCatalogErrorf(codes.NotFound, "entry not found")
229+
}
230+
218231
if len(artifactModel.ArtifactData) == 0 {
219232
return models.Artifact{}, errors.NewDataCatalogErrorf(codes.Internal, "artifact [%+v] with key %v does not have artifact data associated", artifactModel, key)
220233
}
@@ -273,7 +286,7 @@ func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalo
273286
}
274287

275288
// Perform the list with the dataset and listInput filters
276-
artifactModels, err := m.repo.ArtifactRepo().List(ctx, dataset.DatasetKey, listInput)
289+
artifactModels, err := m.repo.ArtifactRepo().ListAndFilterExpired(ctx, dataset.DatasetKey, listInput)
277290
if err != nil {
278291
logger.Errorf(ctx, "Unable to list Artifacts err: %v", err)
279292
m.systemMetrics.listFailureCounter.Inc(ctx)
@@ -379,6 +392,15 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
379392

380393
// update artifact in DB, also replaces/upserts associated artifact data
381394
artifactModel.ArtifactData = artifactDataModels
395+
396+
// Reset TTL on the artifact since all the data is fresh.
397+
if request.GetTtl() != nil {
398+
expiration := m.clock.Now().Add(request.GetTtl().AsDuration())
399+
artifactModel.ExpiresAt = &expiration
400+
} else {
401+
artifactModel.ExpiresAt = nil
402+
}
403+
382404
logger.Debugf(ctx, "Updating ArtifactModel with %+v", artifactModel)
383405

384406
err = m.repo.ArtifactRepo().Update(ctx, artifactModel)
@@ -416,7 +438,7 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
416438
}, nil
417439
}
418440

419-
func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager {
441+
func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope, clock clock.Clock) interfaces.ArtifactManager {
420442
artifactMetrics := artifactMetrics{
421443
scope: artifactScope,
422444
createResponseTime: labeled.NewStopWatch("create_duration", "The duration of the create artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
@@ -446,5 +468,6 @@ func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.Da
446468
repo: repo,
447469
artifactStore: NewArtifactDataStore(store, storagePrefix),
448470
systemMetrics: artifactMetrics,
471+
clock: clock,
449472
}
450473
}

0 commit comments

Comments
 (0)