Skip to content

Commit adbb708

Browse files
committed
Add support for KeysOnly
Modifies SQL queries to only return value and prev_value when required Signed-off-by: Brad Davidson <[email protected]>
1 parent 44ed42d commit adbb708

File tree

15 files changed

+144
-127
lines changed

15 files changed

+144
-127
lines changed

pkg/drivers/generic/generic.go

Lines changed: 62 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,18 @@ const (
2929
var _ server.Dialect = (*Generic)(nil)
3030

3131
var (
32-
columns = "kv.id AS theid, kv.name AS thename, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"
33-
revSQL = `
32+
columns = "kv.id AS theid, kv.name AS thename, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease"
33+
withVal = columns + ", kv.value"
34+
withOldVal = withVal + ", kv.old_value"
35+
revSQL = `
3436
SELECT MAX(rkv.id) AS id
3537
FROM kine AS rkv`
3638

3739
compactRevSQL = `
3840
SELECT MAX(crkv.prev_revision) AS prev_revision
3941
FROM kine AS crkv
4042
WHERE crkv.name = 'compact_rev_key'`
41-
42-
listSQL = fmt.Sprintf(`
43+
listFmt = `
4344
SELECT *
4445
FROM (
4546
SELECT (%s), (%s), %s
@@ -57,7 +58,10 @@ var (
5758
?
5859
) AS lkv
5960
ORDER BY lkv.thename ASC
60-
`, revSQL, compactRevSQL, columns)
61+
`
62+
listSQL = fmt.Sprintf(listFmt, revSQL, compactRevSQL, columns)
63+
listValSQL = fmt.Sprintf(listFmt, revSQL, compactRevSQL, withVal)
64+
listOldValSQL = fmt.Sprintf(listFmt, revSQL, compactRevSQL, withOldVal)
6165
)
6266

6367
type ErrRetry func(error) bool
@@ -73,30 +77,31 @@ type ConnectionPoolConfig struct {
7377
type Generic struct {
7478
sync.Mutex
7579

76-
LockWrites bool
77-
LastInsertID bool
78-
DB *sql.DB
79-
GetCurrentSQL string
80-
GetRevisionSQL string
81-
RevisionSQL string
82-
ListRevisionStartSQL string
83-
GetRevisionAfterSQL string
84-
CountCurrentSQL string
85-
CountRevisionSQL string
86-
AfterSQL string
87-
DeleteSQL string
88-
CompactSQL string
89-
UpdateCompactSQL string
90-
PostCompactSQL string
91-
InsertSQL string
92-
FillSQL string
93-
InsertLastInsertIDSQL string
94-
GetSizeSQL string
95-
Retry ErrRetry
96-
InsertRetry ErrRetry
97-
TranslateErr TranslateErr
98-
ErrCode ErrCode
99-
FillRetryDuration time.Duration
80+
LockWrites bool
81+
LastInsertID bool
82+
DB *sql.DB
83+
GetCurrentSQL string
84+
GetCurrentValSQL string
85+
ListRevisionStartSQL string
86+
ListRevisionStartValSQL string
87+
GetRevisionAfterSQL string
88+
GetRevisionAfterValSQL string
89+
CountCurrentSQL string
90+
CountRevisionSQL string
91+
AfterOldValSQL string
92+
DeleteSQL string
93+
CompactSQL string
94+
UpdateCompactSQL string
95+
PostCompactSQL string
96+
InsertSQL string
97+
FillSQL string
98+
InsertLastInsertIDSQL string
99+
GetSizeSQL string
100+
Retry ErrRetry
101+
InsertRetry ErrRetry
102+
TranslateErr TranslateErr
103+
ErrCode ErrCode
104+
FillRetryDuration time.Duration
100105
}
101106

102107
func q(sql, param string, numbered bool) string {
@@ -200,15 +205,12 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
200205
return &Generic{
201206
DB: db,
202207

203-
GetRevisionSQL: q(fmt.Sprintf(`
204-
SELECT
205-
0, 0, %s
206-
FROM kine AS kv
207-
WHERE kv.id = ?`, columns), paramCharacter, numbered),
208-
209-
GetCurrentSQL: q(fmt.Sprintf(listSQL, "AND mkv.name > ?"), paramCharacter, numbered),
210-
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered),
211-
GetRevisionAfterSQL: q(fmt.Sprintf(listSQL, "AND mkv.name > ? AND mkv.id <= ?"), paramCharacter, numbered),
208+
GetCurrentSQL: q(fmt.Sprintf(listSQL, "AND mkv.name > ?"), paramCharacter, numbered),
209+
GetCurrentValSQL: q(fmt.Sprintf(listValSQL, "AND mkv.name > ?"), paramCharacter, numbered),
210+
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered),
211+
ListRevisionStartValSQL: q(fmt.Sprintf(listValSQL, "AND mkv.id <= ?"), paramCharacter, numbered),
212+
GetRevisionAfterSQL: q(fmt.Sprintf(listSQL, "AND mkv.name > ? AND mkv.id <= ?"), paramCharacter, numbered),
213+
GetRevisionAfterValSQL: q(fmt.Sprintf(listValSQL, "AND mkv.name > ? AND mkv.id <= ?"), paramCharacter, numbered),
212214

213215
CountCurrentSQL: q(fmt.Sprintf(`
214216
SELECT (%s), COUNT(c.theid)
@@ -222,13 +224,13 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
222224
%s
223225
) c`, revSQL, fmt.Sprintf(listSQL, "AND mkv.name > ? AND mkv.id <= ?")), paramCharacter, numbered),
224226

225-
AfterSQL: q(fmt.Sprintf(`
227+
AfterOldValSQL: q(fmt.Sprintf(`
226228
SELECT (%s), (%s), %s
227229
FROM kine AS kv
228230
WHERE
229231
kv.name LIKE ? AND
230232
kv.id > ?
231-
ORDER BY kv.id ASC`, revSQL, compactRevSQL, columns), paramCharacter, numbered),
233+
ORDER BY kv.id ASC`, revSQL, compactRevSQL, withOldVal), paramCharacter, numbered),
232234

233235
DeleteSQL: q(`
234236
DELETE FROM kine AS kv
@@ -323,34 +325,44 @@ func (d *Generic) PostCompact(ctx context.Context) error {
323325
return nil
324326
}
325327

