Skip to content

Commit c3d0c20

Browse files
committed
temp: add socket read fn
1 parent 5fbcdcd commit c3d0c20

File tree

7 files changed

+66
-21
lines changed

7 files changed

+66
-21
lines changed

api/clients/retrieval_client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,25 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
106106

107107
operatorState, err := r.chainState.GetOperatorStateWithSocket(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
108108
if err != nil {
109+
r.logger.Error("failed to get operator state", "err", err)
109110
return nil, err
110111
}
112+
113+
// Log operator state in a structured way
114+
if operatorState != nil && operatorState.Operators != nil {
115+
if operators, ok := operatorState.Operators[quorumID]; ok {
116+
for opID, opInfo := range operators {
117+
r.logger.Info("operator info",
118+
"quorumID", quorumID,
119+
"operatorID", opID.Hex(),
120+
"stake", opInfo.Stake.String(),
121+
"index", opInfo.Index,
122+
"socket", opInfo.Socket,
123+
)
124+
}
125+
}
126+
}
127+
111128
operators, ok := operatorState.Operators[quorumID]
112129
if !ok {
113130
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)

core/chainio.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type OperatorStake struct {
1919
type OperatorStakeWithSocket struct {
2020
OperatorID OperatorID
2121
Stake *big.Int
22-
Socket string
22+
Socket OperatorSocket
2323
}
2424

2525
type OperatorToChurn struct {
@@ -51,6 +51,10 @@ type Reader interface {
5151
// The indices of the operators within each quorum are also returned.
5252
GetOperatorStakesForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) (OperatorStakes, error)
5353

54+
// GetOperatorStakesWithSocketForQuorums returns the stakes of all operators within the supplied quorums. The returned stakes are for the block number supplied.
55+
// The indices of the operators within each quorum are also returned.
56+
GetOperatorStakesWithSocketForQuorums(ctx context.Context, quorums []QuorumID, blockNumber uint32) (OperatorStakesWithSocket, error)
57+
5458
// GetOperatorStakesWithSocket returns the operator state with socket for the given block number and quorums.
5559
GetOperatorStakesWithSocket(ctx context.Context, operatorID OperatorID, blockNumber uint32) (OperatorStakesWithSocket, []QuorumID, error)
5660

core/eth/reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ func (t *Reader) GetOperatorStakesWithSocket(ctx context.Context, operator core.
461461
state[quorumID][operatorIndex] = core.OperatorStakeWithSocket{
462462
Stake: op.Stake,
463463
OperatorID: op.OperatorId,
464-
Socket: result.Sockets[i][j],
464+
Socket: core.OperatorSocket(result.Sockets[i][j]),
465465
}
466466
}
467467
}
@@ -552,7 +552,7 @@ func (t *Reader) GetOperatorStakesWithSocketForQuorums(ctx context.Context, quor
552552
state[quorumID][operatorIndex] = core.OperatorStakeWithSocket{
553553
Stake: op.Stake,
554554
OperatorID: op.OperatorId,
555-
Socket: result.Sockets[i][j],
555+
Socket: core.OperatorSocket(result.Sockets[i][j]),
556556
}
557557
}
558558
}

core/eth/state.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
4242
}
4343

4444
func (cs *ChainState) GetOperatorStateWithSocket(ctx context.Context, blockNumber uint, quorums []core.QuorumID) (*core.OperatorState, error) {
45-
operatorsByQuorum, err := cs.Tx.GetOperatorStakesForQuorums(ctx, quorums, uint32(blockNumber))
45+
operatorsByQuorum, err := cs.Tx.GetOperatorStakesWithSocketForQuorums(ctx, quorums, uint32(blockNumber))
4646
if err != nil {
4747
return nil, err
4848
}
4949

50-
return getOperatorState(operatorsByQuorum, uint32(blockNumber))
50+
return getOperatorStateWithSocket(operatorsByQuorum, uint32(blockNumber))
5151
}
5252

5353
func (cs *ChainState) GetCurrentBlockNumber(ctx context.Context) (uint, error) {
@@ -97,3 +97,36 @@ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32)
9797

