Skip to content

Commit 21d4d59

Browse files
authored
Refactor node (#54)
1 parent 777f49d commit 21d4d59

37 files changed

+642
-254
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
inabox/testdata/*
12
test/testdata/*
23
test/resources/kzg/SRSTables/*

core/attestation.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Signature interface {
2121
Add(Signature) Signature
2222
Verify(PubKeyG2, [32]byte) bool
2323
Serialize() [32]byte
24+
Deserialize([]byte) Signature
2425
}
2526

2627
type PubKeyG1 interface {
@@ -66,6 +67,8 @@ var _ SignatureAggregator = (*StdSignatureAggregator)(nil)
6667

6768
func (a *StdSignatureAggregator) AggregateSignatures(state *OperatorState, quorumParams []QuorumParam, message [32]byte, messageChan chan SignerMessage) (*SignatureAggregation, error) {
6869

70+
// TODO: Add logging
71+
6972
// Ensure all quorums are found in state
7073
for _, quorum := range quorumParams {
7174
_, found := state.QuorumInfo[quorum.QuorumID]

core/bn254/attestation.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ func (s Signature) Serialize() [32]byte {
8282
return s.G1Affine.Bytes()
8383
}
8484

85+
func (s Signature) Deserialize(data []byte) core.Signature {
86+
p := new(bn254.G1Affine)
87+
_, err := p.SetBytes(data)
88+
if err != nil {
89+
return nil
90+
}
91+
return Signature{p}
92+
}
93+
8594
type PubKeyG1 struct {
8695
*bn254.G1Affine
8796
}

core/mock/state.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ type ChainDataMock struct {
1717
var _ core.ChainState = (*ChainDataMock)(nil)
1818

1919
type PrivateOperatorInfo struct {
20-
OperatorInfo *core.OperatorInfo
21-
KeyPair core.KeyPair
20+
*core.OperatorInfo
21+
KeyPair core.KeyPair
22+
Host string
23+
Port string
2224
}
2325

2426
type PrivateOperatorState struct {
@@ -65,8 +67,11 @@ func (d *ChainDataMock) GetTotalOperatorState(blockNumber uint) (*core.OperatorS
6567

6668
stake := ind + 1
6769

70+
host := "0.0.0.0"
71+
port := fmt.Sprintf("3%03v", int(ind))
72+
6873
op := &core.OperatorInfo{
69-
Socket: fmt.Sprintf("0.0.0.0:%v", ind),
74+
Socket: fmt.Sprintf("%v:%v", host, port),
7075
PubkeyG1: d.KeyPairs[ind].GetPubKeyG1(),
7176
PubkeyG2: d.KeyPairs[ind].GetPubKeyG2(),
7277
FromBlockNumber: 0,
@@ -76,6 +81,8 @@ func (d *ChainDataMock) GetTotalOperatorState(blockNumber uint) (*core.OperatorS
7681
private := &PrivateOperatorInfo{
7782
OperatorInfo: op,
7883
KeyPair: d.KeyPairs[ind],
84+
Host: host,
85+
Port: port,
7986
}
8087

8188
id := core.OperatorId(common.HexToAddress(big.NewInt(int64(ind + 1)).Text(16)))

core/validator.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@ type ChunkValidator struct {
1717
Id OperatorId
1818
}
1919

20-
func NewChunkValidator() *ChunkValidator {
21-
return &ChunkValidator{}
20+
func NewChunkValidator(enc Encoder, asgn AssignmentCoordinator, cst ChainState, id OperatorId) *ChunkValidator {
21+
return &ChunkValidator{
22+
Encoder: enc,
23+
Assignment: asgn,
24+
ChainState: cst,
25+
Id: id,
26+
}
2227
}
2328

2429
func (v *ChunkValidator) ValidateChunks(chunks []Chunk, glom GlomHeader, batch BatchHeader) error {

disperser/batcher.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package disperser
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"time"
87

98
"github.com/Layr-Labs/eigenda/common"
@@ -76,8 +75,6 @@ func NewBatcher(
7675

7776
func (b *Batcher) Start(ctx context.Context) {
7877

79-
fmt.Println("Starting batcher...")
80-
8178
go func() {
8279

8380
log := b.Logger
@@ -292,6 +289,7 @@ func (b *Batcher) ConstructGlom(blobs []*core.Blob, plan *GlomPlan, quorum Quoru
292289
Commitment: commits.Commitment,
293290
LengthProof: commits.LengthProof,
294291
Length: commits.Length,
292+
ChunkLength: plan.ChunkLength,
295293
QuantizationFactor: quorum.QuantizationFactor,
296294
NumBlobs: 1,
297295
SecurityParam: plan.SecurityParam,
@@ -311,10 +309,10 @@ func (b *Batcher) ConstructBatch(state *core.OperatorState, blobs []*core.Blob,
311309
batchBundles := make(map[core.OperatorId][][]core.Chunk, plan.NumOperators)
312310
gloms := make([]core.GlomHeader, len(plan.Gloms))
313311

314-
for _, glom := range plan.Gloms {
312+
for ind, glomPlan := range plan.Gloms {
315313

316314
// Construct the glom
317-
glomBundles, glom, err := b.ConstructGlom(blobs, &glom, plan.Quorums[glom.QuorumID])
315+
glomBundles, glom, err := b.ConstructGlom(blobs, &glomPlan, plan.Quorums[glomPlan.QuorumID])
318316
if err != nil {
319317
return nil, nil, err
320318
}
@@ -328,7 +326,7 @@ func (b *Batcher) ConstructBatch(state *core.OperatorState, blobs []*core.Blob,
328326
}
329327

330328
}
331-
gloms = append(gloms, *glom)
329+
gloms[ind] = *glom
332330
}
333331

334332
chunkBatches := make(map[core.OperatorId]core.ChunkBatch, len(batchBundles))

disperser/cmd/basic/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func NewConfig(ctx *cli.Context) Config {
3434
PullInterval: ctx.GlobalDuration(flags.PullIntervalFlag.Name),
3535
},
3636
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
37-
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManageFlag.Name),
37+
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
3838
Address: ctx.GlobalString(flags.AddressFlag.Name),
3939
}
4040
return config

disperser/cmd/basic/flags/flags.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,19 @@ var (
2424
Required: true,
2525
EnvVar: common.PrefixEnvVar(envVarPrefix, "GRPC_PORT"),
2626
}
27-
BlsOperatorStateRetrieverFlag = cli.StringFlag{
28-
Name: "bls-operator-state-retriever",
29-
Usage: "Address of the BLS Operator State Retriever",
30-
}
3127
AddressFlag = cli.StringFlag{
3228
Name: "address",
3329
Usage: "Address of the disperser",
3430
Required: true,
31+
EnvVar: common.PrefixEnvVar(envVarPrefix, "Address"),
32+
}
33+
BlsOperatorStateRetrieverFlag = cli.StringFlag{
34+
Name: "bls-operator-state-retriever",
35+
Usage: "Address of the BLS Operator State Retriever",
36+
Required: true,
3537
EnvVar: common.PrefixEnvVar(envVarPrefix, "BLS_OPERATOR_STATE_RETRIVER"),
3638
}
37-
EigenDAServiceManageFlag = cli.StringFlag{
39+
EigenDAServiceManagerFlag = cli.StringFlag{
3840
Name: "eigenda-service-manager",
3941
Usage: "Address of the EigenDA Service Manager",
4042
Required: true,
@@ -46,7 +48,7 @@ var requiredFlags = []cli.Flag{
4648
PullIntervalFlag,
4749
GrpcPortFlag,
4850
BlsOperatorStateRetrieverFlag,
49-
EigenDAServiceManageFlag,
51+
EigenDAServiceManagerFlag,
5052
AddressFlag,
5153
}
5254

disperser/grpc/dispatcher.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/Layr-Labs/eigenda/api/grpc/node"
88
"github.com/Layr-Labs/eigenda/common"
99
"github.com/Layr-Labs/eigenda/core"
10+
"github.com/Layr-Labs/eigenda/core/bn254"
1011
"github.com/Layr-Labs/eigenda/disperser"
1112

1213
"google.golang.org/grpc"
@@ -60,12 +61,12 @@ func (c *Dispatcher) sendAllChunks(ctx context.Context, state *core.OperatorStat
6061
Signature: nil,
6162
Operator: id,
6263
}
63-
}
64-
65-
update <- core.SignerMessage{
66-
Signature: sig,
67-
Operator: id,
68-
Err: nil,
64+
} else {
65+
update <- core.SignerMessage{
66+
Signature: sig,
67+
Operator: id,
68+
Err: nil,
69+
}
6970
}
7071

7172
}(op, id)
@@ -76,6 +77,7 @@ func (c *Dispatcher) sendAllChunks(ctx context.Context, state *core.OperatorStat
7677
func (c *Dispatcher) sendChunks(ctx context.Context, chunkBatch core.ChunkBatch, header *core.BatchHeader, op *core.OperatorInfo) (core.Signature, error) {
7778

7879
// TODO Add secure Grpc
80+
7981
conn, err := grpc.Dial(
8082
op.Socket,
8183
grpc.WithTransportCredentials(insecure.NewCredentials()),
@@ -98,13 +100,14 @@ func (c *Dispatcher) sendChunks(ctx context.Context, chunkBatch core.ChunkBatch,
98100
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)
99101
reply, err := gc.StoreChunks(ctx, request, opt)
100102

101-
if err == nil {
102-
sig := reply.GetSignature()
103-
_ = sig
104-
return nil, nil
105-
} else {
103+
if err != nil {
106104
return nil, err
107105
}
106+
107+
sigBytes := reply.GetSignature()
108+
sig := new(bn254.Signature).Deserialize(sigBytes)
109+
return sig, nil
110+
108111
}
109112

110113
func getStoreChunksRequest(chunkBatch core.ChunkBatch, header *core.BatchHeader) (*node.StoreChunksRequest, error) {
File renamed without changes.

0 commit comments

Comments
 (0)