326-
func (d *Generic) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) {
327-
return d.query(ctx, d.GetRevisionSQL, revision)
328-
}
329-
330328
func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
331329
logrus.Tracef("DELETEREVISION %v", revision)
332330
_, err := d.execute(ctx, d.DeleteSQL, revision)
333331
return err
334332
}
335333

336-
func (d *Generic) ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted bool) (*sql.Rows, error) {
337-
sql := d.GetCurrentSQL
334+
func (d *Generic) ListCurrent(ctx context.Context, prefix, startKey string, limit int64, includeDeleted, keysOnly bool) (*sql.Rows, error) {
335+
var sql string
336+
if keysOnly {
337+
sql = d.GetCurrentSQL
338+
} else {
339+
sql = d.GetCurrentValSQL
340+
}
338341
if limit > 0 {
339342
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
340343
}
341344
return d.query(ctx, sql, prefix, startKey, includeDeleted)
342345
}
343346

344-
func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) {
347+
func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted, keysOnly bool) (*sql.Rows, error) {
348+
var sql string
345349
if startKey == "" {
346-
sql := d.ListRevisionStartSQL
350+
if keysOnly {
351+
sql = d.ListRevisionStartSQL
352+
} else {
353+
sql = d.ListRevisionStartValSQL
354+
}
347355
if limit > 0 {
348356
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
349357
}
350358
return d.query(ctx, sql, prefix, revision, includeDeleted)
351359
}
352360

353-
sql := d.GetRevisionAfterSQL
361+
if keysOnly {
362+
sql = d.GetRevisionAfterSQL
363+
} else {
364+
sql = d.GetRevisionAfterValSQL
365+
}
354366
if limit > 0 {
355367
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
356368
}
@@ -390,7 +402,7 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
390402
}
391403

