
Commit 11659e8

fix: semaphore use for StoreChunks (#1866)
* Fix semaphore use for StoreChunks()
* make suggested changes
* Override locks when starting validator, by default
* Change default flag behavior
* make suggested changes
1 parent 64032d2 commit 11659e8

File tree

7 files changed: +105 -79 lines changed

node/config.go

Lines changed: 6 additions & 18 deletions
@@ -128,24 +128,12 @@ type Config struct {
     // Directories do not need to be on the same filesystem.
     LittDBStoragePaths []string
 
-    // If true, then purge LittDB locks on startup. Potentially useful to get rid of zombie lock files,
-    // but also dangerous (multiple LittDB processes operating on the same files can lead to data corruption).
+    // If true, then LittDB will refuse to start if it can't acquire locks on the database file structure.
     //
-    // When LittDB starts up, it attempts to create lock files. When a validator is forcefully shut down, lock files
-    // may be left behind. At startup time, if LittDB observes existing lock files, it first checks to see
-    // if the process that created the lock files is still running. The lock files contain the creator's PID, and so
-    // LittDB checks to see if there is any process with that PID still running.
-    //
-    // Although it should be rare, it's possible that another process may be started with the same PID as the
-    // PID used to create the lock files. When this happens, LittDB will be prevented from starting up out of
-    // fear of another process trying to access the same files, even though the original process that created the
-    // lock files is no longer running. If that happens, this flag is a safe way to force LittDB to start up
-    // without being blocked by those lock files. BE VERY CERTAIN THAT THE OTHER PROCESS IS ACTUALLY DEAD!
-    // If two instances of LittDB are running on the same files, it WILL lead to data corruption.
-    //
-    // An alternate way to clear the LittDB lock files is via the LittDB CLI with the "litt unlock" command.
-    // Run "litt unlock --help" for more information.
-    LittUnsafePurgeLocks bool
+    // Ideally, this would always be enabled. But PID reuse in common platforms such as Docker/Kubernetes can lead to
+    // a breakdown in lock files being able to detect unsafe concurrent access to the database. Since many (if not most)
+    // users of this software will be running in such an environment, this is disabled by default.
+    LittRespectLocks bool
 
     // The rate limit for the number of bytes served by the GetChunks API if the data is in the cache.
     // Unit is in megabytes per second.

@@ -424,7 +412,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
         LittDBReadCacheSizeBytes:    uint64(ctx.GlobalFloat64(flags.LittDBReadCacheSizeGBFlag.Name) * units.GiB),
         LittDBReadCacheSizeFraction: ctx.GlobalFloat64(flags.LittDBReadCacheSizeFractionFlag.Name),
         LittDBStoragePaths:          ctx.GlobalStringSlice(flags.LittDBStoragePathsFlag.Name),
-        LittUnsafePurgeLocks:        ctx.GlobalBool(flags.LittUnsafePurgeLocksFlag.Name),
+        LittRespectLocks:            ctx.GlobalBool(flags.LittRespectLocksFlag.Name),
         DownloadPoolSize:            ctx.GlobalInt(flags.DownloadPoolSizeFlag.Name),
         GetChunksHotCacheReadLimitMB: ctx.GlobalFloat64(flags.GetChunksHotCacheReadLimitMBFlag.Name),
         GetChunksHotBurstLimitMB:     ctx.GlobalFloat64(flags.GetChunksHotBurstLimitMBFlag.Name),
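
The new comment compresses a trade-off the deleted text spelled out: strict lock checking protects against two LittDB instances writing the same files, but PID reuse under Docker/Kubernetes can leave lock files that block startup even though the original owner is dead. A minimal, hypothetical sketch of the decision this flag controls (not LittDB's actual lock code; the helper name is invented):

// Hypothetical sketch of the startup decision controlled by LittRespectLocks.
// Not LittDB's real lock-handling code; startupLockDecision is an invented helper.
func startupLockDecision(respectLocks, lockFilePresent bool) (start bool, purgeLock bool) {
	if !lockFilePresent {
		// No contention: start normally and take the lock.
		return true, false
	}
	if respectLocks {
		// Strict mode: refuse to start rather than risk two LittDB instances
		// operating on the same files.
		return false, false
	}
	// Default mode: treat the existing lock as stale and purge it. This tolerates
	// PID reuse in Docker/Kubernetes, at the cost of weaker protection against
	// genuine concurrent access.
	return true, true
}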

node/flags/flags.go

Lines changed: 7 additions & 5 deletions
@@ -451,11 +451,13 @@ var (
         Required: false,
         EnvVar:   common.PrefixEnvVar(EnvVarPrefix, "LITT_DB_STORAGE_PATHS"),
     }
-    LittUnsafePurgeLocksFlag = cli.BoolFlag{
-        Name:  common.PrefixFlag(FlagPrefix, "litt-unsafe-purge-locks"),
-        Usage: "Unsafe flag to purge locks in LittDB. Use with caution, as it may lead to data loss or corruption.",
+    LittRespectLocksFlag = cli.BoolFlag{
+        Name: common.PrefixFlag(FlagPrefix, "litt-respect-locks"),
+        Usage: "If set, LittDB will refuse to start if it can't acquire locks on the storage paths. " +
+            "Ideally this would always be enabled, but PID reuse in platforms like Kubernetes/Docker can make " +
+            "lock files practically impossible to manage.",
         Required: false,
-        EnvVar:   common.PrefixEnvVar(EnvVarPrefix, "LITT_UNSAFE_PURGE_LOCKS"),
+        EnvVar:   common.PrefixEnvVar(EnvVarPrefix, "LITT_RESPECT_LOCKS"),
     }
     DownloadPoolSizeFlag = cli.IntFlag{
         Name: common.PrefixFlag(FlagPrefix, "download-pool-size"),

@@ -668,7 +670,7 @@ var optionalFlags = []cli.Flag{
     EigenDADirectoryFlag,
     BlsOperatorStateRetrieverFlag,
     EigenDAServiceManagerFlag,
-    LittUnsafePurgeLocksFlag,
+    LittRespectLocksFlag,
     StoreChunksBufferTimeoutFlag,
     StoreChunksBufferSizeGBFlag,
     StoreChunksBufferSizeFractionFlag,
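
For context, this is a standard urfave/cli v1 boolean flag: it defaults to false and can be enabled either on the command line or through its environment variable. The sketch below is a standalone illustration of that behavior; the real flag and env var names are produced by the node's FlagPrefix/EnvVarPrefix helpers, so the literal strings here (including the EXAMPLE_ prefix) are placeholders.

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli"
)

func main() {
	app := cli.NewApp()
	app.Flags = []cli.Flag{
		// Placeholder names; the real flag is wrapped with FlagPrefix/EnvVarPrefix.
		cli.BoolFlag{
			Name:   "litt-respect-locks",
			Usage:  "refuse to start if LittDB storage locks cannot be acquired",
			EnvVar: "EXAMPLE_LITT_RESPECT_LOCKS",
		},
	}
	app.Action = func(ctx *cli.Context) error {
		// false unless --litt-respect-locks is passed or the env var is set.
		fmt.Println("respect locks:", ctx.GlobalBool("litt-respect-locks"))
		return nil
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}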

node/grpc/server_v2.go

Lines changed: 26 additions & 2 deletions
@@ -182,9 +182,33 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
         return nil, api.NewErrorInternal(fmt.Sprintf("failed to get the operator state: %v", err))
     }
 
-    blobShards, rawBundles, err := s.node.DownloadBundles(ctx, batch, operatorState, probe)
+    downloadSizeInBytes, relayRequests, err :=
+        s.node.DetermineChunkLocations(batch, operatorState, probe)
     if err != nil {
-        return nil, api.NewErrorInternal(fmt.Sprintf("failed to get the operator state: %v", err))
+        //nolint:wrapcheck
+        return nil, api.NewErrorInternal(fmt.Sprintf("failed to determine chunk locations: %v", err))
+    }
+
+    // storeChunksSemaphore can be nil during unit tests, since there are a bunch of places where the Node struct
+    // is instantiated directly without using the constructor.
+    if s.node.StoreChunksSemaphore != nil {
+        // So far, we've only downloaded metadata for the blob. Before downloading the actual chunks, make sure there
+        // is capacity in the store chunks buffer. This is an OOM safety measure.
+
+        probe.SetStage("acquire_buffer_capacity")
+        semaphoreCtx, cancel := context.WithTimeout(ctx, s.node.Config.StoreChunksBufferTimeout)
+        defer cancel()
+        err = s.node.StoreChunksSemaphore.Acquire(semaphoreCtx, int64(downloadSizeInBytes))
+        if err != nil {
+            return nil, fmt.Errorf("failed to acquire buffer capacity: %w", err)
+        }
+        defer s.node.StoreChunksSemaphore.Release(int64(downloadSizeInBytes))
+    }
+
+    blobShards, rawBundles, err := s.node.DownloadChunksFromRelays(ctx, batch, relayRequests, probe)
+    if err != nil {
+        //nolint:wrapcheck
+        return nil, api.NewErrorInternal(fmt.Sprintf("failed to download chunks: %v", err))
     }
 
     err = s.validateAndStoreChunks(ctx, batch, blobShards, rawBundles, operatorState, batchHeaderHash, probe)
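
The handler now owns the acquire/release pair: capacity equal to the planned download size is reserved before any chunk bytes are fetched, and the deferred Release holds it for the remainder of the StoreChunks request. Below is a standalone sketch of that pattern using golang.org/x/sync/semaphore (the same package behind StoreChunksSemaphore); the 1 GiB capacity, 5 s timeout, and 64 MiB request size are illustrative values, not the node's real configuration.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Weighted semaphore sized in bytes: total chunk data allowed in flight at once.
	buffer := semaphore.NewWeighted(1 << 30) // illustrative 1 GiB

	download := func(sizeInBytes int64) error {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// Block until the buffer has room for this request, or fail at the timeout.
		if err := buffer.Acquire(ctx, sizeInBytes); err != nil {
			return fmt.Errorf("failed to acquire buffer capacity: %w", err)
		}
		// Give the capacity back once this request no longer holds the bytes.
		defer buffer.Release(sizeInBytes)

		// ... download, validate, and store sizeInBytes worth of chunks here ...
		return nil
	}

	if err := download(64 << 20); err != nil { // illustrative 64 MiB request
		fmt.Println(err)
	}
}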

node/node.go

Lines changed: 2 additions & 2 deletions
@@ -100,7 +100,7 @@ type Node struct {
     QuorumCount atomic.Uint32
 
     // Used to limit the maximum amount of memory used to serve StoreChunks() gRPC requests.
-    storeChunksSemaphore *semaphore.Weighted
+    StoreChunksSemaphore *semaphore.Weighted
 }
 
 // NewNode creates a new Node with the provided config.

@@ -286,7 +286,7 @@ func NewNode(
         BLSSigner:      blsSigner,
         DownloadPool:   downloadPool,
         ValidationPool: validationPool,
-        storeChunksSemaphore: storeChunksSemaphore,
+        StoreChunksSemaphore: storeChunksSemaphore,
     }
 
     if !config.EnableV2 {

node/node_v2.go

Lines changed: 48 additions & 48 deletions
@@ -33,54 +33,40 @@ type RawBundle struct {
     Bundle []byte
 }
 
-func (n *Node) DownloadBundles(
-    ctx context.Context,
+// Determines where to find the chunks we need to download for a given batch. For each chunk in a batch, there will
+// be one or more relays that are responsible for serving that chunk. This function determines which relays to contact
+// for each chunk, and sorts the requests by relayID to support batching. Additionally, this method also calculates
+// the size of the chunk data that will be downloaded, in bytes.
+func (n *Node) DetermineChunkLocations(
     batch *corev2.Batch,
     operatorState *core.OperatorState,
     probe *common.SequenceProbe,
-) ([]*corev2.BlobShard, []*RawBundle, error) {
-
-    probe.SetStage("prepare_to_download")
-
-    relayClient, ok := n.RelayClient.Load().(relay.RelayClient)
+) (downloadSizeInBytes uint64, relayRequests map[corev2.RelayKey]*relayRequest, err error) {
 
-    if !ok || relayClient == nil {
-        return nil, nil, fmt.Errorf("relay client is not set")
-    }
+    probe.SetStage("determine_chunk_locations")
 
     blobVersionParams := n.BlobVersionParams.Load()
     if blobVersionParams == nil {
-        return nil, nil, fmt.Errorf("blob version params is nil")
+        return 0, nil, fmt.Errorf("blob version params is nil")
     }
 
-    blobShards := make([]*corev2.BlobShard, len(batch.BlobCertificates))
-    rawBundles := make([]*RawBundle, len(batch.BlobCertificates))
-    requests := make(map[corev2.RelayKey]*relayRequest)
-
-    // Tally the number of bytes we are about to download.
-    var downloadSizeInBytes uint64
+    relayRequests = make(map[corev2.RelayKey]*relayRequest)
 
     for i, cert := range batch.BlobCertificates {
         blobKey, err := cert.BlobHeader.BlobKey()
         if err != nil {
-            return nil, nil, fmt.Errorf("failed to get blob key: %v", err)
+            return 0, nil, fmt.Errorf("failed to get blob key: %w", err)
         }
 
         if len(cert.RelayKeys) == 0 {
-            return nil, nil, fmt.Errorf("no relay keys in the certificate")
-        }
-        blobShards[i] = &corev2.BlobShard{
-            BlobCertificate: cert,
-        }
-        rawBundles[i] = &RawBundle{
-            BlobCertificate: cert,
+            return 0, nil, fmt.Errorf("no relay keys in the certificate")
         }
         relayIndex := rand.Intn(len(cert.RelayKeys))
         relayKey := cert.RelayKeys[relayIndex]
 
         blobParams, ok := blobVersionParams.Get(cert.BlobHeader.BlobVersion)
         if !ok {
-            return nil, nil, fmt.Errorf("blob version %d not found", cert.BlobHeader.BlobVersion)
+            return 0, nil, fmt.Errorf("blob version %d not found", cert.BlobHeader.BlobVersion)
         }
 
         assgn, err := corev2.GetAssignmentForBlob(operatorState, blobParams, cert.BlobHeader.QuorumNumbers, n.Config.ID)

@@ -91,17 +77,17 @@ func (n *Node) DownloadBundles(
 
         chunkLength, err := blobParams.GetChunkLength(uint32(cert.BlobHeader.BlobCommitments.Length))
         if err != nil {
-            return nil, nil, fmt.Errorf("failed to get chunk length: %w", err)
+            return 0, nil, fmt.Errorf("failed to get chunk length: %w", err)
         }
         downloadSizeInBytes += uint64(assgn.NumChunks() * chunkLength)
 
-        req, ok := requests[relayKey]
+        req, ok := relayRequests[relayKey]
         if !ok {
             req = &relayRequest{
                 chunkRequests: make([]*relay.ChunkRequestByIndex, 0),
                 metadata:      make([]*requestMetadata, 0),
             }
-            requests[relayKey] = req
+            relayRequests[relayKey] = req
         }
         // Chunks from one blob are requested to the same relay
         req.chunkRequests = append(req.chunkRequests, &relay.ChunkRequestByIndex{

@@ -115,27 +101,40 @@ func (n *Node) DownloadBundles(
 
     }
 
-    // storeChunksSemaphore can be nil during unit tests, since there are a bunch of places where the Node struct
-    // is instantiated directly without using the constructor.
-    if n.storeChunksSemaphore != nil {
-        // So far, we've only downloaded metadata for the blob. Before downloading the actual chunks, make sure there
-        // is capacity in the store chunks buffer. This is an OOM safety measure.
+    return downloadSizeInBytes, relayRequests, nil
+}
 
-        probe.SetStage("acquire_buffer_capacity")
-        semaphoreCtx, cancel := context.WithTimeout(ctx, n.Config.StoreChunksBufferTimeout)
-        defer cancel()
-        err := n.storeChunksSemaphore.Acquire(semaphoreCtx, int64(downloadSizeInBytes))
-        if err != nil {
-            return nil, nil, fmt.Errorf("failed to acquire buffer capacity: %w", err)
+// This method takes a "download plan" from DetermineChunkLocations() and downloads the chunks from the relays.
+// It also deserializes the responses from the relays into BlobShards and RawBundles.
+func (n *Node) DownloadChunksFromRelays(
+    ctx context.Context,
+    batch *corev2.Batch,
+    relayRequests map[corev2.RelayKey]*relayRequest,
+    probe *common.SequenceProbe,
+) (blobShards []*corev2.BlobShard, rawBundles []*RawBundle, err error) {
+
+    blobShards = make([]*corev2.BlobShard, len(batch.BlobCertificates))
+    rawBundles = make([]*RawBundle, len(batch.BlobCertificates))
+    for i, cert := range batch.BlobCertificates {
+        blobShards[i] = &corev2.BlobShard{
+            BlobCertificate: cert,
         }
+        rawBundles[i] = &RawBundle{
+            BlobCertificate: cert,
+        }
+    }
+
+    relayClient, ok := n.RelayClient.Load().(relay.RelayClient)
+    if !ok || relayClient == nil {
+        return nil, nil, fmt.Errorf("relay client is not set")
     }
 
     probe.SetStage("download")
 
-    bundleChan := make(chan response, len(requests))
-    for relayKey := range requests {
+    bundleChan := make(chan response, len(relayRequests))
+    for relayKey := range relayRequests {
         relayKey := relayKey
-        req := requests[relayKey]
+        req := relayRequests[relayKey]
         n.DownloadPool.Submit(func() {
             ctxTimeout, cancel := context.WithTimeout(ctx, n.Config.ChunkDownloadTimeout)
             defer cancel()

@@ -157,23 +156,24 @@ func (n *Node) DownloadBundles(
         })
     }
 
-    responses := make([]response, len(requests))
-    for i := 0; i < len(requests); i++ {
+    responses := make([]response, len(relayRequests))
+    for i := 0; i < len(relayRequests); i++ {
         responses[i] = <-bundleChan
     }
 
     probe.SetStage("deserialize")
 
-    for i := 0; i < len(requests); i++ {
+    for i := 0; i < len(relayRequests); i++ {
         resp := responses[i]
         if resp.err != nil {
             // TODO (cody-littley) this is flaky, and will fail if any relay fails. We should retry failures
             return nil, nil, fmt.Errorf("failed to get chunks from relays: %v", resp.err)
         }
 
         if len(resp.bundles) != len(resp.metadata) {
-            return nil, nil, fmt.Errorf("number of bundles and metadata do not match (%d != %d)",
-                len(resp.bundles), len(resp.metadata))
+            return nil, nil,
+                fmt.Errorf("number of bundles and metadata do not match (%d != %d)",
+                    len(resp.bundles), len(resp.metadata))
         }
 
         for j, bundle := range resp.bundles {
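
The planning half of the split also carries the batching logic: each blob certificate picks one of its candidate relays at random, and chunk requests are bucketed by relay key so a single call per relay can fetch everything that relay serves. A simplified, self-contained sketch of that grouping (the types here are stand-ins, not the repo's relayRequest or certificate types):

package main

import (
	"fmt"
	"math/rand"
)

// Simplified stand-ins for the repo's certificate and request types.
type blobCert struct {
	blobKey   string
	relayKeys []uint32
}

func main() {
	certs := []blobCert{
		{blobKey: "blob-a", relayKeys: []uint32{1, 2}},
		{blobKey: "blob-b", relayKeys: []uint32{2}},
		{blobKey: "blob-c", relayKeys: []uint32{1, 3}},
	}

	// Group requests by relay key so each relay is contacted once per batch.
	requestsByRelay := make(map[uint32][]string)
	for _, cert := range certs {
		// Chunks from one blob are always requested from a single, randomly chosen relay.
		relayKey := cert.relayKeys[rand.Intn(len(cert.relayKeys))]
		requestsByRelay[relayKey] = append(requestsByRelay[relayKey], cert.blobKey)
	}

	for relayKey, blobs := range requestsByRelay {
		fmt.Printf("relay %d serves blobs %v\n", relayKey, blobs)
	}
}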

node/node_v2_test.go

Lines changed: 15 additions & 3 deletions
@@ -45,7 +45,11 @@ func TestDownloadBundles(t *testing.T) {
     })
     state, err := c.node.ChainState.GetOperatorStateByOperator(ctx, uint(10), op0)
     require.NoError(t, err)
-    blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch, state, nil)
+
+    _, relayRequests, err := c.node.DetermineChunkLocations(batch, state, nil)
+    require.NoError(t, err)
+
+    blobShards, rawBundles, err := c.node.DownloadChunksFromRelays(ctx, batch, relayRequests, nil)
     require.NoError(t, err)
     require.Len(t, blobShards, 3)
     require.Equal(t, blobCerts[0], blobShards[0].BlobCertificate)

@@ -82,7 +86,11 @@ func TestDownloadBundlesFail(t *testing.T) {
     })
     state, err := c.node.ChainState.GetOperatorState(ctx, uint(10), []core.QuorumID{0, 1, 2})
     require.NoError(t, err)
-    blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch, state, nil)
+
+    _, relayRequests, err := c.node.DetermineChunkLocations(batch, state, nil)
+    require.NoError(t, err)
+
+    blobShards, rawBundles, err := c.node.DownloadChunksFromRelays(ctx, batch, relayRequests, nil)
     require.Error(t, err)
     require.Nil(t, blobShards)
     require.Nil(t, rawBundles)

@@ -117,7 +125,11 @@ func TestDownloadBundlesOnlyParticipatingQuorums(t *testing.T) {
 
     state, err := c.node.ChainState.GetOperatorState(ctx, uint(10), []core.QuorumID{0, 1, 2})
     require.NoError(t, err)
-    blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch, state, nil)
+
+    _, relayRequests, err := c.node.DetermineChunkLocations(batch, state, nil)
+    require.NoError(t, err)
+
+    blobShards, rawBundles, err := c.node.DownloadChunksFromRelays(ctx, batch, relayRequests, nil)
     require.NoError(t, err)
     require.Len(t, blobShards, 3)
     require.Equal(t, blobCerts[0], blobShards[0].BlobCertificate)

node/validator_store.go

Lines changed: 1 addition & 1 deletion
@@ -113,7 +113,7 @@ func NewValidatorStore(
     littConfig.MetricsNamespace = littDBMetricsPrefix
     littConfig.Logger = logger
    littConfig.DoubleWriteProtection = config.LittDBDoubleWriteProtection
-    littConfig.PurgeLocks = config.LittUnsafePurgeLocks
+    littConfig.PurgeLocks = !config.LittRespectLocks
     if err != nil {
         return nil, fmt.Errorf("failed to create new litt config: %w", err)
     }
