Skip to content

Commit d5bbd30

Browse files
committed
Add TTL support to cache
Signed-off-by: Jason Parraga <[email protected]>
1 parent a75cea0 commit d5bbd30

File tree

40 files changed

+1289
-807
lines changed

40 files changed

+1289
-807
lines changed

datacatalog/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ require (
1717
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
1818
go.opentelemetry.io/otel v1.24.0
1919
google.golang.org/grpc v1.62.1
20+
google.golang.org/protobuf v1.34.1
2021
gorm.io/driver/postgres v1.5.3
2122
gorm.io/driver/sqlite v1.5.4
2223
gorm.io/gorm v1.25.4
2324
gorm.io/plugin/opentelemetry v0.1.4
25+
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
2426
)
2527

2628
require (
@@ -131,7 +133,6 @@ require (
131133
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
132134
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
133135
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
134-
google.golang.org/protobuf v1.34.1 // indirect
135136
gopkg.in/inf.v0 v0.9.1 // indirect
136137
gopkg.in/ini.v1 v1.66.4 // indirect
137138
gopkg.in/yaml.v2 v2.4.0 // indirect
@@ -141,7 +142,6 @@ require (
141142
k8s.io/client-go v0.28.1 // indirect
142143
k8s.io/klog/v2 v2.100.1 // indirect
143144
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
144-
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
145145
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000 // indirect
146146
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
147147
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect

datacatalog/pkg/manager/impl/artifact_manager.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"google.golang.org/grpc/codes"
9+
"k8s.io/utils/clock"
910

1011
"github.com/flyteorg/flyte/datacatalog/pkg/common"
1112
"github.com/flyteorg/flyte/datacatalog/pkg/errors"
@@ -51,6 +52,7 @@ type artifactManager struct {
5152
repo repositories.RepositoryInterface
5253
artifactStore ArtifactDataStore
5354
systemMetrics artifactMetrics
55+
clock clock.Clock
5456
}
5557

5658
// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location.
@@ -111,6 +113,12 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal
111113
return nil, err
112114
}
113115

116+
// Set expiration
117+
if request.GetArtifact().GetTtl() != nil {
118+
expiration := m.clock.Now().Add(request.GetArtifact().GetTtl().AsDuration())
119+
artifactModel.ExpiresAt = &expiration
120+
}
121+
114122
err = m.repo.ArtifactRepo().Create(ctx, artifactModel)
115123
if err != nil {
116124
if errors.IsAlreadyExistsError(err) {
@@ -184,7 +192,7 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal
184192
logger.Debugf(ctx, "Get artifact by id %v", key)
185193
artifactKey := transformers.ToArtifactKey(datasetID, key)
186194
var err error
187-
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
195+
artifactModel, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey)
188196

189197
if err != nil {
190198
if errors.IsDoesNotExistError(err) {
@@ -273,7 +281,7 @@ func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalo
273281
}
274282

275283
// Perform the list with the dataset and listInput filters
276-
artifactModels, err := m.repo.ArtifactRepo().List(ctx, dataset.DatasetKey, listInput)
284+
artifactModels, err := m.repo.ArtifactRepo().ListAndFilterExpired(ctx, dataset.DatasetKey, listInput)
277285
if err != nil {
278286
logger.Errorf(ctx, "Unable to list Artifacts err: %v", err)
279287
m.systemMetrics.listFailureCounter.Inc(ctx)
@@ -379,6 +387,15 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
379387

380388
// update artifact in DB, also replaces/upserts associated artifact data
381389
artifactModel.ArtifactData = artifactDataModels
390+
391+
// Reset TTL on the artifact since all the data is fresh.
392+
if request.GetTtl() != nil {
393+
expiration := m.clock.Now().Add(request.GetTtl().AsDuration())
394+
artifactModel.ExpiresAt = &expiration
395+
} else {
396+
artifactModel.ExpiresAt = nil
397+
}
398+
382399
logger.Debugf(ctx, "Updating ArtifactModel with %+v", artifactModel)
383400

384401
err = m.repo.ArtifactRepo().Update(ctx, artifactModel)
@@ -416,7 +433,7 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
416433
}, nil
417434
}
418435

419-
func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager {
436+
func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope, clock clock.Clock) interfaces.ArtifactManager {
420437
artifactMetrics := artifactMetrics{
421438
scope: artifactScope,
422439
createResponseTime: labeled.NewStopWatch("create_duration", "The duration of the create artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
@@ -446,5 +463,6 @@ func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.Da
446463
repo: repo,
447464
artifactStore: NewArtifactDataStore(store, storagePrefix),
448465
systemMetrics: artifactMetrics,
466+
clock: clock,
449467
}
450468
}

0 commit comments

Comments
 (0)