392404
func (d *Generic) After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) {
393-
sql := d.AfterSQL
405+
sql := d.AfterOldValSQL
394406
if limit > 0 {
395407
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
396408
}

pkg/drivers/generic/tx.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,6 @@ func (t *Tx) Compact(ctx context.Context, revision int64) (int64, error) {
8080
return res.RowsAffected()
8181
}
8282

83-
func (t *Tx) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) {
84-
return t.query(ctx, t.d.GetRevisionSQL, revision)
85-
}
86-
8783
func (t *Tx) DeleteRevision(ctx context.Context, revision int64) error {
8884
logrus.Tracef("TX DELETEREVISION %v", revision)
8985
_, err := t.execute(ctx, t.d.DeleteSQL, revision)

pkg/drivers/nats/backend.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (b *Backend) Count(ctx context.Context, prefix, startKey string, revision i
146146
}
147147

148148
// Get returns the store's current revision, the associated server.KeyValue or an error.
149-
func (b *Backend) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *server.KeyValue, error) {
149+
func (b *Backend) Get(ctx context.Context, key, rangeEnd string, limit, revision int64, keysOnly bool) (int64, *server.KeyValue, error) {
150150
storeRev := b.kv.BucketRevision()
151151
// Get the kv entry and return the revision.
152152
rev, nv, err := b.get(ctx, key, revision, false)
@@ -338,8 +338,8 @@ func (b *Backend) Update(ctx context.Context, key string, value []byte, revision
338338
// that are alphanumerically equal to or greater than the startKey.
339339
// If limit is provided, the maximum set of matches is limited.
340340
// If revision is provided, this indicates the maximum revision to return.
341-
func (b *Backend) List(ctx context.Context, prefix, startKey string, limit, maxRevision int64) (int64, []*server.KeyValue, error) {
342-
matches, err := b.kv.List(ctx, prefix, startKey, limit, maxRevision)
341+
func (b *Backend) List(ctx context.Context, prefix, startKey string, limit, maxRevision int64, keysOnly bool) (int64, []*server.KeyValue, error) {
342+
matches, err := b.kv.List(ctx, prefix, startKey, limit, maxRevision, keysOnly)
343343
if err != nil {
344344
return 0, nil, err
345345
}

pkg/drivers/nats/backend_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func TestBackend_Get(t *testing.T) {
168168

169169
time.Sleep(2 * time.Millisecond)
170170

171-
srev, ent, err := b.Get(ctx, "/a", "", 0, 0)
171+
srev, ent, err := b.Get(ctx, "/a", "", 0, 0, false)
172172
noErr(t, err)
173173
expEqual(t, 1, srev)
174174
expEqual(t, "/a", ent.Key)
@@ -180,15 +180,15 @@ func TestBackend_Get(t *testing.T) {
180180
time.Sleep(time.Second)
181181

182182
// Latest is gone.
183-
_, ent, err = b.Get(ctx, "/a", "", 0, 0)
183+
_, ent, err = b.Get(ctx, "/a", "", 0, 0, false)
184184
expEqualErr(t, nil, err)
185185

186186
// Get at a revision will fail also.
187-
_, ent, err = b.Get(ctx, "/a", "", 0, 1)
187+
_, ent, err = b.Get(ctx, "/a", "", 0, 1, false)
188188
expEqualErr(t, nil, err)
189189

190190
// Get at later revision, does not exist.
191-
_, _, err = b.Get(ctx, "/a", "", 0, 2)
191+
_, _, err = b.Get(ctx, "/a", "", 0, 2, false)
192192
expEqualErr(t, nil, err)
193193

194194
// Create it again and update it.
@@ -200,7 +200,7 @@ func TestBackend_Get(t *testing.T) {
200200
noErr(t, err)
201201

202202
// Get at prior version.
203-
rev, ent, err = b.Get(ctx, "/a", "", 0, rev)
203+
rev, ent, err = b.Get(ctx, "/a", "", 0, rev, false)
204204
noErr(t, err)
205205
expEqual(t, 3, rev)
206206
expEqual(t, "/a", ent.Key)
@@ -304,44 +304,44 @@ func TestBackend_List(t *testing.T) {
304304
time.Sleep(time.Millisecond)
305305

306306
// List the keys.
307-
rev, ents, err := b.List(ctx, "/", "", 0, 0)
307+
rev, ents, err := b.List(ctx, "/", "", 0, 0, false)
308308
noErr(t, err)
309309
expEqual(t, 7, rev)
310310
expEqual(t, 7, len(ents))
311311
expSortedKeys(t, ents)
312312

313313
// List the keys with prefix.
314-
rev, ents, err = b.List(ctx, "/a", "", 0, 0)
314+
rev, ents, err = b.List(ctx, "/a", "", 0, 0, false)
315315
noErr(t, err)
316316
expEqual(t, 7, rev)
317317
expEqual(t, 3, len(ents))
318318
expSortedKeys(t, ents)
319319

320320
// List the keys >= start key.
321-
rev, ents, err = b.List(ctx, "/", "b", 0, 0)
321+
rev, ents, err = b.List(ctx, "/", "b", 0, 0, false)
322322
noErr(t, err)
323323
expEqual(t, 7, rev)
324324
expEqual(t, 4, len(ents))
325325
expSortedKeys(t, ents)
326326

327327
// List the keys up to a revision.
328-
rev, ents, err = b.List(ctx, "/", "", 0, 3)
328+
rev, ents, err = b.List(ctx, "/", "", 0, 3, false)
329329
noErr(t, err)
330330
expEqual(t, 7, rev)
331331
expEqual(t, 3, len(ents))
332332
expSortedKeys(t, ents)
333333
expEqualKeys(t, []string{"/a", "/a/b/c", "/b"}, ents)
334334

335335
// List the keys with a limit.
336-
rev, ents, err = b.List(ctx, "/", "", 4, 0)
336+
rev, ents, err = b.List(ctx, "/", "", 4, 0, false)
337337
noErr(t, err)
338338
expEqual(t, 7, rev)
339339
expEqual(t, 4, len(ents))
340340
expSortedKeys(t, ents)
341341
expEqualKeys(t, []string{"/a", "/a/b", "/a/b/c", "/b"}, ents)
342342

343343
// List the keys with a limit after some start key.
344-
rev, ents, err = b.List(ctx, "/", "b", 2, 0)
344+
rev, ents, err = b.List(ctx, "/", "b", 2, 0, false)
345345
noErr(t, err)
346346
expEqual(t, 7, rev)
347347
expEqual(t, 2, len(ents))

pkg/drivers/nats/kv.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ func (e *KeyValue) Count(ctx context.Context, prefix, startKey string, revision
433433
return count, nil
434434
}
435435

436-
func (e *KeyValue) List(ctx context.Context, prefix, startKey string, limit, revision int64) ([]jetstream.KeyValueEntry, error) {
436+
func (e *KeyValue) List(ctx context.Context, prefix, startKey string, limit, revision int64, keysOnly bool) ([]jetstream.KeyValueEntry, error) {
437437
seekKey := prefix
438438
if startKey != "" {
439439
seekKey = strings.TrimSuffix(seekKey, "/")

pkg/drivers/nats/logger.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (b *BackendLogger) Start(ctx context.Context) error {
3131
}
3232

3333
// Get returns the store's current revision, the associated server.KeyValue or an error.
34-
func (b *BackendLogger) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) {
34+
func (b *BackendLogger) Get(ctx context.Context, key, rangeEnd string, limit, revision int64, keysOnly bool) (revRet int64, kvRet *server.KeyValue, errRet error) {
3535
start := time.Now()
3636
defer func() {
3737
dur := time.Since(start)
@@ -43,7 +43,7 @@ func (b *BackendLogger) Get(ctx context.Context, key, rangeEnd string, limit, re
4343
b.logMethod(dur, fStr, key, revision, revRet, kvRet != nil, size, errRet, dur)
4444
}()
4545

46-
return b.backend.Get(ctx, key, rangeEnd, limit, revision)
46+
return b.backend.Get(ctx, key, rangeEnd, limit, revision, keysOnly)
4747
}
4848

4949
// Create attempts to create the key-value entry and returns the revision number.
@@ -69,15 +69,15 @@ func (b *BackendLogger) Delete(ctx context.Context, key string, revision int64)
6969
return b.backend.Delete(ctx, key, revision)
7070
}
7171

72-
func (b *BackendLogger) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) {
72+
func (b *BackendLogger) List(ctx context.Context, prefix, startKey string, limit, revision int64, keysOnly bool) (revRet int64, kvRet []*server.KeyValue, errRet error) {
7373
start := time.Now()
7474
defer func() {
7575
dur := time.Since(start)
7676
fStr := "LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v, duration=%s"
7777
b.logMethod(dur, fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, dur)
7878
}()
7979

80-
return b.backend.List(ctx, prefix, startKey, limit, revision)
80+
return b.backend.List(ctx, prefix, startKey, limit, revision, keysOnly)
8181
}
8282

8383
// Count returns an exact count of the number of matching keys and the current revision of the database

0 commit comments

Comments
 (0)