Skip to content

Commit 1e0b820

Browse files
committed
feat: on-demand check
1 parent c192f20 commit 1e0b820

File tree

4 files changed

+25
-28
lines changed

4 files changed

+25
-28
lines changed

disperser/apiserver/server_v2.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
157157
if err := s.RefreshOnchainState(ctx); err != nil {
158158
return fmt.Errorf("failed to refresh onchain quorum state: %w", err)
159159
}
160+
s.logger.Debug("Refreshed onchain quorum state", "onchainState", s.onchainState.Load())
160161

161162
go func() {
162163
ticker := time.NewTicker(s.onchainStateRefreshInterval)
@@ -165,6 +166,7 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
165166
for {
166167
select {
167168
case <-ticker.C:
169+
s.logger.Debug("Refreshing onchain quorum state", "onchainState", s.onchainState.Load())
168170
if err := s.RefreshOnchainState(ctx); err != nil {
169171
s.logger.Error("failed to refresh onchain quorum state", "err", err)
170172
}
@@ -258,6 +260,8 @@ func (s *DispersalServerV2) RefreshOnchainState(ctx context.Context) error {
258260
if err != nil {
259261
return fmt.Errorf("failed to get blob version parameters: %w", err)
260262
}
263+
264+
s.logger.Debug("Refreshed onchain quorum state", "quorumCount", quorumCount, "requiredQuorums", requiredQuorums, "blobParams", blobParams, "storeDurationBlocks", storeDurationBlocks, "blockStaleMeasure", blockStaleMeasure)
261265
onchainState := &OnchainState{
262266
QuorumCount: quorumCount,
263267
RequiredQuorums: requiredQuorums,
@@ -267,6 +271,8 @@ func (s *DispersalServerV2) RefreshOnchainState(ctx context.Context) error {
267271

268272
s.onchainState.Store(onchainState)
269273

274+
s.logger.Debug("stored onchain quorum state", "onchainState", onchainState)
275+
270276
return nil
271277
}
272278

node/auth/authenticator.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ type requestAuthenticator struct {
4747
// reloaded from the chain state in case the key has been changed.
4848
keyTimeoutDuration time.Duration
4949

50-
// disperserIDFilter is a function that returns true if the given disperser ID is valid.
51-
disperserIDFilter func(uint32) bool
50+
//TODO: add blacklistedDispersers is a set of disperser IDs that are blacklisted.
5251
}
5352

5453
// NewRequestAuthenticator creates a new RequestAuthenticator.
@@ -57,7 +56,6 @@ func NewRequestAuthenticator(
5756
chainReader core.Reader,
5857
keyCacheSize int,
5958
keyTimeoutDuration time.Duration,
60-
disperserIDFilter func(uint32) bool,
6159
now time.Time) (RequestAuthenticator, error) {
6260

6361
keyCache, err := lru.New[uint32, *keyWithTimeout](keyCacheSize)
@@ -69,7 +67,6 @@ func NewRequestAuthenticator(
6967
chainReader: chainReader,
7068
keyCache: keyCache,
7169
keyTimeoutDuration: keyTimeoutDuration,
72-
disperserIDFilter: disperserIDFilter,
7370
}
7471

7572
err = authenticator.preloadCache(ctx, now)
@@ -81,7 +78,7 @@ func NewRequestAuthenticator(
8178
}
8279

8380
func (a *requestAuthenticator) preloadCache(ctx context.Context, now time.Time) error {
84-
// this will need to be updated for decentralized dispersers
81+
//TODO: preload disperser blacklist from storage
8582
_, err := a.getDisperserKey(ctx, now, api.EigenLabsDisperserID)
8683
if err != nil {
8784
return fmt.Errorf("failed to get operator key: %w", err)
@@ -114,9 +111,7 @@ func (a *requestAuthenticator) getDisperserKey(
114111
now time.Time,
115112
disperserID uint32) (*gethcommon.Address, error) {
116113

117-
if !a.disperserIDFilter(disperserID) {
118-
return nil, fmt.Errorf("invalid disperser ID: %d", disperserID)
119-
}
114+
//TODO: Reject blacklisted dispersers
120115

121116
key, ok := a.keyCache.Get(disperserID)
122117
if ok {

node/auth/authenticator_test.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"errors"
7-
"sync/atomic"
87
"testing"
98
"time"
109

@@ -32,7 +31,6 @@ func TestValidRequest(t *testing.T) {
3231
&chainReader,
3332
10,
3433
time.Minute,
35-
func(uint32) bool { return true },
3634
start)
3735
require.NoError(t, err)
3836

@@ -66,7 +64,6 @@ func TestInvalidRequestWrongHash(t *testing.T) {
6664
&chainReader,
6765
10,
6866
time.Minute,
69-
func(uint32) bool { return true },
7067
start)
7168
require.NoError(t, err)
7269

@@ -100,7 +97,6 @@ func TestInvalidRequestWrongKey(t *testing.T) {
10097
&chainReader,
10198
10,
10299
time.Minute,
103-
func(uint32) bool { return true },
104100
start)
105101
require.NoError(t, err)
106102

@@ -126,7 +122,7 @@ func TestInvalidRequestInvalidDisperserID(t *testing.T) {
126122
require.NoError(t, err)
127123
disperserAddress0 := crypto.PubkeyToAddress(*publicKey0)
128124

129-
// This disperser will be loaded on chain (simulated), but will fail the valid disperser ID filter.
125+
// Non EigenLabs disperser is registered on-chain
130126
publicKey1, privateKey1, err := rand.ECDSA()
131127
require.NoError(t, err)
132128
disperserAddress1 := crypto.PubkeyToAddress(*publicKey1)
@@ -137,20 +133,13 @@ func TestInvalidRequestInvalidDisperserID(t *testing.T) {
137133
chainReader.Mock.On("GetDisperserAddress", uint32(1234)).Return(
138134
nil, errors.New("disperser not found"))
139135

140-
filterCallCount := atomic.Uint32{}
141-
142136
authenticator, err := NewRequestAuthenticator(
143137
context.Background(),
144138
&chainReader,
145139
10,
146140
time.Minute,
147-
func(id uint32) bool {
148-
filterCallCount.Add(1)
149-
return id != uint32(1)
150-
},
151141
start)
152142
require.NoError(t, err)
153-
require.Equal(t, uint32(1), filterCallCount.Load())
154143

155144
request := RandomStoreChunksRequest(rand)
156145
request.DisperserID = 0
@@ -162,14 +151,16 @@ func TestInvalidRequestInvalidDisperserID(t *testing.T) {
162151
expectedHash, err := hashing.HashStoreChunksRequest(request)
163152
require.NoError(t, err)
164153
require.Equal(t, expectedHash, hash)
165-
require.Equal(t, uint32(2), filterCallCount.Load())
166154

167155
request.DisperserID = 1
168156
signature, err = SignStoreChunksRequest(privateKey1, request)
169157
require.NoError(t, err)
170158
request.Signature = signature
171-
_, err = authenticator.AuthenticateStoreChunksRequest(context.Background(), request, start)
172-
require.Error(t, err)
159+
hash, err = authenticator.AuthenticateStoreChunksRequest(context.Background(), request, start)
160+
require.NoError(t, err)
161+
expectedHash, err = hashing.HashStoreChunksRequest(request)
162+
require.NoError(t, err)
163+
require.Equal(t, expectedHash, hash)
173164

174165
request.DisperserID = 1234
175166
signature, err = SignStoreChunksRequest(privateKey1, request)
@@ -196,7 +187,6 @@ func TestKeyExpiry(t *testing.T) {
196187
&chainReader,
197188
10,
198189
time.Minute,
199-
func(uint32) bool { return true },
200190
start)
201191
require.NoError(t, err)
202192

@@ -260,7 +250,6 @@ func TestKeyCacheSize(t *testing.T) {
260250
&chainReader,
261251
cacheSize,
262252
time.Minute,
263-
func(uint32) bool { return true },
264253
start)
265254
require.NoError(t, err)
266255

node/grpc/server_v2.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/Layr-Labs/eigenda/common/kvstore"
1515
"github.com/Layr-Labs/eigenda/common/replay"
1616
"github.com/Layr-Labs/eigenda/core"
17+
"github.com/Layr-Labs/eigenda/core/meterer"
1718
corev2 "github.com/Layr-Labs/eigenda/core/v2"
1819
"github.com/Layr-Labs/eigenda/node"
1920
"github.com/Layr-Labs/eigenda/node/auth"
@@ -58,9 +59,6 @@ func NewServerV2(
5859
reader,
5960
config.DispersalAuthenticationKeyCacheSize,
6061
config.DisperserKeyTimeout,
61-
func(id uint32) bool {
62-
return id == api.EigenLabsDisperserID
63-
},
6462
time.Now())
6563
if err != nil {
6664
return nil, fmt.Errorf("failed to create authenticator: %w", err)
@@ -140,6 +138,15 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
140138
if err != nil {
141139
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to verify request: %v", err))
142140
}
141+
// TODO: move to blob authenticator later
142+
for _, blob := range batch.BlobCertificates {
143+
if meterer.IsOnDemandPayment(&blob.BlobHeader.PaymentMetadata) {
144+
// Batch contains on-demand payments, so the chunk must be from EigenLabsDisperser
145+
if in.DisperserID != api.EigenLabsDisperserID {
146+
return nil, api.NewErrorInvalidArg("on-demand payments are only allowed for EigenLabsDisperser")
147+
}
148+
}
149+
}
143150
}
144151

145152
probe.SetStage("get_operator_state")

0 commit comments

Comments
 (0)