Skip to content

Commit 0188f60

Browse files
authored
fix: littDB consistent keyfiles (#1573)
* Save value size to keyfile.
  Signed-off-by: Cody Littley <[email protected]>
* Add key count to metadata file.
  Signed-off-by: Cody Littley <[email protected]>
* Fix DB size calculation.
  Signed-off-by: Cody Littley <[email protected]>
* Fix key count.
  Signed-off-by: Cody Littley <[email protected]>
* Consistent key files.
  Signed-off-by: Cody Littley <[email protected]>
* remove old keymap reload logic
  Signed-off-by: Cody Littley <[email protected]>
* fix tickers, fix broken test
  Signed-off-by: Cody Littley <[email protected]>
* Fix TODOs.
  Signed-off-by: Cody Littley <[email protected]>
* cleanup
  Signed-off-by: Cody Littley <[email protected]>
* Made suggested change.
  Signed-off-by: Cody Littley <[email protected]>
---------
Signed-off-by: Cody Littley <[email protected]>
1 parent 4262d71 commit 0188f60

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+698
-246
lines changed

common/testutils/test_utils.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"time"
99

1010
"github.com/Layr-Labs/eigensdk-go/logging"
11-
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
1212
"golang.org/x/exp/rand"
1313
)
1414

@@ -38,13 +38,21 @@ func AssertEventuallyTrue(t *testing.T, condition func() bool, duration time.Dur
3838
}
3939

