Skip to content

Commit 822ed4a

Browse files
committed
wip
Signed-off-by: Jason Parraga <[email protected]>
1 parent a36d755 commit 822ed4a

File tree

37 files changed

+726
-574
lines changed

37 files changed

+726
-574
lines changed

datacatalog/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
gorm.io/driver/sqlite v1.5.4
2222
gorm.io/gorm v1.25.4
2323
gorm.io/plugin/opentelemetry v0.1.4
24+
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
2425
)
2526

2627
require (
@@ -141,7 +142,6 @@ require (
141142
k8s.io/client-go v0.28.1 // indirect
142143
k8s.io/klog/v2 v2.100.1 // indirect
143144
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
144-
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
145145
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000 // indirect
146146
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
147147
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect

datacatalog/pkg/manager/impl/artifact_manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal
184184
logger.Debugf(ctx, "Get artifact by id %v", key)
185185
artifactKey := transformers.ToArtifactKey(datasetID, key)
186186
var err error
187-
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
187+
artifactModel, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey)
188188

189189
if err != nil {
190190
if errors.IsDoesNotExistError(err) {
@@ -273,7 +273,7 @@ func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalo
273273
}
274274

275275
// Perform the list with the dataset and listInput filters
276-
artifactModels, err := m.repo.ArtifactRepo().List(ctx, dataset.DatasetKey, listInput)
276+
artifactModels, err := m.repo.ArtifactRepo().ListAndFilterExpired(ctx, dataset.DatasetKey, listInput)
277277
if err != nil {
278278
logger.Errorf(ctx, "Unable to list Artifacts err: %v", err)
279279
m.systemMetrics.listFailureCounter.Inc(ctx)

datacatalog/pkg/manager/impl/artifact_manager_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,7 @@ func getExpectedArtifactModel(ctx context.Context, t *testing.T, datastore *stor
164164
Tags: []models.Tag{
165165
{TagKey: models.TagKey{TagName: "test-tag"}, DatasetUUID: expectedDataset.GetUUID(), ArtifactID: artifact.GetId()},
166166
},
167-
BaseModel: models.BaseModel{
168-
CreatedAt: getTestTimestamp(),
169-
},
167+
CreatedAt: getTestTimestamp(),
170168
}
171169
}
172170

@@ -484,7 +482,7 @@ func TestListArtifact(t *testing.T) {
484482
expectedArtifact := getTestArtifact()
485483
mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact)
486484

487-
t.Run("List Artifact on invalid filter", func(t *testing.T) {
485+
t.Run("ListAndFilterExpired Artifact on invalid filter", func(t *testing.T) {
488486
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
489487
filter := &datacatalog.FilterExpression{
490488
Filters: []*datacatalog.SinglePropertyFilter{
@@ -507,7 +505,7 @@ func TestListArtifact(t *testing.T) {
507505
assert.Equal(t, codes.InvalidArgument, responseCode)
508506
})
509507

510-
t.Run("List Artifacts with Partition and Tag", func(t *testing.T) {
508+
t.Run("ListAndFilterExpired Artifacts with Partition and Tag", func(t *testing.T) {
511509
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
512510
filter := &datacatalog.FilterExpression{
513511
Filters: []*datacatalog.SinglePropertyFilter{
@@ -554,7 +552,7 @@ func TestListArtifact(t *testing.T) {
554552
mockArtifactModel,
555553
}
556554

557-
dcRepo.MockArtifactRepo.On("List", mock.Anything,
555+
dcRepo.MockArtifactRepo.On("ListAndFilterExpired", mock.Anything,
558556
mock.MatchedBy(func(dataset models.DatasetKey) bool {
559557
return dataset.Project == expectedDataset.GetId().GetProject() &&
560558
dataset.Domain == expectedDataset.GetId().GetDomain() &&
@@ -578,7 +576,7 @@ func TestListArtifact(t *testing.T) {
578576
assert.NotEmpty(t, artifactResponse)
579577
})
580578

581-
t.Run("List Artifacts with No Partition", func(t *testing.T) {
579+
t.Run("ListAndFilterExpired Artifacts with No Partition", func(t *testing.T) {
582580
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
583581
filter := &datacatalog.FilterExpression{Filters: nil}
584582

@@ -594,7 +592,7 @@ func TestListArtifact(t *testing.T) {
594592
mockArtifactModel,
595593
mockArtifactModel,
596594
}
597-
dcRepo.MockArtifactRepo.On("List", mock.Anything,
595+
dcRepo.MockArtifactRepo.On("ListAndFilterExpired", mock.Anything,
598596
mock.MatchedBy(func(dataset models.DatasetKey) bool {
599597
return dataset.Project == expectedDataset.GetId().GetProject() &&
600598
dataset.Domain == expectedDataset.GetId().GetDomain() &&

datacatalog/pkg/manager/impl/tag_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (m *tagManager) AddTag(ctx context.Context, request *datacatalog.AddTagRequ
5555
}
5656

5757
artifactKey := transformers.ToArtifactKey(datasetID, request.GetTag().GetArtifactId())
58-
_, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
58+
_, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey)
5959
if err != nil {
6060
m.systemMetrics.addTagFailureCounter.Inc(ctx)
6161
return nil, err

datacatalog/pkg/repositories/gormimpl/artifact.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"gorm.io/gorm"
77
"gorm.io/gorm/clause"
8+
"k8s.io/utils/clock"
89

910
"github.com/flyteorg/flyte/datacatalog/pkg/common"
1011
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/errors"
@@ -18,13 +19,20 @@ type artifactRepo struct {
1819
db *gorm.DB
1920
errorTransformer errors.ErrorTransformer
2021
repoMetrics gormMetrics
22+
clock clock.Clock
2123
}
2224

23-
func NewArtifactRepo(db *gorm.DB, errorTransformer errors.ErrorTransformer, scope promutils.Scope) interfaces.ArtifactRepo {
25+
func NewArtifactRepo(
26+
db *gorm.DB,
27+
errorTransformer errors.ErrorTransformer,
28+
scope promutils.Scope,
29+
clock clock.Clock,
30+
) interfaces.ArtifactRepo {
2431
return &artifactRepo{
2532
db: db,
2633
errorTransformer: errorTransformer,
2734
repoMetrics: newGormMetrics(scope),
35+
clock: clock,
2836
}
2937
}
3038

@@ -35,6 +43,19 @@ func (h *artifactRepo) Create(ctx context.Context, artifact models.Artifact) err
3543

3644
tx := h.db.WithContext(ctx).Begin()
3745

46+
existing, err := h.getForUpdate(ctx, artifact.ArtifactKey, tx)
47+
if err == nil {
48+
if existing.ExpiresAt != nil && h.clock.Now().After(*existing.ExpiresAt) {
49+
// If the previous artifact is expired, soft delete it before creating a new record
50+
tx.Delete(&existing)
51+
}
52+
} else {
53+
if err.Error() != gorm.ErrRecordNotFound.Error() {
54+
return h.errorTransformer.ToDataCatalogError(err)
55+
}
56+
// Not found is desirable
57+
}
58+
3859
tx = tx.Create(&artifact)
3960

4061
if tx.Error != nil {
@@ -50,17 +71,19 @@ func (h *artifactRepo) Create(ctx context.Context, artifact models.Artifact) err
5071
return nil
5172
}
5273

53-
func (h *artifactRepo) Get(ctx context.Context, in models.ArtifactKey) (models.Artifact, error) {
74+
func (h *artifactRepo) GetAndFilterExpired(ctx context.Context, in models.ArtifactKey) (models.Artifact, error) {
5475
timer := h.repoMetrics.GetDuration.Start(ctx)
5576
defer timer.Stop()
5677

5778
var artifact models.Artifact
58-
result := h.db.WithContext(ctx).Preload("ArtifactData").
79+
result := h.db.WithContext(ctx).
80+
Where("artifacts.expires_at is null or artifacts.expires_at < ?", h.clock.Now()).
81+
Preload("ArtifactData").
5982
Preload("Partitions", func(db *gorm.DB) *gorm.DB {
6083
return db.WithContext(ctx).Order("partitions.created_at ASC") // preserve the order in which the partitions were created
6184
}).
6285
Preload("Tags").
63-
Order("artifacts.created_at DESC").
86+
Order("artifacts.created_at DESC"). // Always pick the most recent
6487
First(
6588
&artifact,
6689
&models.Artifact{ArtifactKey: in},
@@ -85,7 +108,27 @@ func (h *artifactRepo) Get(ctx context.Context, in models.ArtifactKey) (models.A
85108
return artifact, nil
86109
}
87110

88-
func (h *artifactRepo) List(ctx context.Context, datasetKey models.DatasetKey, in models.ListModelsInput) ([]models.Artifact, error) {
111+
// Gets the artifact and locks the row
112+
func (h *artifactRepo) getForUpdate(ctx context.Context, in models.ArtifactKey, db *gorm.DB) (models.Artifact, error) {
113+
114+
var artifact models.Artifact
115+
result := db.
116+
Clauses(clause.Locking{Strength: "UPDATE"}).
117+
Preload("ArtifactData").
118+
Preload("Partitions", func(db *gorm.DB) *gorm.DB {
119+
return db.WithContext(ctx).Order("partitions.created_at ASC") // preserve the order in which the partitions were created
120+
}).
121+
Preload("Tags").
122+
Order("artifacts.created_at DESC"). // Always pick the most recent
123+
First(
124+
&artifact,
125+
&models.Artifact{ArtifactKey: in},
126+
)
127+
128+
return artifact, result.Error
129+
}
130+
131+
func (h *artifactRepo) ListAndFilterExpired(ctx context.Context, datasetKey models.DatasetKey, in models.ListModelsInput) ([]models.Artifact, error) {
89132
timer := h.repoMetrics.ListDuration.Start(ctx)
90133
defer timer.Stop()
91134

0 commit comments

Comments
 (0)