Skip to content

Commit 6804f43

Browse files
authored
Merge pull request #3357 from weaveworks/3044-add-paging-to-query-service-store-iterator
Adding paging method to query service store iterator
2 parents 3820da8 + 4c6014a commit 6804f43

File tree

6 files changed

+400
-6
lines changed

6 files changed

+400
-6
lines changed

pkg/query/internal/sqliterator/sqliterator.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,38 @@ func (i *iterator) All() ([]models.Object, error) {
6565
return objects, nil
6666
}
6767

68+
func (i *iterator) Page(count int, offset int) ([]models.Object, error) {
69+
var objects []models.Object
70+
71+
index := -1
72+
73+
for i.rows.Next() {
74+
index++
75+
76+
if index < offset {
77+
continue
78+
}
79+
80+
var object models.Object
81+
82+
if err := i.result.ScanRows(i.rows, &object); err != nil {
83+
return nil, fmt.Errorf("failed to scan rows: %w", err)
84+
}
85+
86+
objects = append(objects, object)
87+
88+
if index >= offset+count-1 {
89+
break
90+
}
91+
}
92+
93+
if err := i.rows.Err(); err != nil {
94+
return nil, fmt.Errorf("failed to get rows: %w", err)
95+
}
96+
97+
return objects, nil
98+
}
99+
68100
func (i *iterator) Close() error {
69101
return i.rows.Close()
70102
}

pkg/query/store/indexer.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,6 @@ func (i *indexerIterator) Row() (models.Object, error) {
328328
return i.s.GetObjectByID(context.Background(), id)
329329
}
330330

331-
func (i *indexerIterator) Close() error {
332-
return nil
333-
}
334-
335331
func (i *indexerIterator) All() ([]models.Object, error) {
336332
i.mu.Lock()
337333
defer i.mu.Unlock()
@@ -343,10 +339,41 @@ func (i *indexerIterator) All() ([]models.Object, error) {
343339
}
344340

345341
iter, err := i.s.GetObjects(context.Background(), ids, i.opts)
342+
if err != nil {
343+
return nil, fmt.Errorf("failed to get objects: %w", err)
344+
}
345+
346+
return iter.All()
347+
}
348+
349+
func (i *indexerIterator) Page(count int, offset int) ([]models.Object, error) {
350+
i.mu.Lock()
351+
defer i.mu.Unlock()
346352

353+
ids := []string{}
354+
355+
numHits := i.result.Hits.Len()
356+
upper := offset + count
357+
if upper > numHits {
358+
upper = numHits
359+
}
360+
361+
for index := offset; index < upper; index++ {
362+
ids = append(ids, i.result.Hits[index].ID)
363+
}
364+
365+
if len(ids) == 0 {
366+
return []models.Object{}, nil
367+
}
368+
369+
iter, err := i.s.GetObjects(context.Background(), ids, i.opts)
347370
if err != nil {
348371
return nil, fmt.Errorf("failed to get objects: %w", err)
349372
}
350373

351374
return iter.All()
352375
}
376+
377+
func (i *indexerIterator) Close() error {
378+
return nil
379+
}

pkg/query/store/indexer_test.go

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ func TestIndexer_Metrics(t *testing.T) {
126126
}
127127
assertMetrics(g, metricsUrl, wantMetrics)
128128
})
129-
130129
}
131130

132131
func TestIndexer_RemoveByQuery(t *testing.T) {
@@ -213,6 +212,161 @@ func TestIndexer_RemoveByQuery(t *testing.T) {
213212
}
214213
}
215214

