Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/proxy/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ linters:
- unused # checks for unused constants, variables, functions and types
## disabled by default
- asasalint # checks for pass []any as any in variadic func(...any)
- asciicheck # checks that your code does not contain non-ASCII identifiers
- asciicheck # checks that your code does not contain non-ASCII identifiers1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stray keystroke

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- bidichk # checks for dangerous unicode character sequences
- bodyclose # checks whether HTTP response body is closed successfully
- cyclop # checks function and package cyclomatic complexity
Expand Down Expand Up @@ -195,7 +195,7 @@ linters:
- unparam

# Allow certain patterns to be ignored by lll (long lines)
- source: '".{120,}"' # Ignores double-quoted strings longer than 120 chars
- source: '".{100,}"' # Ignores double-quoted strings longer than 120 chars
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in favor of this change

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know... but otherwise there was a bug which the celo engineer was running into and he didn't find another way to solve it. See https://eigenlabs.slack.com/archives/C07614NNXDH/p1751913947976629?thread_ts=1750965737.272329&cid=C07614NNXDH

Copy link
Collaborator Author

@samlaf samlaf Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed comment and added some "explanation" in 2c50038
Tbf I don't understand the bug so not super sure... but its not super important to me we can keep tweaking this in another PR I think, given that this blocks the 2.1 release we're trying to achieve and I want this to be included.

linters: [lll]
- source: "// https?://" # This pattern matches comments containing URLs
linters: [lll]
Expand Down
1 change: 1 addition & 0 deletions api/proxy/docs/help_out.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,5 @@ This check is optional and will be skipped when set to 0. (default: 0) [$EIGENDA
--storage.concurrent-write-routines value Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. (default: 0) [$EIGENDA_PROXY_STORAGE_CONCURRENT_WRITE_THREADS]
--storage.dispersal-backend value Target EigenDA backend version for blob dispersal (e.g. V1 or V2). (default: "V1") [$EIGENDA_PROXY_STORAGE_DISPERSAL_BACKEND]
--storage.fallback-targets value [ --storage.fallback-targets value ] List of read fallback targets to rollover to if cert can't be read from EigenDA. [$EIGENDA_PROXY_STORAGE_FALLBACK_TARGETS]
--storage.write-on-cache-miss While doing a GET, write to the secondary storage if the cert/blob is not found in the cache but is found in EigenDA. (default: false) [$EIGENDA_PROXY_STORAGE_WRITE_ON_CACHE_MISS]

2 changes: 1 addition & 1 deletion api/proxy/store/builder/storage_manager_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func BuildStoreManager(

fallbacks := buildSecondaries(config.StoreConfig.FallbackTargets, s3Store, redisStore)
caches := buildSecondaries(config.StoreConfig.CacheTargets, s3Store, redisStore)
secondary := secondary.NewSecondaryManager(log, metrics, caches, fallbacks)
secondary := secondary.NewSecondaryManager(log, metrics, caches, fallbacks, config.StoreConfig.WriteOnCacheMiss)

if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled
log.Info("Starting secondary write loop(s)", "count", config.StoreConfig.AsyncPutWorkers)
Expand Down
9 changes: 9 additions & 0 deletions api/proxy/store/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
FallbackTargetsFlagName = withFlagPrefix("fallback-targets")
CacheTargetsFlagName = withFlagPrefix("cache-targets")
ConcurrentWriteThreads = withFlagPrefix("concurrent-write-routines")
WriteOnCacheMissFlagName = withFlagPrefix("write-on-cache-miss")
)

func withFlagPrefix(s string) string {
Expand Down Expand Up @@ -65,6 +66,13 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
EnvVars: withEnvPrefix(envPrefix, "CONCURRENT_WRITE_THREADS"),
Category: category,
},
&cli.BoolFlag{
Name: WriteOnCacheMissFlagName,
Usage: "While doing a GET, write to the secondary storage if the cert/blob is not found in the cache but is found in EigenDA.",
Value: false,
EnvVars: withEnvPrefix(envPrefix, "WRITE_ON_CACHE_MISS"),
Category: category,
},
}
}

Expand Down Expand Up @@ -94,5 +102,6 @@ func ReadConfig(ctx *cli.Context) (Config, error) {
AsyncPutWorkers: ctx.Int(ConcurrentWriteThreads),
FallbackTargets: ctx.StringSlice(FallbackTargetsFlagName),
CacheTargets: ctx.StringSlice(CacheTargetsFlagName),
WriteOnCacheMiss: ctx.Bool(WriteOnCacheMissFlagName),
}, nil
}
2 changes: 2 additions & 0 deletions api/proxy/store/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type Config struct {
AsyncPutWorkers int
FallbackTargets []string
CacheTargets []string

WriteOnCacheMiss bool
}

