Skip to content

Commit 3513a4e

Browse files
authored
feat: distributed retrieval operator socket (#1403)
1 parent 64adb4c commit 3513a4e

24 files changed

+197
-125
lines changed

api/clients/mock/node_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func NewNodeClient() *MockNodeClient {
1919
return &MockNodeClient{}
2020
}
2121

22-
func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket string, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error) {
22+
func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket core.OperatorSocket, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error) {
2323
args := c.Called(socket, batchHeaderHash, blobIndex)
2424
var hashes [][]byte
2525
if args.Get(1) != nil {
@@ -46,7 +46,7 @@ func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket string, batch
4646
func (c *MockNodeClient) GetChunks(
4747
ctx context.Context,
4848
opID core.OperatorID,
49-
opInfo *core.IndexedOperatorInfo,
49+
opInfo *core.OperatorInfo,
5050
batchHeaderHash [32]byte,
5151
blobIndex uint32,
5252
quorumID core.QuorumID,

api/clients/node_client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type RetrievedChunks struct {
2020
}
2121

2222
type NodeClient interface {
23-
GetBlobHeader(ctx context.Context, socket string, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error)
24-
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.IndexedOperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
23+
GetBlobHeader(ctx context.Context, socket core.OperatorSocket, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error)
24+
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
2525
}
2626

2727
type client struct {
@@ -36,12 +36,12 @@ func NewNodeClient(timeout time.Duration) NodeClient {
3636

3737
func (c client) GetBlobHeader(
3838
ctx context.Context,
39-
socket string,
39+
socket core.OperatorSocket,
4040
batchHeaderHash [32]byte,
4141
blobIndex uint32,
4242
) (*core.BlobHeader, *merkletree.Proof, error) {
4343
conn, err := grpc.NewClient(
44-
core.OperatorSocket(socket).GetV1RetrievalSocket(),
44+
socket.GetV1RetrievalSocket(),
4545
grpc.WithTransportCredentials(insecure.NewCredentials()),
4646
)
4747
if err != nil {
@@ -79,7 +79,7 @@ func (c client) GetBlobHeader(
7979
func (c client) GetChunks(
8080
ctx context.Context,
8181
opID core.OperatorID,
82-
opInfo *core.IndexedOperatorInfo,
82+
opInfo *core.OperatorInfo,
8383
batchHeaderHash [32]byte,
8484
blobIndex uint32,
8585
quorumID core.QuorumID,

api/clients/retrieval_client.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type BlobChunks struct {
5353

5454
type retrievalClient struct {
5555
logger logging.Logger
56-
indexedChainState core.IndexedChainState
56+
chainState core.ChainState
5757
assignmentCoordinator core.AssignmentCoordinator
5858
nodeClient NodeClient
5959
verifier encoding.Verifier
@@ -63,15 +63,15 @@ type retrievalClient struct {
6363
// NewRetrievalClient creates a new retrieval client.
6464
func NewRetrievalClient(
6565
logger logging.Logger,
66-
chainState core.IndexedChainState,
66+
chainState core.ChainState,
6767
assignmentCoordinator core.AssignmentCoordinator,
6868
nodeClient NodeClient,
6969
verifier encoding.Verifier,
7070
numConnections int) (RetrievalClient, error) {
7171

7272
return &retrievalClient{
7373
logger: logger.With("component", "RetrievalClient"),
74-
indexedChainState: chainState,
74+
chainState: chainState,
7575
assignmentCoordinator: assignmentCoordinator,
7676
nodeClient: nodeClient,
7777
verifier: verifier,
@@ -104,11 +104,12 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
104104
batchRoot [32]byte,
105105
quorumID core.QuorumID) (*BlobChunks, error) {
106106

107-
indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
107+
operatorState, err := r.chainState.GetOperatorStateWithSocket(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
108108
if err != nil {
109+
r.logger.Error("failed to get operator state", "err", err)
109110
return nil, err
110111
}
111-
operators, ok := indexedOperatorState.Operators[quorumID]
112+
operators, ok := operatorState.Operators[quorumID]
112113
if !ok {
113114
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
114115
}
@@ -118,7 +119,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
118119
var proof *merkletree.Proof
119120
var proofVerified bool
120121
for opID := range operators {
121-
opInfo := indexedOperatorState.IndexedOperators[opID]
122+
opInfo := operators[opID]
122123
blobHeader, proof, err = r.nodeClient.GetBlobHeader(ctx, opInfo.Socket, batchHeaderHash, blobIndex)
123124
if err != nil {
124125
// try another operator
@@ -172,7 +173,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
172173
return nil, err
173174
}
174175

175-
assignments, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, blobHeader.Length, quorumHeader)
176+
assignments, info, err := r.assignmentCoordinator.GetAssignments(operatorState, blobHeader.Length, quorumHeader)
176177
if err != nil {
177178
return nil, errors.New("failed to get assignments")
178179
}
@@ -181,8 +182,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
181182
chunksChan := make(chan RetrievedChunks, len(operators))
182183
pool := workerpool.New(r.numConnections)
183184
for opID := range operators {
184-
opID := opID
185-
opInfo := indexedOperatorState.IndexedOperators[opID]
185+
opInfo := operators[opID]
186186
pool.Submit(func() {
187187
r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan)
188188
})

api/clients/retrieval_client_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,7 @@ func setup(t *testing.T) {
100100
indexer = &indexermock.MockIndexer{}
101101
indexer.On("Index").Return(nil).Once()
102102

103-
ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
104-
if err != nil {
105-
panic("failed to create a new indexed chain state")
106-
}
107-
108-
retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2)
103+
retrievalClient, err = clients.NewRetrievalClient(logger, chainState, coordinator, nodeClient, v, 2)
109104
if err != nil {
110105
panic("failed to create a new retrieval client")
111106
}

api/clients/v2/retrieval_client.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ type RetrievalClient interface {
3434
}
3535

3636
type retrievalClient struct {
37-
logger logging.Logger
38-
ethClient core.Reader
39-
indexedChainState core.IndexedChainState
40-
verifier encoding.Verifier
41-
numConnections int
37+
logger logging.Logger
38+
ethClient core.Reader
39+
chainState core.ChainState
40+
verifier encoding.Verifier
41+
numConnections int
4242
}
4343

4444
var _ RetrievalClient = &retrievalClient{}
@@ -47,16 +47,16 @@ var _ RetrievalClient = &retrievalClient{}
4747
func NewRetrievalClient(
4848
logger logging.Logger,
4949
ethClient core.Reader,
50-
chainState core.IndexedChainState,
50+
chainState core.ChainState,
5151
verifier encoding.Verifier,
5252
numConnections int,
5353
) RetrievalClient {
5454
return &retrievalClient{
55-
logger: logger.With("component", "RetrievalClient"),
56-
ethClient: ethClient,
57-
indexedChainState: chainState,
58-
verifier: verifier,
59-
numConnections: numConnections,
55+
logger: logger.With("component", "RetrievalClient"),
56+
ethClient: ethClient,
57+
chainState: chainState,
58+
verifier: verifier,
59+
numConnections: numConnections,
6060
}
6161
}
6262

@@ -75,11 +75,11 @@ func (r *retrievalClient) GetBlob(
7575
return nil, err
7676
}
7777

78-
indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
78+
operatorState, err := r.chainState.GetOperatorStateWithSocket(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
7979
if err != nil {
8080
return nil, err
8181
}
82-
operators, ok := indexedOperatorState.Operators[quorumID]
82+
operators, ok := operatorState.Operators[quorumID]
8383
if !ok {
8484
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
8585
}
@@ -99,7 +99,7 @@ func (r *retrievalClient) GetBlob(
9999
return nil, err
100100
}
101101

102-
assignments, err := corev2.GetAssignments(indexedOperatorState.OperatorState, blobParam, quorumID)
102+
assignments, err := corev2.GetAssignments(operatorState, blobParam, quorumID)
103103
if err != nil {
104104
return nil, errors.New("failed to get assignments")
105105
}
@@ -108,8 +108,7 @@ func (r *retrievalClient) GetBlob(
108108
chunksChan := make(chan clients.RetrievedChunks, len(operators))
109109
pool := workerpool.New(r.numConnections)
110110
for opID := range operators {
111-
opID := opID
112-
opInfo := indexedOperatorState.IndexedOperators[opID]
111+
opInfo := operatorState.Operators[quorumID][opID]
113112
pool.Submit(func() {
114113
r.getChunksFromOperator(ctx, opID, opInfo, blobKey, quorumID, chunksChan)
115114
})
@@ -161,7 +160,7 @@ func (r *retrievalClient) GetBlob(
161160
func (r *retrievalClient) getChunksFromOperator(
162161
ctx context.Context,
163162
opID core.OperatorID,
164-
opInfo *core.IndexedOperatorInfo,
163+
opInfo *core.OperatorInfo,
165164
blobKey corev2.BlobKey,
166165
quorumID core.QuorumID,
167166
chunksChan chan clients.RetrievedChunks,

core/chainio.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ type OperatorStake struct {
1616
Stake *big.Int
1717
}
1818

19+
type OperatorStakeWithSocket struct {
20+
OperatorID OperatorID
21+
Stake *big.Int
22+
Socket OperatorSocket
23+
}
24+
1925
type OperatorToChurn struct {
2026
QuorumId QuorumID
2127
Operator gethcommon.Address
@@ -29,6 +35,7 @@ type OperatorSetParam struct {
2935
}
3036

3137
type OperatorStakes map[QuorumID]map[OperatorIndex]OperatorStake
38+
type OperatorStakesWithSocket map[QuorumID]map[OperatorIndex]OperatorStakeWithSocket
3239

3340
type Reader interface {
3441

@@ -44,6 +51,10 @@ type Reader interface {
4451
// The indices of the operators within each quorum are also returned.
4552
GetOperatorStakesForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) (OperatorStakes, error)
4653

54+
// GetOperatorStakesWithSocketForQuorums returns the stakes of all operators within the supplied quorums. The returned stakes are for the block number supplied.
55+
// The indices of the operators within each quorum are also returned.
56+
GetOperatorStakesWithSocketForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) (OperatorStakesWithSocket, error)
57+
4758
// GetBlockStaleMeasure returns the BLOCK_STALE_MEASURE defined onchain.
4859
GetBlockStaleMeasure(ctx context.Context) (uint32, error)
4960

core/eth/reader.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,43 @@ func (t *Reader) GetOperatorStakesForQuorums(ctx context.Context, quorums []core
483483
return state, nil
484484
}
485485

486+
// GetOperatorStakesForQuorums returns the stakes of all operators within the supplied quorums. The returned stakes are for the block number supplied.
487+
// The indices of the operators within each quorum are also returned.
488+
func (t *Reader) GetOperatorStakesWithSocketForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) (core.OperatorStakesWithSocket, error) {
489+
quorumBytes := make([]byte, len(quorums))
490+
for ind, quorum := range quorums {
491+
quorumBytes[ind] = byte(uint8(quorum))
492+
}
493+
494+
// result is a struct{Operators [][]opstateretriever.OperatorStateRetrieverOperator; Sockets [][]string}
495+
// Operators is a [][]*opstateretriever.OperatorStake with the same length and order as quorumBytes, and then indexed by operator index
496+
// Sockets is a [][]string with the same length and order as quorumBytes, and then indexed by operator index
497+
// By contract definition, Operators and Sockets are parallel arrays
498+
result, err := t.bindings.OpStateRetriever.GetOperatorStateWithSocket(&bind.CallOpts{
499+
Context: ctx,
500+
}, t.bindings.RegCoordinatorAddr, quorumBytes, blockNumber)
501+
if err != nil {
502+
t.logger.Errorf("Failed to fetch operator state: %s", err)
503+
return nil, fmt.Errorf("failed to fetch operator state: %w", err)
504+
}
505+
506+
state := make(core.OperatorStakesWithSocket, len(result.Operators))
507+
for i := range result.Operators {
508+
quorumID := quorums[i]
509+
state[quorumID] = make(map[core.OperatorIndex]core.OperatorStakeWithSocket, len(result.Operators[i]))
510+
for j, op := range result.Operators[i] {
511+
operatorIndex := core.OperatorIndex(j)
512+
state[quorumID][operatorIndex] = core.OperatorStakeWithSocket{
513+
Stake: op.Stake,
514+
OperatorID: op.OperatorId,
515+
Socket: core.OperatorSocket(result.Sockets[i][j]),
516+
}
517+
}
518+
}
519+
520+
return state, nil
521+
}
522+
486523
func (t *Reader) StakeRegistry(ctx context.Context) (gethcommon.Address, error) {
487524
return t.bindings.RegistryCoordinator.StakeRegistry(&bind.CallOpts{
488525
Context: ctx,

core/eth/state.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
4141
return getOperatorState(operatorsByQuorum, uint32(blockNumber))
4242
}
4343

44+
func (cs *ChainState) GetOperatorStateWithSocket(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.OperatorState, error) {
45+
operatorsByQuorum, err := cs.Tx.GetOperatorStakesWithSocketForQuorums(ctx, quorums, uint32(blockNumber))
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
return getOperatorStateWithSocket(operatorsByQuorum, uint32(blockNumber))
51+
}
52+
4453
func (cs *ChainState) GetCurrentBlockNumber(ctx context.Context) (uint, error) {
4554
header, err := cs.Client.HeaderByNumber(ctx, nil)
4655
if err != nil {
@@ -88,3 +97,36 @@ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32)
8897

8998
return state, nil
9099
}
100+
101+
func getOperatorStateWithSocket(operatorsByQuorum core.OperatorStakesWithSocket, blockNumber uint32) (*core.OperatorState, error) {
102+
operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
103+
totals := make(map[core.QuorumID]*core.OperatorInfo)
104+
105+
for quorumID, quorum := range operatorsByQuorum {
106+
totalStake := big.NewInt(0)
107+
operators[quorumID] = make(map[core.OperatorID]*core.OperatorInfo)
108+
109+
for ind, op := range quorum {
110+
operators[quorumID][op.OperatorID] = &core.OperatorInfo{
111+
Stake: op.Stake,
112+
Index: core.OperatorIndex(ind),
113+
Socket: core.OperatorSocket(op.Socket),
114+
}
115+
totalStake.Add(totalStake, op.Stake)
116+
}
117+
118+
totals[quorumID] = &core.OperatorInfo{
119+
Stake: totalStake,
120+
Index: core.OperatorIndex(len(quorum)),
121+
Socket: core.OperatorSocket(""),
122+
}
123+
}
124+
125+
state := &core.OperatorState{
126+
Operators: operators,
127+
Totals: totals,
128+
BlockNumber: uint(blockNumber),
129+
}
130+
131+
return state, nil
132+
}

core/mock/state.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,12 @@ func (d *ChainDataMock) GetOperatorState(ctx context.Context, blockNumber uint,
245245
return state.OperatorState, nil
246246
}
247247

248+
func (d *ChainDataMock) GetOperatorStateWithSocket(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.OperatorState, error) {
249+
state := d.GetTotalOperatorStateWithQuorums(ctx, blockNumber, quorums)
250+
251+
return state.OperatorState, nil
252+
}
253+
248254
func (d *ChainDataMock) GetOperatorStateByOperator(ctx context.Context, blockNumber uint, operator core.OperatorID) (*core.OperatorState, error) {
249255
quorums := make([]core.QuorumID, 0)
250256
for quorumID, stake := range d.Stakes {

core/mock/writer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,15 @@ func (t *MockWriter) GetOperatorStakesForQuorums(ctx context.Context, quorums []
9595
return result.(core.OperatorStakes), args.Error(1)
9696
}
9797

98+
func (t *MockWriter) GetOperatorStakesWithSocketForQuorums(ctx context.Context, quorums []core.QuorumID, blockNumber uint32) (core.OperatorStakesWithSocket, error) {
99+
args := t.Called()
100+
result := args.Get(0)
101+
if fn, ok := result.(func([]core.QuorumID, uint32) core.OperatorStakesWithSocket); ok {
102+
return fn(quorums, blockNumber), args.Error(1)
103+
}
104+
return result.(core.OperatorStakesWithSocket), args.Error(1)
105+
}
106+
98107
func (t *MockWriter) BuildConfirmBatchTxn(ctx context.Context, batchHeader *core.BatchHeader, quorums map[core.QuorumID]*core.QuorumResult, signatureAggregation *core.SignatureAggregation) (*types.Transaction, error) {
99108
args := t.Called(ctx, batchHeader, quorums, signatureAggregation)
100109
result := args.Get(0)

0 commit comments

Comments
 (0)