215+
func TestIndexer_RemoveByQueryWithPagination(t *testing.T) {
216+
g := NewWithT(t)
217+
tests := []struct {
218+
name string
219+
query string
220+
objects []models.Object
221+
expected []string
222+
}{
223+
{
224+
name: "removes by cluster",
225+
query: "+cluster:management",
226+
objects: []models.Object{
227+
{
228+
Cluster: "management",
229+
Kind: "Namespace",
230+
Name: "name-1",
231+
APIGroup: "anyGroup",
232+
APIVersion: "anyVersion",
233+
Category: "automation",
234+
Namespace: "anyNamespace",
235+
},
236+
{
237+
Cluster: "othercluster",
238+
Kind: "Namespace",
239+
Name: "name-2",
240+
APIGroup: "anyGroup",
241+
APIVersion: "anyVersion",
242+
Category: "automation",
243+
Namespace: "anyNamespace",
244+
},
245+
{
246+
Cluster: "management",
247+
Kind: "Namespace",
248+
Name: "name-3",
249+
APIGroup: "anyGroup",
250+
APIVersion: "anyVersion",
251+
Category: "automation",
252+
Namespace: "anyNamespace",
253+
},
254+
{
255+
Cluster: "management",
256+
Kind: "Namespace",
257+
Name: "name-4",
258+
APIGroup: "anyGroup",
259+
APIVersion: "anyVersion",
260+
Category: "automation",
261+
Namespace: "anyNamespace",
262+
},
263+
{
264+
Cluster: "othercluster",
265+
Kind: "Namespace",
266+
Name: "name-5",
267+
APIGroup: "anyGroup",
268+
APIVersion: "anyVersion",
269+
Category: "automation",
270+
Namespace: "anyNamespace",
271+
},
272+
{
273+
Cluster: "management",
274+
Kind: "Namespace",
275+
Name: "name-6",
276+
APIGroup: "anyGroup",
277+
APIVersion: "anyVersion",
278+
Category: "automation",
279+
Namespace: "anyNamespace",
280+
},
281+
{
282+
Cluster: "othercluster",
283+
Kind: "Namespace",
284+
Name: "name-7",
285+
APIGroup: "anyGroup",
286+
APIVersion: "anyVersion",
287+
Category: "automation",
288+
Namespace: "anyNamespace",
289+
},
290+
},
291+
expected: []string{"name-2", "name-5", "name-7"},
292+
},
293+
}
294+
for _, tt := range tests {
295+
t.Run(tt.name, func(t *testing.T) {
296+
idxFileLocation := filepath.Join(t.TempDir(), indexFile)
297+
mapping := bleve.NewIndexMapping()
298+
299+
index, err := bleve.New(idxFileLocation, mapping)
300+
g.Expect(err).NotTo(HaveOccurred())
301+
302+
s, err := NewStore(StorageBackendSQLite, t.TempDir(), logr.Discard())
303+
g.Expect(err).NotTo(HaveOccurred())
304+
305+
idx := &bleveIndexer{
306+
idx: index,
307+
store: s,
308+
}
309+
310+
err = idx.Add(context.Background(), tt.objects)
311+
g.Expect(err).NotTo(HaveOccurred())
312+
313+
err = s.StoreObjects(context.Background(), tt.objects)
314+
g.Expect(err).NotTo(HaveOccurred())
315+
316+
// Ensure things got written to the index.
317+
// Iterate through all pages without an initial offset.
318+
iter, err := idx.Search(context.Background(), query{}, nil)
319+
g.Expect(err).NotTo(HaveOccurred())
320+
321+
pageObjects, err := iter.Page(3, 0)
322+
g.Expect(err).NotTo(HaveOccurred())
323+
g.Expect(len(pageObjects)).To(Equal(3))
324+
325+
pageObjects, err = iter.Page(3, 3)
326+
g.Expect(err).NotTo(HaveOccurred())
327+
g.Expect(len(pageObjects)).To(Equal(3))
328+
329+
pageObjects, err = iter.Page(3, 6)
330+
g.Expect(err).NotTo(HaveOccurred())
331+
g.Expect(len(pageObjects)).To(Equal(1))
332+
333+
// Iterate through all pages with an initial offset.
334+
iter, err = idx.Search(context.Background(), query{}, nil)
335+
g.Expect(err).NotTo(HaveOccurred())
336+
337+
pageObjects, err = iter.Page(2, 2)
338+
g.Expect(err).NotTo(HaveOccurred())
339+
g.Expect(len(pageObjects)).To(Equal(2))
340+
341+
pageObjects, err = iter.Page(2, 4)
342+
g.Expect(err).NotTo(HaveOccurred())
343+
g.Expect(len(pageObjects)).To(Equal(2))
344+
345+
pageObjects, err = iter.Page(2, 6)
346+
g.Expect(err).NotTo(HaveOccurred())
347+
g.Expect(len(pageObjects)).To(Equal(1))
348+
349+
// Check that required objects were removed.
350+
err = idx.RemoveByQuery(context.Background(), tt.query)
351+
g.Expect(err).NotTo(HaveOccurred())
352+
353+
iter, err = idx.Search(context.Background(), query{}, nil)
354+
g.Expect(err).NotTo(HaveOccurred())
355+
356+
all, err := iter.All()
357+
g.Expect(err).NotTo(HaveOccurred())
358+
359+
names := []string{}
360+
for _, obj := range all {
361+
names = append(names, obj.Name)
362+
}
363+
364+
g.Expect(names).To(Equal(tt.expected))
365+
366+
})
367+
}
368+
}
369+
216370
type query struct{}
217371

