Skip to content

Commit bae020b

Browse files
committed
wip
Signed-off-by: Jason Parraga <[email protected]>
1 parent a36d755 commit bae020b

40 files changed

+733
-581
lines changed

datacatalog/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
gorm.io/driver/sqlite v1.5.4
2222
gorm.io/gorm v1.25.4
2323
gorm.io/plugin/opentelemetry v0.1.4
24+
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
2425
)
2526

2627
require (
@@ -141,7 +142,6 @@ require (
141142
k8s.io/client-go v0.28.1 // indirect
142143
k8s.io/klog/v2 v2.100.1 // indirect
143144
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
144-
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
145145
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000 // indirect
146146
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
147147
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect

datacatalog/pkg/manager/impl/artifact_manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal
184184
logger.Debugf(ctx, "Get artifact by id %v", key)
185185
artifactKey := transformers.ToArtifactKey(datasetID, key)
186186
var err error
187-
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
187+
artifactModel, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey)
188188

189189
if err != nil {
190190
if errors.IsDoesNotExistError(err) {
@@ -273,7 +273,7 @@ func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalo
273273
}
274274

275275
// Perform the list with the dataset and listInput filters
276-
artifactModels, err := m.repo.ArtifactRepo().List(ctx, dataset.DatasetKey, listInput)
276+
artifactModels, err := m.repo.ArtifactRepo().ListAndFilterExpired(ctx, dataset.DatasetKey, listInput)
277277
if err != nil {
278278
logger.Errorf(ctx, "Unable to list Artifacts err: %v", err)
279279
m.systemMetrics.listFailureCounter.Inc(ctx)

datacatalog/pkg/manager/impl/artifact_manager_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,7 @@ func getExpectedArtifactModel(ctx context.Context, t *testing.T, datastore *stor
164164
Tags: []models.Tag{
165165
{TagKey: models.TagKey{TagName: "test-tag"}, DatasetUUID: expectedDataset.GetUUID(), ArtifactID: artifact.GetId()},
166166
},
167-
BaseModel: models.BaseModel{
168-
CreatedAt: getTestTimestamp(),
169-
},
167+
CreatedAt: getTestTimestamp(),
170168
}
171169
}
172170

@@ -484,7 +482,7 @@ func TestListArtifact(t *testing.T) {
484482
expectedArtifact := getTestArtifact()
485483
mockArtifactModel := getExpectedArtifactModel(ctx, t, datastore, expectedArtifact)
486484

487-
t.Run("List Artifact on invalid filter", func(t *testing.T) {
485+
t.Run("ListAndFilterExpired Artifact on invalid filter", func(t *testing.T) {
488486
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
489487
filter := &datacatalog.FilterExpression{
490488
Filters: []*datacatalog.SinglePropertyFilter{
@@ -507,7 +505,7 @@ func TestListArtifact(t *testing.T) {
507505
assert.Equal(t, codes.InvalidArgument, responseCode)
508506
})
509507

510-
t.Run("List Artifacts with Partition and Tag", func(t *testing.T) {
508+
t.Run("ListAndFilterExpired Artifacts with Partition and Tag", func(t *testing.T) {
511509
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
512510
filter := &datacatalog.FilterExpression{
513511
Filters: []*datacatalog.SinglePropertyFilter{
@@ -554,7 +552,7 @@ func TestListArtifact(t *testing.T) {
554552
mockArtifactModel,
555553
}
556554

557-
dcRepo.MockArtifactRepo.On("List", mock.Anything,
555+
dcRepo.MockArtifactRepo.On("ListAndFilterExpired", mock.Anything,
558556
mock.MatchedBy(func(dataset models.DatasetKey) bool {
559557
return dataset.Project == expectedDataset.GetId().GetProject() &&
560558
dataset.Domain == expectedDataset.GetId().GetDomain() &&
@@ -578,7 +576,7 @@ func TestListArtifact(t *testing.T) {
578576
assert.NotEmpty(t, artifactResponse)
579577
})
580578

581-
t.Run("List Artifacts with No Partition", func(t *testing.T) {
579+
t.Run("ListAndFilterExpired Artifacts with No Partition", func(t *testing.T) {
582580
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
583581
filter := &datacatalog.FilterExpression{Filters: nil}
584582

@@ -594,7 +592,7 @@ func TestListArtifact(t *testing.T) {
594592
mockArtifactModel,
595593
mockArtifactModel,
596594
}
597-
dcRepo.MockArtifactRepo.On("List", mock.Anything,
595+
dcRepo.MockArtifactRepo.On("ListAndFilterExpired", mock.Anything,
598596
mock.MatchedBy(func(dataset models.DatasetKey) bool {
599597
return dataset.Project == expectedDataset.GetId().GetProject() &&
600598
dataset.Domain == expectedDataset.GetId().GetDomain() &&

datacatalog/pkg/manager/impl/dataset_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (dm *datasetManager) GetDataset(ctx context.Context, request *datacatalog.G
133133
}, nil
134134
}
135135

136-
// List Datasets with optional filtering and pagination
136+
// ListDatasets lists Datasets with optional filtering and pagination
137137
func (dm *datasetManager) ListDatasets(ctx context.Context, request *datacatalog.ListDatasetsRequest) (*datacatalog.ListDatasetsResponse, error) {
138138
err := validators.ValidateListDatasetsRequest(request)
139139
if err != nil {

datacatalog/pkg/manager/impl/dataset_manager_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func TestListDatasets(t *testing.T) {
202202
expectedDataset := getTestDataset()
203203
dcRepo := getDataCatalogRepo()
204204

205-
t.Run("List Datasets on invalid filter", func(t *testing.T) {
205+
t.Run("ListAndFilterExpired Datasets on invalid filter", func(t *testing.T) {
206206
datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())
207207
filter := &datacatalog.FilterExpression{
208208
Filters: []*datacatalog.SinglePropertyFilter{
@@ -225,7 +225,7 @@ func TestListDatasets(t *testing.T) {
225225
assert.Equal(t, codes.InvalidArgument, responseCode)
226226
})
227227

228-
t.Run("List Datasets with Project and Name", func(t *testing.T) {
228+
t.Run("ListAndFilterExpired Datasets with Project and Name", func(t *testing.T) {
229229
datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())
230230
filter := &datacatalog.FilterExpression{
231231
Filters: []*datacatalog.SinglePropertyFilter{
@@ -253,7 +253,7 @@ func TestListDatasets(t *testing.T) {
253253
datasetModel, err := transformers.CreateDatasetModel(expectedDataset)
254254
assert.NoError(t, err)
255255

256-
dcRepo.MockDatasetRepo.On("List", mock.Anything,
256+
dcRepo.MockDatasetRepo.On("ListAndFilterExpired", mock.Anything,
257257
mock.MatchedBy(func(listInput models.ListModelsInput) bool {
258258
return len(listInput.ModelFilters) == 2 &&
259259
listInput.ModelFilters[0].Entity == common.Dataset &&
@@ -270,13 +270,13 @@ func TestListDatasets(t *testing.T) {
270270
assert.Len(t, datasetResponse.GetDatasets(), 1)
271271
})
272272

273-
t.Run("List Datasets with no filtering", func(t *testing.T) {
273+
t.Run("ListAndFilterExpired Datasets with no filtering", func(t *testing.T) {
274274
datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())
275275

276276
datasetModel, err := transformers.CreateDatasetModel(expectedDataset)
277277
assert.NoError(t, err)
278278

279-
dcRepo.MockDatasetRepo.On("List", mock.Anything,
279+
dcRepo.MockDatasetRepo.On("ListAndFilterExpired", mock.Anything,
280280
mock.MatchedBy(func(listInput models.ListModelsInput) bool {
281281
return len(listInput.ModelFilters) == 0 &&
282282
listInput.Limit == 50 &&

datacatalog/pkg/manager/impl/tag_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (m *tagManager) AddTag(ctx context.Context, request *datacatalog.AddTagRequ
5555
}
5656

5757
artifactKey := transformers.ToArtifactKey(datasetID, request.GetTag().GetArtifactId())
58-
_, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
58+
_, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey)
5959
if err != nil {
6060
m.systemMetrics.addTagFailureCounter.Inc(ctx)
6161
return nil, err

datacatalog/pkg/repositories/gormimpl/artifact.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"gorm.io/gorm"
77
"gorm.io/gorm/clause"
8+
"k8s.io/utils/clock"
89

910
"github.com/flyteorg/flyte/datacatalog/pkg/common"
1011
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/errors"
@@ -18,13 +19,20 @@ type artifactRepo struct {
1819
db *gorm.DB
1920
errorTransformer errors.ErrorTransformer
2021
repoMetrics gormMetrics
22+
clock clock.Clock
2123
}
2224

23-
func NewArtifactRepo(db *gorm.DB, errorTransformer errors.ErrorTransformer, scope promutils.Scope) interfaces.ArtifactRepo {
25+
func NewArtifactRepo(
26+
db *gorm.DB,
27+
errorTransformer errors.ErrorTransformer,
28+
scope promutils.Scope,
29+
clock clock.Clock,
30+
) interfaces.ArtifactRepo {
2431
return &artifactRepo{
2532
db: db,
2633
errorTransformer: errorTransformer,
2734
repoMetrics: newGormMetrics(scope),
35+
clock: clock,
2836
}
2937
}
3038

@@ -35,6 +43,19 @@ func (h *artifactRepo) Create(ctx context.Context, artifact models.Artifact) err
3543

3644
tx := h.db.WithContext(ctx).Begin()
3745

46+
existing, err := h.getForUpdate(ctx, artifact.ArtifactKey, tx)
47+
if err == nil {
48+
if existing.ExpiresAt != nil && h.clock.Now().After(*existing.ExpiresAt) {
49+
// If the previous artifact is expired, soft delete it before creating a new record
50+
tx.Delete(&existing)
51+
}
52+
} else {
53+
if err.Error() != gorm.ErrRecordNotFound.Error() {
54+
return h.errorTransformer.ToDataCatalogError(err)
55+
}
56+
// Not found is desirable
57+
}
58+
3859
tx = tx.Create(&artifact)
3960

4061
if tx.Error != nil {
@@ -50,17 +71,19 @@ func (h *artifactRepo) Create(ctx context.Context, artifact models.Artifact) err
5071
return nil
5172
}
5273

53-
func (h *artifactRepo) Get(ctx context.Context, in models.ArtifactKey) (models.Artifact, error) {
74+
func (h *artifactRepo) GetAndFilterExpired(ctx context.Context, in models.ArtifactKey) (models.Artifact, error) {
5475
timer := h.repoMetrics.GetDuration.Start(ctx)
5576
defer timer.Stop()
5677

5778
var artifact models.Artifact
58-
result := h.db.WithContext(ctx).Preload("ArtifactData").
79+
result := h.db.WithContext(ctx).
80+
Where("artifacts.expires_at is null or artifacts.expires_at < ?", h.clock.Now()).
81+
Preload("ArtifactData").
5982
Preload("Partitions", func(db *gorm.DB) *gorm.DB {
6083
return db.WithContext(ctx).Order("partitions.created_at ASC") // preserve the order in which the partitions were created
6184
}).
6285
Preload("Tags").
63-
Order("artifacts.created_at DESC").
86+
Order("artifacts.created_at DESC"). // Always pick the most recent
6487
First(
6588
&artifact,
6689
&models.Artifact{ArtifactKey: in},
@@ -85,7 +108,27 @@ func (h *artifactRepo) Get(ctx context.Context, in models.ArtifactKey) (models.A
85108
return artifact, nil
86109
}
87110

88-
func (h *artifactRepo) List(ctx context.Context, datasetKey models.DatasetKey, in models.ListModelsInput) ([]models.Artifact, error) {
111+
// Gets the artifact and locks the row
112+
func (h *artifactRepo) getForUpdate(ctx context.Context, in models.ArtifactKey, db *gorm.DB) (models.Artifact, error) {
113+
114+
var artifact models.Artifact
115+
result := db.
116+
Clauses(clause.Locking{Strength: "UPDATE"}).
117+
Preload("ArtifactData").
118+
Preload("Partitions", func(db *gorm.DB) *gorm.DB {
119+
return db.WithContext(ctx).Order("partitions.created_at ASC") // preserve the order in which the partitions were created
120+
}).
121+
Preload("Tags").
122+
Order("artifacts.created_at DESC"). // Always pick the most recent
123+
First(
124+
&artifact,
125+
&models.Artifact{ArtifactKey: in},
126+
)
127+
128+
return artifact, result.Error
129+
}
130+
131+
func (h *artifactRepo) ListAndFilterExpired(ctx context.Context, datasetKey models.DatasetKey, in models.ListModelsInput) ([]models.Artifact, error) {
89132
timer := h.repoMetrics.ListDuration.Start(ctx)
90133
defer timer.Stop()
91134

0 commit comments

Comments
 (0)