Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ func RunController(ctx *cli.Context) error {
}

c := context.Background()

err = controller.RecoverState(c, blobMetadataStore, logger)
if err != nil {
return fmt.Errorf("failed to recover state: %v", err)
}

err = encodingManager.Start(c)
if err != nil {
return fmt.Errorf("failed to start encoding manager: %v", err)
Expand Down
46 changes: 46 additions & 0 deletions disperser/controller/recover_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package controller

import (
"context"
"fmt"

v2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigensdk-go/logging"
)

// RecoverState checks for blobs in the GatheringSignatures state and updates their status to Failed.
func RecoverState(
ctx context.Context,
blobStore *blobstore.BlobMetadataStore,
logger logging.Logger,
) error {
logger.Info("recovering state...")

metadata, err := blobStore.GetBlobMetadataByStatus(ctx, v2.GatheringSignatures, 0)
if err != nil {
return fmt.Errorf("failed to get blobs in gathering signatures state: %w", err)
}

if len(metadata) == 0 {
logger.Info("no blobs in gathering signatures state")
return nil
}

logger.Info("found blobs in gathering signatures state", "count", len(metadata))

for _, blob := range metadata {
key, err := blob.BlobHeader.BlobKey()
if err != nil {
logger.Error("failed to get blob key", "err", err)
continue
}

logger.Debug("updating blob status", "key", key, "status", v2.Failed)
if err := blobStore.UpdateBlobStatus(ctx, key, v2.Failed); err != nil {
logger.Error("failed to update blob status", "blobKey", key.Hex(), "err", err)
}
}
logger.Info("recovered state successfully")
return nil
}
47 changes: 47 additions & 0 deletions disperser/controller/recover_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package controller_test

import (
"context"
"testing"
"time"

"github.com/Layr-Labs/eigenda/common/testutils"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
v2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/Layr-Labs/eigenda/disperser/controller"
"github.com/stretchr/testify/require"
)

const numObjects = 12

func TestRecoverState(t *testing.T) {
logger := testutils.GetLogger()
ctx := context.Background()
keys := make([]corev2.BlobKey, numObjects)
metadatas := make([]*v2.BlobMetadata, numObjects)
for i := 0; i < numObjects; i++ {
key, header := newBlob(t, []uint8{0, 1})
keys[i] = key
now := time.Now()
metadatas[i] = &v2.BlobMetadata{
BlobHeader: header,
BlobStatus: v2.GatheringSignatures,
Expiry: uint64(now.Add(time.Hour).Unix()),
NumRetries: 0,
UpdatedAt: uint64(now.UnixNano()) - uint64(i),
}
err := blobMetadataStore.PutBlobMetadata(ctx, metadatas[i])
require.NoError(t, err)
}
err := controller.RecoverState(ctx, blobMetadataStore, logger)
require.NoError(t, err)

// check that all blobs are in Failed state
for i := 0; i < numObjects; i++ {
metadata, err := blobMetadataStore.GetBlobMetadata(ctx, keys[i])
require.NoError(t, err)
require.Equal(t, v2.Failed, metadata.BlobStatus)
}

deleteBlobs(t, blobMetadataStore, keys, nil)
}
Loading