218372
func (q query) GetTerms() string {

pkg/query/store/store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ type Iterator interface {
8787
Row() (models.Object, error)
8888
// All returns all rows of the iterator
8989
All() ([]models.Object, error)
90+
// Page returns a specified number of rows of the iterator with a specified offset
91+
Page(count int, offset int) ([]models.Object, error)
9092

9193
// Close closes the iterator
9294
Close() error

pkg/query/store/store_test.go

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,109 @@ func TestGetObjects(t *testing.T) {
4040

4141
g.Expect(len(objects) > 0).To(BeTrue())
4242
g.Expect(objects[0].Name).To(Equal(obj.Name))
43+
}
44+
45+
func TestGetObjectsWithPagination(t *testing.T) {
46+
g := NewGomegaWithT(t)
47+
48+
store, db := createStore(t)
49+
50+
testObjects := []models.Object{
51+
{
52+
Cluster: "test-cluster",
53+
Name: "someName-1",
54+
Namespace: "namespace",
55+
Kind: "ValidKind",
56+
},
57+
{
58+
Cluster: "test-cluster",
59+
Name: "someName-2",
60+
Namespace: "namespace",
61+
Kind: "ValidKind",
62+
},
63+
{
64+
Cluster: "test-cluster",
65+
Name: "someName-3",
66+
Namespace: "namespace",
67+
Kind: "ValidKind",
68+
},
69+
{
70+
Cluster: "test-cluster",
71+
Name: "someName-4",
72+
Namespace: "namespace",
73+
Kind: "ValidKind",
74+
},
75+
{
76+
Cluster: "test-cluster",
77+
Name: "someName-5",
78+
Namespace: "namespace",
79+
Kind: "ValidKind",
80+
},
81+
{
82+
Cluster: "test-cluster",
83+
Name: "someName-6",
84+
Namespace: "namespace",
85+
Kind: "ValidKind",
86+
},
87+
{
88+
Cluster: "test-cluster",
89+
Name: "someName-7",
90+
Namespace: "namespace",
91+
Kind: "ValidKind",
92+
},
93+
}
94+
95+
g.Expect(SeedObjects(db, testObjects)).To(Succeed())
4396

97+
iter, err := store.GetObjects(context.Background(), nil, nil)
98+
g.Expect(err).To(BeNil())
99+
100+
objects, err := iter.All()
101+
g.Expect(err).To(BeNil())
102+
g.Expect(len(objects)).To(Equal(len(testObjects)))
103+
g.Expect(objects[0].Name).To(Equal(testObjects[0].Name))
104+
g.Expect(objects[3].Name).To(Equal(testObjects[3].Name))
105+
106+
// With pagination and without an offset.
107+
iter, err = store.GetObjects(context.Background(), nil, nil)
108+
g.Expect(err).To(BeNil())
109+
110+
objects, err = iter.Page(3, 0)
111+
g.Expect(err).To(BeNil())
112+
g.Expect(len(objects)).To(Equal(3))
113+
g.Expect(objects[0].Name).To(Equal(testObjects[0].Name))
114+
115+
objects, err = iter.Page(3, 0)
116+
g.Expect(err).To(BeNil())
117+
g.Expect(len(objects)).To(Equal(3))
118+
g.Expect(objects[0].Name).To(Equal(testObjects[3].Name))
119+
120+
objects, err = iter.Page(3, 0)
121+
g.Expect(err).To(BeNil())
122+
g.Expect(len(objects)).To(Equal(1))
123+
g.Expect(objects[0].Name).To(Equal(testObjects[6].Name))
124+
125+
// With pagination and with an initial offset.
126+
iter, err = store.GetObjects(context.Background(), nil, nil)
127+
g.Expect(err).To(BeNil())
128+
129+
objects, err = iter.Page(2, 2)
130+
g.Expect(err).To(BeNil())
131+
g.Expect(len(objects)).To(Equal(2))
132+
g.Expect(objects[0].Name).To(Equal(testObjects[2].Name))
133+
134+
objects, err = iter.Page(2, 0)
135+
g.Expect(err).To(BeNil())
136+
g.Expect(len(objects)).To(Equal(2))
137+
g.Expect(objects[0].Name).To(Equal(testObjects[4].Name))
138+
139+
objects, err = iter.Page(2, 0)
140+
g.Expect(err).To(BeNil())
141+
g.Expect(len(objects)).To(Equal(1))
142+
g.Expect(objects[0].Name).To(Equal(testObjects[6].Name))
44143
}
45144

46145
func TestDeleteObjects(t *testing.T) {
47-
48146
tests := []struct {
49147
name string
50148
seed []models.Object

0 commit comments

Comments
 (0)