9898
return state, nil
9999
}
100+
101+
func getOperatorStateWithSocket(operatorsByQuorum core.OperatorStakesWithSocket, blockNumber uint32) (*core.OperatorState, error) {
102+
operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
103+
totals := make(map[core.QuorumID]*core.OperatorInfo)
104+
105+
for quorumID, quorum := range operatorsByQuorum {
106+
totalStake := big.NewInt(0)
107+
operators[quorumID] = make(map[core.OperatorID]*core.OperatorInfo)
108+
109+
for ind, op := range quorum {
110+
operators[quorumID][op.OperatorID] = &core.OperatorInfo{
111+
Stake: op.Stake,
112+
Index: core.OperatorIndex(ind),
113+
Socket: core.OperatorSocket(op.Socket),
114+
}
115+
totalStake.Add(totalStake, op.Stake)
116+
}
117+
118+
totals[quorumID] = &core.OperatorInfo{
119+
Stake: totalStake,
120+
Index: core.OperatorIndex(len(quorum)),
121+
Socket: core.OperatorSocket(""),
122+
}
123+
}
124+
125+
state := &core.OperatorState{
126+
Operators: operators,
127+
Totals: totals,
128+
BlockNumber: uint(blockNumber),
129+
}
130+
131+
return state, nil
132+
}

retriever/cmd/main.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/Layr-Labs/eigenda/common/healthcheck"
1818
"github.com/Layr-Labs/eigenda/core"
1919
"github.com/Layr-Labs/eigenda/core/eth"
20-
"github.com/Layr-Labs/eigenda/core/thegraph"
2120
"github.com/Layr-Labs/eigenda/encoding/kzg/verifier"
2221
"github.com/Layr-Labs/eigenda/retriever"
2322
retrivereth "github.com/Layr-Labs/eigenda/retriever/eth"
@@ -100,12 +99,9 @@ func RetrieverMain(ctx *cli.Context) error {
10099
log.Fatalln("could not start tcp listener", err)
101100
}
102101

103-
logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
104-
ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)
105-
106102
if config.EigenDAVersion == 1 {
107103
agn := &core.StdAssignmentCoordinator{}
108-
retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, config.NumConnections)
104+
retrievalClient, err := clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, config.NumConnections)
109105
if err != nil {
110106
log.Fatalln("could not start tcp listener", err)
111107
}
@@ -131,8 +127,8 @@ func RetrieverMain(ctx *cli.Context) error {
131127
}
132128

133129
if config.EigenDAVersion == 2 {
134-
retrievalClient := clientsv2.NewRetrievalClient(logger, tx, ics, v, config.NumConnections)
135-
retrieverServiceServer := retrieverv2.NewServer(config, logger, retrievalClient, ics)
130+
retrievalClient := clientsv2.NewRetrievalClient(logger, tx, cs, v, config.NumConnections)
131+
retrieverServiceServer := retrieverv2.NewServer(config, logger, retrievalClient, cs)
136132
if err = retrieverServiceServer.Start(context.Background()); err != nil {
137133
log.Fatalln("failed to start retriever service server", err)
138134
}

retriever/config.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,16 @@ import (
66

77
"github.com/Layr-Labs/eigenda/common"
88
"github.com/Layr-Labs/eigenda/common/geth"
9-
"github.com/Layr-Labs/eigenda/core/thegraph"
109
"github.com/Layr-Labs/eigenda/encoding/kzg"
1110
"github.com/Layr-Labs/eigenda/retriever/flags"
1211
"github.com/urfave/cli"
1312
)
1413

1514
type Config struct {
16-
EncoderConfig kzg.KzgConfig
17-
EthClientConfig geth.EthClientConfig
18-
LoggerConfig common.LoggerConfig
19-
MetricsConfig MetricsConfig
20-
ChainStateConfig thegraph.Config
15+
EncoderConfig kzg.KzgConfig
16+
EthClientConfig geth.EthClientConfig
17+
LoggerConfig common.LoggerConfig
18+
MetricsConfig MetricsConfig
2119

2220
Timeout time.Duration
2321
NumConnections int
@@ -44,7 +42,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
4442
MetricsConfig: MetricsConfig{
4543
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPortFlag.Name),
4644
},
47-
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
4845
Timeout: ctx.Duration(flags.TimeoutFlag.Name),
4946
NumConnections: ctx.Int(flags.NumConnectionsFlag.Name),
5047
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),

retriever/flags/flags.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package flags
33
import (
44
"github.com/Layr-Labs/eigenda/common"
55
"github.com/Layr-Labs/eigenda/common/geth"
6-
"github.com/Layr-Labs/eigenda/core/thegraph"
76
"github.com/Layr-Labs/eigenda/encoding/kzg"
87
"github.com/urfave/cli"
98
)
@@ -91,5 +90,4 @@ func init() {
9190
Flags = append(Flags, kzg.CLIFlags(envPrefix)...)
9291
Flags = append(Flags, geth.EthClientFlags(envPrefix)...)
9392
Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...)
94-
Flags = append(Flags, thegraph.CLIFlags(envPrefix)...)
9593
}

0 commit comments

Comments
 (0)