4040
ticker := time.NewTicker(1 * time.Millisecond)
41-
select {
42-
case <-ticker.C:
43-
if condition() {
41+
defer ticker.Stop()
42+
43+
ctx, cancel := context.WithTimeout(context.Background(), duration)
44+
defer cancel()
45+
46+
for {
47+
select {
48+
case <-ticker.C:
49+
if condition() {
50+
return
51+
}
52+
case <-ctx.Done():
53+
require.True(t, condition(), debugInfo...)
4454
return
4555
}
46-
case <-time.After(duration):
47-
assert.True(t, condition(), debugInfo...)
4856
}
4957
}
5058

litt/disktable/control_loop.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type controlLoop struct {
6060
targetFileSize uint32
6161

6262
// The maximum number of keys in a segment.
63-
maxKeyCount uint64
63+
maxKeyCount uint32
6464

6565
// The target size for key files.
6666
targetKeyFileSize uint64
@@ -119,6 +119,7 @@ func (c *controlLoop) enqueue(request controlLoopMessage) error {
119119
// mutate the data in the disk table.
120120
func (c *controlLoop) run() {
121121
ticker := time.NewTicker(c.garbageCollectionPeriod)
122+
defer ticker.Stop()
122123

123124
for {
124125
select {
@@ -157,13 +158,15 @@ func (c *controlLoop) doGarbageCollection() {
157158
return
158159
}
159160

160-
if c.metrics != nil {
161-
defer func() {
161+
defer func() {
162+
if c.metrics != nil {
162163
end := c.clock()
163164
delta := end.Sub(start)
164165
c.metrics.ReportGarbageCollectionLatency(c.name, delta)
165-
}()
166-
}
166+
167+
}
168+
c.updateCurrentSize()
169+
}()
167170

168171
for index := c.lowestSegmentIndex; index <= c.highestSegmentIndex; index++ {
169172
seg := c.segments[index]
@@ -198,8 +201,13 @@ func (c *controlLoop) doGarbageCollection() {
198201
}
199202
}
200203

204+
if seg.Size() > c.immutableSegmentSize {
205+
c.logger.Errorf("segment %d size %d is larger than immutable segment size %d, "+
206+
"reported DB size will not be accurate", index, seg.Size(), c.immutableSegmentSize)
207+
}
208+
201209
c.immutableSegmentSize -= seg.Size()
202-
c.keyCount.Add(-1 * int64(len(keys)))
210+
c.keyCount.Add(-1 * int64(seg.KeyCount()))
203211

204212
// Deletion of segment files will happen when the segment is released by all reservation holders.
205213
seg.Release()
@@ -209,8 +217,6 @@ func (c *controlLoop) doGarbageCollection() {
209217

210218
c.lowestSegmentIndex++
211219
}
212-
213-
c.updateCurrentSize()
214220
}
215221

216222
// getReservedSegment returns the segment with the given index. Segment is reserved, and it is the caller's
@@ -245,7 +251,10 @@ func (c *controlLoop) getSegments() (map[uint32]*segment.Segment, error) {
245251

246252
// updateCurrentSize updates the size of the table.
247253
func (c *controlLoop) updateCurrentSize() {
248-
size := c.immutableSegmentSize + c.segments[c.highestSegmentIndex].Size() + segment.MetadataSize
254+
size := c.immutableSegmentSize +
255+
c.segments[c.highestSegmentIndex].Size() +
256+
c.metadata.Size()
257+
249258
c.size.Store(size)
250259
}
251260

@@ -280,8 +289,6 @@ func (c *controlLoop) handleWriteRequest(req *controlLoopWriteRequest) {
280289
func (c *controlLoop) expandSegments() error {
281290
now := c.clock()
282291

283-
c.immutableSegmentSize += c.segments[c.highestSegmentIndex].Size()
284-
285292
// Seal the previous segment.
286293
flushLoopResponseChan := make(chan struct{}, 1)
287294
request := &flushLoopSealRequest{
@@ -302,6 +309,9 @@ func (c *controlLoop) expandSegments() error {
302309
return fmt.Errorf("failed to seal segment: %w", err)
303310
}
304311

312+
// Record the size of the segment.
313+
c.immutableSegmentSize += c.segments[c.highestSegmentIndex].Size()
314+
305315
// Create a new segment.
306316
salt := [16]byte{}
307317
_, err = c.saltShaker.Read(salt[:])

litt/disktable/disk_table.go

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ func NewDiskTable(
198198
return nil, fmt.Errorf("failed to gather segment files: %w", err)
199199
}
200200

201+
keyCount := int64(0)
202+
for _, seg := range segments {
203+
keyCount += int64(seg.KeyCount())
204+
}
205+
table.keyCount.Store(keyCount)
206+
201207
immutableSegmentSize := uint64(0)
202208
for _, seg := range segments {
203209
immutableSegmentSize += seg.Size()
@@ -282,6 +288,7 @@ func NewDiskTable(
282288
keymap: keymap,
283289
flushLoop: fLoop,
284290
garbageCollectionPeriod: config.GCPeriod,
291+
immutableSegmentSize: immutableSegmentSize,
285292
}
286293
cLoop.threadsafeHighestSegmentIndex.Store(highestSegmentIndex)
287294
table.controlLoop = cLoop
@@ -306,27 +313,14 @@ func (d *DiskTable) reloadKeymap(
306313
lowestSegmentIndex uint32,
307314
highestSegmentIndex uint32) error {
308315

309-
if len(segments) > 0 {
310-
// TODO(cody.littley): there is currently a race condition that may cause key files and value files to
311-
// be inconsistent. This is not a problem as long as the keymap is not reloaded, since this inconsistency
312-
// only hurts us when we are reloading the keymap. This needs to be fixed, but is not urgent since reloading
313-
// the keymap is not a feature currently used in production.
314-
d.logger.Errorf("there is currently a race condition in this method, " +
315-
"changing keymap type should not be used until fixed")
316-
}
317-
318316
start := d.clock()
319317
defer func() {
320318
d.logger.Infof("spent %v reloading keymap", d.clock().Sub(start))
321319
}()
322320

323-
// It's possible that some of the data written near the end of the previous session was corrupted.
324-
// Read data from the end until the first valid key/value pair is found.
325-
isValid := false
321+
batch := make([]*types.ScopedKey, 0, keymapReloadBatchSize)
326322

327-
batch := make([]*types.KAPair, 0, keymapReloadBatchSize)
328-
329-
for i := highestSegmentIndex; i >= lowestSegmentIndex && i+1 != 0; i-- {
323+
for i := lowestSegmentIndex; i <= highestSegmentIndex; i++ {
330324
if !segments[i].IsSealed() {
331325
// ignore unsealed segment, this will have been created in the current session and will not
332326
// yet contain any data.
@@ -340,32 +334,15 @@ func (d *DiskTable) reloadKeymap(
340334
for keyIndex := len(keys) - 1; keyIndex >= 0; keyIndex-- {
341335
key := keys[keyIndex]
342336

343-
if !isValid {
344-
_, err = segments[i].Read(key.Key, key.Address)
345-
if err == nil {
346-
// we found a valid key/value pair. All subsequent keys are valid.
347-
isValid = true
348-
} else {
349-
// This is not cause for alarm (probably).
350-
// This can happen when the database is not cleanly shut down,
351-
// and just means that some data near the end was not fully committed.
352-
d.logger.Infof("truncated value for key %s with address %s for segment %d",
353-
key.Key, key.Address, i)
354-
}
355-
}
356-
357-
if isValid {
358-
batch = append(batch, key)
359-
if len(batch) == keymapReloadBatchSize {
360-
err = d.keymap.Put(batch)
361-
if err != nil {
362-
return fmt.Errorf("failed to put keys for segment %d: %w", i, err)
363-
}
364-
batch = make([]*types.KAPair, 0, keymapReloadBatchSize)
337+
batch = append(batch, key)
338+
if len(batch) == keymapReloadBatchSize {
339+
err = d.keymap.Put(batch)
340+
if err != nil {
341+
return fmt.Errorf("failed to put keys for segment %d: %w", i, err)
365342
}
343+
batch = make([]*types.ScopedKey, 0, keymapReloadBatchSize)
366344
}
367345
}
368-
369346
}
370347

371348
if len(batch) > 0 {
@@ -766,7 +743,7 @@ func (d *DiskTable) RunGC() error {
766743

767744
// writeKeysToKeymap flushes all keys to the keymap. Once they are flushed, it also removes the keys from the
768745
// unflushedDataCache.
769-
func (d *DiskTable) writeKeysToKeymap(keys []*types.KAPair) error {
746+
func (d *DiskTable) writeKeysToKeymap(keys []*types.ScopedKey) error {
770747
if len(keys) == 0 {
771748
// Nothing to flush.
772749
return nil

litt/disktable/disk_table_test.go

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,7 @@ func lastFileMissingTest(t *testing.T, tableBuilder *tableBuilder, typeToDelete
811811
// to the files being artificially deleted in this test), the keymap will not hold any value that has not
812812
// yet been durably flushed to disk.
813813
for key := range missingKeys {
814-
err = table.(*DiskTable).keymap.Delete([]*types.KAPair{{Key: []byte(key)}})
814+
err = table.(*DiskTable).keymap.Delete([]*types.ScopedKey{{Key: []byte(key)}})
815815
require.NoError(t, err)
816816
}
817817

@@ -1041,7 +1041,7 @@ func truncatedKeyFileTest(t *testing.T, tableBuilder *tableBuilder) {
10411041
// to the files being artificially deleted in this test), the keymap will not hold any value that has not
10421042
// yet been durably flushed to disk.
10431043
for key := range missingKeys {
1044-
err = table.(*DiskTable).keymap.Delete([]*types.KAPair{{Key: []byte(key)}})
1044+
err = table.(*DiskTable).keymap.Delete([]*types.ScopedKey{{Key: []byte(key)}})
10451045
require.NoError(t, err)
10461046
}
10471047

@@ -1281,7 +1281,7 @@ func truncatedValueFileTest(t *testing.T, tableBuilder *tableBuilder) {
12811281
// to the files being artificially deleted in this test), the keymap will not hold any value that has not
12821282
// yet been durably flushed to disk.
12831283
for key := range missingKeys {
1284-
err = table.(*DiskTable).keymap.Delete([]*types.KAPair{{Key: []byte(key)}})
1284+
err = table.(*DiskTable).keymap.Delete([]*types.ScopedKey{{Key: []byte(key)}})
12851285
require.NoError(t, err)
12861286
}
12871287

@@ -1485,7 +1485,7 @@ func unflushedKeysTest(t *testing.T, tableBuilder *tableBuilder) {
14851485
// to the files being artificially deleted in this test), the keymap will not hold any value that has not
14861486
// yet been durably flushed to disk.
14871487
for key := range missingKeys {
1488-
err = table.(*DiskTable).keymap.Delete([]*types.KAPair{{Key: []byte(key)}})
1488+
err = table.(*DiskTable).keymap.Delete([]*types.ScopedKey{{Key: []byte(key)}})
14891489
require.NoError(t, err)
14901490
}
14911491

@@ -2137,10 +2137,18 @@ func tableSizeTest(t *testing.T, tableBuilder *tableBuilder) {
21372137
// Once in a while, scan the table and verify that all expected values are present.
21382138
// Don't do this every time for the sake of test runtime.
21392139
if rand.BoolWithProbability(0.01) || i == iterations-1 /* always check on the last iteration */ {
2140+
2141+
// Force garbage collection to run in order to remove expired values from counts.
2142+
err = table.Flush()
2143+
require.NoError(t, err)
2144+
err = (table).(*DiskTable).RunGC()
2145+
require.NoError(t, err)
2146+
21402147
// Remove expired values from the expected values.
21412148
newlyExpiredKeys := make([]string, 0)
21422149
for key, creationTime := range creationTimes {
2143-
if newTime.Sub(creationTime) > ttl {
2150+
age := newTime.Sub(creationTime)
2151+
if age > ttl {
21442152
newlyExpiredKeys = append(newlyExpiredKeys, key)
21452153
}
21462154
}
@@ -2183,7 +2191,74 @@ func tableSizeTest(t *testing.T, tableBuilder *tableBuilder) {
21832191
err = table.RunGC()
21842192
require.NoError(t, err)
21852193

2194+
// disable garbage collection
2195+
err = table.SetTTL(0)
2196+
require.NoError(t, err)
2197+
err = table.Flush()
2198+
require.NoError(t, err)
2199+
2200+
// Write some data that won't expire, just to be sure that the table is not empty.
2201+
for i := 0; i < 10; i++ {
2202+
key := rand.PrintableVariableBytes(32, 64)
2203+
value := rand.PrintableVariableBytes(1, 128)
2204+
err = table.Put(key, value)
2205+
require.NoError(t, err)
2206+
expectedValues[string(key)] = value
2207+
}
2208+
2209+
err = table.Flush()
2210+
require.NoError(t, err)
2211+
21862212
reportedSize := table.Size()
2213+
reportedKeyCount := table.KeyCount()
2214+
2215+
// The exact key count is hard to predict for the sake of this unit test, since GC is "lazy" and may not
2216+
// immediately remove all values that are legal to be removed. But at the very least, all unexpired
2217+
// values should be present, and the key count should not exceed the number of total inserted values.
2218+
require.GreaterOrEqual(t, reportedKeyCount, uint64(len(expectedValues)))
2219+
require.LessOrEqual(t, reportedKeyCount, uint64(len(expectedValues)+len(expiredValues)))
2220+
2221+
err = table.Close()
2222+
require.NoError(t, err)
2223+
2224+
// Walk the "directory" file tree and calculate the actual size of the table.
2225+
// There is some asynchrony in file deletion, so we retry a reasonable number of times.
2226+
testutils.AssertEventuallyTrue(t, func() bool {
2227+
actualSize := uint64(0)
2228+
2229+
err = filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
2230+
if err != nil {
2231+
// files may be deleted in the middle of the walk
2232+
return nil
2233+
}
2234+
if info.IsDir() {
2235+
// directory sizes are not factored into the table size
2236+
return nil
2237+
}
2238+
if strings.Contains(path, "keymap") {
2239+
// table size does not currently include the keymap size
2240+
return nil
2241+
}
2242+
actualSize += uint64(info.Size())
2243+
return nil
2244+
})
2245+
require.NoError(t, err)
2246+
return actualSize == reportedSize
2247+
}, time.Second)
2248+
2249+
// Restart the table. The size should be accurately reported.
2250+
table, err = tableBuilder.builder(clock, tableName, []string{directory})
2251+
require.NoError(t, err)
2252+
2253+
newReportedSize := table.Size()
2254+
newReportedKeyCount := table.KeyCount()
2255+
2256+
// New size should be greater than the old size, since GC is disabled and
2257+
// we will have started a new segment upon restart.
2258+
require.LessOrEqual(t, reportedSize, newReportedSize)
2259+
2260+
// The number of keys should be the same as before.
2261+
require.Equal(t, reportedKeyCount, newReportedKeyCount)
21872262

21882263
err = table.Close()
21892264
require.NoError(t, err)
@@ -2211,8 +2286,7 @@ func tableSizeTest(t *testing.T, tableBuilder *tableBuilder) {
22112286
})
22122287
require.NoError(t, err)
22132288

2214-
return actualSize == reportedSize
2215-
//require.Equal(t, actualSize, reportedSize, "delta: %d", int(actualSize)-int(reportedSize))
2289+
return actualSize == newReportedSize
22162290
}, time.Second)
22172291
}
22182292

litt/disktable/keymap/keymap.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ const KeymapInitializedFileName = "initialized"
1919

2020
// Keymap maintains a mapping between keys and addresses. Implementations of this interface are goroutine safe.
2121
type Keymap interface {
22-
// Put adds keys to the keymap as a batch.
22+
// Put adds keys to the keymap as a batch. This method is required to store the address, but can ignore
23+
// other fields in the ScopedKey struct such as the value length.
2324
//
2425
// A keymap provides atomicity for individual key-address pairs, but not for the batch as a whole.
2526
//
2627
// It is not safe to modify the contents of any slices passed to this function after the call.
2728
// This includes the byte slices containing the keys.
28-
Put(pairs []*types.KAPair) error
29+
Put(pairs []*types.ScopedKey) error
2930

3031
// Get returns the address for a key. Returns true if the key exists, and false otherwise (i.e. does not
3132
// return an error if the key does not exist).
@@ -39,7 +40,7 @@ type Keymap interface {
3940
//
4041
// It is not safe to modify the contents of any slices passed to this function after the call.
4142
// This includes the byte slices containing the keys.
42-
Delete(keys []*types.KAPair) error
43+
Delete(keys []*types.ScopedKey) error
4344

4445
// Stop stops the keymap.
4546
Stop() error

0 commit comments

Comments
 (0)