// checkTargets ... verifies that a backend target slice is constructed correctly
Expand Down
23 changes: 16 additions & 7 deletions api/proxy/store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (m *Manager) Get(ctx context.Context,
// 2 - read blob from EigenDA
data, err := m.getFromCorrectEigenDABackend(ctx, versionedCert, verifyOpts)
if err == nil {
if m.secondary.WriteOnCacheMissEnabled() {
m.backupToSecondary(ctx, versionedCert.SerializedCert, data)
}

return data, nil
}

Expand Down Expand Up @@ -171,23 +175,28 @@ func (m *Manager) Put(ctx context.Context, cm commitments.CommitmentMode, value
}

// 2 - Put blob into secondary storage backends
if m.secondary.Enabled() &&
m.secondary.AsyncWriteEntry() { // publish put notification to secondary's subscription on PutNotify topic
if m.secondary.Enabled() {
m.backupToSecondary(ctx, commit, value)
}

return commit, nil
}

func (m *Manager) backupToSecondary(ctx context.Context, commitment []byte, value []byte) {
if m.secondary.AsyncWriteEntry() { // publish put notification to secondary's subscription on PutNotify topic
m.log.Debug("Publishing data to async secondary stores")
m.secondary.Topic() <- secondary.PutNotify{
Commitment: commit,
Commitment: commitment,
Value: value,
}
// secondary is available only for synchronous writes
} else if m.secondary.Enabled() && !m.secondary.AsyncWriteEntry() {
} else {
m.log.Debug("Publishing data to single threaded secondary stores")
err := m.secondary.HandleRedundantWrites(ctx, commit, value)
err := m.secondary.HandleRedundantWrites(ctx, commitment, value)
if err != nil {
m.log.Error("Secondary insertions failed", "error", err.Error())
}
}

return commit, nil
}

// getVerifyMethod returns the correct verify method based on commitment type
Expand Down
18 changes: 13 additions & 5 deletions api/proxy/store/secondary/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ISecondary interface {
verifyOpts common.CertVerificationOpts,
) ([]byte, error)
WriteSubscriptionLoop(ctx context.Context)
WriteOnCacheMissEnabled() bool
}

// PutNotify ... notification received by primary manager to perform insertion across
Expand All @@ -57,6 +58,7 @@ type SecondaryManager struct {
verifyLock sync.RWMutex
topic chan PutNotify
concurrentWrites bool
writeOnCacheMiss bool
}

// NewSecondaryManager ... creates a new secondary storage manager
Expand All @@ -65,16 +67,18 @@ func NewSecondaryManager(
m metrics.Metricer,
caches []common.SecondaryStore,
fallbacks []common.SecondaryStore,
writeOnCacheMiss bool,
) ISecondary {
return &SecondaryManager{
topic: make(
chan PutNotify,
), // channel is un-buffered which dispersing consumption across routines helps alleviate
log: log,
m: m,
caches: caches,
fallbacks: fallbacks,
verifyLock: sync.RWMutex{},
log: log,
m: m,
caches: caches,
fallbacks: fallbacks,
verifyLock: sync.RWMutex{},
writeOnCacheMiss: writeOnCacheMiss,
}
}

Expand All @@ -95,6 +99,10 @@ func (sm *SecondaryManager) FallbackEnabled() bool {
return len(sm.fallbacks) > 0
}

func (sm *SecondaryManager) WriteOnCacheMissEnabled() bool {
return sm.CachingEnabled() && sm.writeOnCacheMiss
}

// HandleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache)
// and returns an error if NONE of them succeed
func (sm *SecondaryManager) HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
Expand Down
53 changes: 53 additions & 0 deletions api/proxy/test/e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,59 @@ func testProxyReadFallback(t *testing.T, dispersalBackend common.EigenDABackend)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.StandardCommitmentMode)
}

func TestProxyWriteCacheOnMissV1(t *testing.T) {
testProxyWriteCacheOnMiss(t, common.V1EigenDABackend)
}

func TestProxyWriteCacheOnMissV2(t *testing.T) {
testProxyWriteCacheOnMiss(t, common.V2EigenDABackend)
}

