feat: litt unlock command #1823
@@ -0,0 +1,24 @@
package main

import (
    "fmt"

    "github.com/Layr-Labs/eigenda/common"
    "github.com/Layr-Labs/eigenda/litt/disktable"
    "github.com/urfave/cli/v2"
)

// unlockCommand is called by the CLI to unlock a LittDB file system.
func unlockCommand(ctx *cli.Context) error {
    logger, err := common.NewLogger(common.DefaultConsoleLoggerConfig())
    if err != nil {
        return fmt.Errorf("failed to create logger: %w", err)
    }
    sources := ctx.StringSlice(srcFlag.Name)

    if len(sources) == 0 {
        return fmt.Errorf("at least one source path is required")
    }

    return disktable.Unlock(logger, sources)
}
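Note that `srcFlag` is referenced but not defined in this hunk. For context, here is a minimal sketch of how the flag and command might be wired up with urfave/cli v2; the flag name, usage strings, and command registration below are assumptions, not part of the PR:

```go
package main

import "github.com/urfave/cli/v2"

// Hypothetical wiring (not from this diff): the real srcFlag and command
// definitions live elsewhere in the litt CLI.
var srcFlag = &cli.StringSliceFlag{
    Name:     "src",
    Usage:    "Path to a LittDB storage volume; may be repeated for multiple volumes",
    Required: true,
}

var unlockCmd = &cli.Command{
    Name:   "unlock",
    Usage:  "Remove LittDB lock files so the database can be opened again",
    Flags:  []cli.Flag{srcFlag},
    Action: unlockCommand, // the handler shown above
}
```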
@@ -0,0 +1,44 @@
package disktable

import (
    "fmt"
    "os"
    "path/filepath"
    "strings"

    "github.com/Layr-Labs/eigenda/litt/util"
    "github.com/Layr-Labs/eigensdk-go/logging"
)

// Unlock unlocks a LittDB file system by deleting its lock files.
//
// DANGER: calling this method opens the door for unsafe concurrent operations on LittDB files.
// With great power comes great responsibility.
func Unlock(logger logging.Logger, sourcePaths []string) error {
    for _, sourcePath := range sourcePaths {
        err := filepath.WalkDir(sourcePath, func(path string, d os.DirEntry, err error) error {
            if err != nil {
                return err
            }
            if d.IsDir() {
                return nil
            }

            if strings.HasSuffix(path, util.LockfileName) {
                logger.Infof("Removing lock file %s", path)
                if removeErr := os.Remove(path); removeErr != nil {
                    logger.Error("Failed to remove lock file", "path", path, "error", removeErr)
                    return fmt.Errorf("failed to remove lock file %s: %w", path, removeErr)
                }
            }

            return nil
        })

        if err != nil {
            return fmt.Errorf("failed to walk directory %s: %w", sourcePath, err)
        }
    }

    return nil
}
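For reference, a minimal sketch of calling Unlock programmatically, using the same logger constructor as the CLI command above; the volume paths are placeholders:

```go
package main

import (
    "log"

    "github.com/Layr-Labs/eigenda/common"
    "github.com/Layr-Labs/eigenda/litt/disktable"
)

func main() {
    logger, err := common.NewLogger(common.DefaultConsoleLoggerConfig())
    if err != nil {
        log.Fatalf("failed to create logger: %v", err)
    }
    // DANGER: only unlock volumes that no live LittDB process is using.
    // The paths below are placeholders.
    if err := disktable.Unlock(logger, []string{"/data/volume1", "/data/volume2"}); err != nil {
        log.Fatalf("failed to unlock: %v", err)
    }
}
```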
@@ -0,0 +1,163 @@
package test

import (
    "os"
    "path"
    "path/filepath"
    "strings"
    "testing"

    testrandom "github.com/Layr-Labs/eigenda/common/testutils/random"
    "github.com/Layr-Labs/eigenda/litt"
    "github.com/Layr-Labs/eigenda/litt/disktable"
    "github.com/Layr-Labs/eigenda/litt/littbuilder"
    "github.com/Layr-Labs/eigenda/litt/util"
    "github.com/stretchr/testify/require"
)

// Note: this test is defined in the test package to avoid circular dependencies.

func TestUnlock(t *testing.T) {
    testDir := t.TempDir()
    rand := testrandom.NewTestRandom()
    volumes := []string{
        path.Join(testDir, "volume1"),
        path.Join(testDir, "volume2"),
        path.Join(testDir, "volume3"),
    }

    config, err := litt.DefaultConfig(volumes...)
    require.NoError(t, err)
    config.Fsync = false // Disable fsync for faster tests
    config.TargetSegmentFileSize = 100

    db, err := littbuilder.NewDB(config)
    require.NoError(t, err)

    table, err := db.GetTable("test_table")
    require.NoError(t, err)

    expectedData := make(map[string][]byte)

    // Write some data
    for i := 0; i < 100; i++ {
        key := rand.PrintableBytes(32)
        value := rand.PrintableVariableBytes(1, 100)

        expectedData[string(key)] = value
        err = table.Put(key, value)
        require.NoError(t, err, "Failed to put data in table")
    }

    // Look for lock files. We should see one for each volume.
    lockFileCount := 0
    err = filepath.Walk(testDir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        if strings.HasSuffix(path, util.LockfileName) {
            lockFileCount++
        }
        return nil
    })
    require.NoError(t, err)
    require.Equal(t, len(volumes), lockFileCount, "There should be one lock file per volume")

    // Unlock the DB. This should remove all lock files, but leave other files intact.
    err = disktable.Unlock(config.Logger, volumes)
    require.NoError(t, err, "Failed to unlock the database")

    // There should be no lock files left.
    lockFileCount = 0
    err = filepath.Walk(testDir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        if strings.HasSuffix(path, util.LockfileName) {
            lockFileCount++
        }
        return nil
    })
    require.NoError(t, err)
    require.Equal(t, 0, lockFileCount, "There should be no lock files left after unlocking")

    // Calling unlock again should not cause any issues.
    err = disktable.Unlock(config.Logger, volumes)
    require.NoError(t, err, "Failed to unlock the database again")

    // Verify that the data is still intact.
    for key, expectedValue := range expectedData {
        value, ok, err := table.Get([]byte(key))
        require.NoError(t, err, "Failed to get data from table")
        require.True(t, ok, "Failed to get data from table")
        require.Equal(t, expectedValue, value, "Data mismatch for key %s", key)
    }

    // Restart the database and verify the data again.
    err = db.Close()
    require.NoError(t, err)

    db, err = littbuilder.NewDB(config)
    require.NoError(t, err)

    table, err = db.GetTable("test_table")
    require.NoError(t, err)

    for key, expectedValue := range expectedData {
        value, ok, err := table.Get([]byte(key))
        require.NoError(t, err, "Failed to get data from table after restart")
        require.True(t, ok, "Failed to get data from table after restart")
        require.Equal(t, expectedValue, value, "Data mismatch for key %s after restart", key)
    }

    err = db.Close()
    require.NoError(t, err, "Failed to close the database after restart")
}

func TestPurgeLocks(t *testing.T) {
    testDir := t.TempDir()
    rand := testrandom.NewTestRandom()
    volumes := []string{
        path.Join(testDir, "volume1"),
        path.Join(testDir, "volume2"),
        path.Join(testDir, "volume3"),
    }

    config, err := litt.DefaultConfig(volumes...)
    require.NoError(t, err)
    config.Fsync = false // Disable fsync for faster tests
    config.TargetSegmentFileSize = 100

    db, err := littbuilder.NewDB(config)
    require.NoError(t, err)

    table, err := db.GetTable("test_table")
    require.NoError(t, err)

    expectedData := make(map[string][]byte)

    // Write some data
    for i := 0; i < 100; i++ {
        key := rand.PrintableBytes(32)
        value := rand.PrintableVariableBytes(1, 100)

        expectedData[string(key)] = value
        err = table.Put(key, value)
        require.NoError(t, err, "Failed to put data in table")
    }

    // Opening a second instance of the database should fail due to existing locks.
    _, err = littbuilder.NewDB(config)
    require.Error(t, err, "Expected error when opening a second instance of the database with existing locks")

    // Open a new instance of the database at the same time. Normally this is not possible,
    // but it becomes possible when we purge locks.
    config.PurgeLocks = true
    db2, err := littbuilder.NewDB(config)
    require.NoError(t, err, "Failed to open a second instance of the database")

    // This test doesn't bother to verify the table data, since we are in unsafe territory now
    // with multiple instances of the database running at the same time.

    err = db.Close()
    require.NoError(t, err, "Failed to close the first instance of the database")
    err = db2.Close()
    require.NoError(t, err)
}
@@ -136,6 +136,10 @@ type Config struct {
    // Directories do not need to be on the same filesystem.
    LittDBStoragePaths []string

    // If true, then purge LittDB locks on startup. Potentially useful to get rid of zombie lock files
    // (e.g. left behind when a process is killed before it can clean up its locks), but also dangerous
    // (multiple LittDB processes operating on the same files can lead to data corruption).
    LittUnsafePurgeLocks bool

    // The rate limit for the number of bytes served by the GetChunks API if the data is in the cache.
    // Unit is in megabytes per second.
    GetChunksHotCacheReadLimitMB float64

Didn't quite understand the scenario in which zombie lock files appear? Is it due to some sort of ungraceful termination with containers?

Expanded the documentation here:
@@ -393,6 +397,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
        LittDBReadCacheSizeGB:        ctx.GlobalFloat64(flags.LittDBReadCacheSizeGBFlag.Name),
        LittDBReadCacheSizeFraction:  ctx.GlobalFloat64(flags.LittDBReadCacheSizeFractionFlag.Name),
        LittDBStoragePaths:           ctx.GlobalStringSlice(flags.LittDBStoragePathsFlag.Name),
        LittUnsafePurgeLocks:         ctx.GlobalBool(flags.LittUnsafePurgeLocksFlag.Name),
        DownloadPoolSize:             ctx.GlobalInt(flags.DownloadPoolSizeFlag.Name),
        GetChunksHotCacheReadLimitMB: ctx.GlobalFloat64(flags.GetChunksHotCacheReadLimitMBFlag.Name),
        GetChunksHotBurstLimitMB:     ctx.GlobalFloat64(flags.GetChunksHotBurstLimitMBFlag.Name),
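The `flags.LittUnsafePurgeLocksFlag` referenced above is not shown in this diff. A plausible sketch of its definition, following the v1-style cli flags this config file appears to use; the flag name and env var below are assumptions:

```go
package flags

import "github.com/urfave/cli"

// Hypothetical sketch (not from this diff); the flag name and env var are assumptions.
var LittUnsafePurgeLocksFlag = cli.BoolFlag{
    Name:   "litt-unsafe-purge-locks",
    Usage:  "If true, purge LittDB lock files on startup. Unsafe: concurrent LittDB processes can corrupt data.",
    EnvVar: "NODE_LITT_UNSAFE_PURGE_LOCKS",
}
```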
Should we add some sort of confirmation flow where the user has to say "yes", and if they want to skip it there's a force-unlock or --force flag for skipping that flow?

Added. You either must type "I know what I am doing", or include a --force flag.
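The confirmation flow itself falls outside this diff's hunks. Below is a sketch of what such a guard might look like, assuming a hypothetical forceFlag defined alongside srcFlag; none of these names are taken from the PR:

```go
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"

    "github.com/urfave/cli/v2"
)

// forceFlag is hypothetical; the actual flag definition is not shown in this diff.
var forceFlag = &cli.BoolFlag{
    Name:  "force",
    Usage: "Skip the interactive confirmation prompt",
}

// confirmUnlock is a sketch of the confirmation guard described above.
func confirmUnlock(ctx *cli.Context) error {
    if ctx.Bool(forceFlag.Name) {
        return nil // --force skips the prompt
    }
    fmt.Print("Unlocking permits unsafe concurrent access to LittDB files.\n" +
        "Type 'I know what I am doing' to continue: ")
    line, err := bufio.NewReader(os.Stdin).ReadString('\n')
    if err != nil {
        return fmt.Errorf("failed to read confirmation: %w", err)
    }
    if strings.TrimSpace(line) != "I know what I am doing" {
        return fmt.Errorf("unlock aborted: confirmation not given")
    }
    return nil
}
```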