func testProxyWriteCacheOnMiss(t *testing.T, dispersalBackend common.EigenDABackend) {
t.Parallel()

testCfg := testutils.NewTestConfig(testutils.GetBackend(), dispersalBackend, nil)
testCfg.UseS3Caching = true
testCfg.WriteOnCacheMiss = true

tsConfig := testutils.BuildTestSuiteConfig(testCfg)
ts, kill := testutils.CreateTestSuite(tsConfig)
defer kill()

cfg := &standard_client.Config{
URL: ts.Address(),
}
daClient := standard_client.New(cfg)
expectedBlob := testutils.RandBytes(1_000_000)
t.Log("Setting input data on proxy server...")
blobInfo, err := daClient.SetData(ts.Ctx, expectedBlob)
require.NoError(t, err)

_, err = daClient.GetData(ts.Ctx, blobInfo)
require.NoError(t, err)

exists, err := testutils.ExistsBlobInfotInBucket(tsConfig.StoreBuilderConfig.S3Config.Bucket, blobInfo)
require.NoError(t, err)
require.True(t, exists)

t.Log("Erase blob from the cache...")
err = testutils.RemoveBlobInfoFromBucket(tsConfig.StoreBuilderConfig.S3Config.Bucket, blobInfo)
require.NoError(t, err)
exists, err = testutils.ExistsBlobInfotInBucket(tsConfig.StoreBuilderConfig.S3Config.Bucket, blobInfo)
require.NoError(t, err)
require.False(t, exists)

// Blob created in disperser, removed from S3
t.Log("Getting input data from proxy server...")
actualBlob, err := daClient.GetData(ts.Ctx, blobInfo)
require.NoError(t, err)
require.Equal(t, expectedBlob, actualBlob)

exists, err = testutils.ExistsBlobInfotInBucket(tsConfig.StoreBuilderConfig.S3Config.Bucket, blobInfo)
require.NoError(t, err)
require.True(t, exists)
}

func TestProxyMemConfigClientCanGetAndPatchV1(t *testing.T) {
testProxyMemConfigClientCanGetAndPatch(t, common.V1EigenDABackend)
}
Expand Down
15 changes: 3 additions & 12 deletions api/proxy/test/testutils/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"golang.org/x/exp/rand"

miniotc "github.com/testcontainers/testcontainers-go/modules/minio"
redistc "github.com/testcontainers/testcontainers-go/modules/redis"
Expand Down Expand Up @@ -172,6 +171,7 @@ type TestConfig struct {
Expiration time.Duration
MaxBlobLength string
WriteThreadCount int
WriteOnCacheMiss bool
// at most one of the below options should be true
UseKeccak256ModeS3 bool
UseS3Caching bool
Expand Down Expand Up @@ -205,6 +205,7 @@ func NewTestConfig(
UseRedisCaching: false,
UseS3Fallback: false,
WriteThreadCount: 0,
WriteOnCacheMiss: false,
}
}

Expand Down Expand Up @@ -300,6 +301,7 @@ func BuildTestSuiteConfig(testCfg TestConfig) config.AppConfig {
AsyncPutWorkers: testCfg.WriteThreadCount,
BackendsToEnable: testCfg.BackendsToEnable,
DispersalBackend: testCfg.DispersalBackend,
WriteOnCacheMiss: testCfg.WriteOnCacheMiss,
},
ClientConfigV1: common.ClientConfigV1{
EdaClientCfg: clients.EigenDAClientConfig{
Expand Down Expand Up @@ -427,14 +429,3 @@ func createS3Bucket(bucketName string) {
log.Info(fmt.Sprintf("Successfully created %s\n", bucketName))
}
}
func RandStr(n int) string {
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz")
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
func RandBytes(n int) []byte {
return []byte(RandStr(n))
}
84 changes: 84 additions & 0 deletions api/proxy/test/testutils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package testutils

import (
"context"
"encoding/hex"
"fmt"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"golang.org/x/exp/rand"
)

func RandStr(n int) string {
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz")
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

func RandBytes(n int) []byte {
return []byte(RandStr(n))
}

// Panics if the bucket does not exist
func RemoveBlobInfoFromBucket(bucketName string, blobInfo []byte) error {
// Initialize minio client object.
endpoint := minioEndpoint
accessKeyID := minioAdmin
secretAccessKey := minioAdmin
useSSL := false
minioClient, err := minio.New(
endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: useSSL,
})
// Panic, the bucket should already exist
if err != nil {
panic(err)
}
key := crypto.Keccak256(blobInfo[1:])
objectName := hex.EncodeToString(key)
ctx := context.Background()
err = minioClient.RemoveObject(ctx, bucketName, objectName, minio.RemoveObjectOptions{})
if err != nil {
return err
}
log.Info(fmt.Sprintf("Successfully removed %s from %s\n", objectName, bucketName))

return nil
}

// Panics if the bucket does not exist
func ExistsBlobInfotInBucket(bucketName string, blobInfo []byte) (bool, error) {
// Initialize minio client object.
endpoint := minioEndpoint
accessKeyID := minioAdmin
secretAccessKey := minioAdmin
useSSL := false
minioClient, err := minio.New(
endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: useSSL,
})
// Panic, the bucket should already exist
if err != nil {
panic(err)
}
key := crypto.Keccak256(blobInfo[1:])
objectName := hex.EncodeToString(key)
ctx := context.Background()
_, err = minioClient.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
if err != nil {
errResponse := minio.ToErrorResponse(err)
if errResponse.Code == "NoSuchKey" {
return false, nil
}
return false, err
}
return true, nil
}
Loading