Skip to content

Commit a2c8896

Browse files
committed
update node socket registration format
1 parent d9737b1 commit a2c8896

File tree

20 files changed

+230
-91
lines changed

20 files changed

+230
-91
lines changed

core/mock/state.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ var _ core.IndexedChainState = (*ChainDataMock)(nil)
2626

2727
type PrivateOperatorInfo struct {
2828
*core.IndexedOperatorInfo
29-
KeyPair *core.KeyPair
30-
Signer blssigner.Signer
31-
Host string
32-
DispersalPort string
33-
RetrievalPort string
29+
KeyPair *core.KeyPair
30+
Signer blssigner.Signer
31+
Host string
32+
DispersalPort string
33+
RetrievalPort string
34+
V2DispersalPort string
3435
}
3536

3637
type PrivateOperatorState struct {
@@ -138,7 +139,8 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
138139
host := "0.0.0.0"
139140
dispersalPort := fmt.Sprintf("3%03v", 2*i)
140141
retrievalPort := fmt.Sprintf("3%03v", 2*i+1)
141-
socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort)
142+
v2DispersalPort := fmt.Sprintf("3%03v", 2*i+2)
143+
socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort)
142144

143145
indexed := &core.IndexedOperatorInfo{
144146
Socket: string(socket),
@@ -158,6 +160,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
158160
Host: host,
159161
DispersalPort: dispersalPort,
160162
RetrievalPort: retrievalPort,
163+
V2DispersalPort: v2DispersalPort,
161164
}
162165

163166
indexedOperators[id] = indexed

core/serialization.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"errors"
88
"fmt"
99
"math/big"
10-
"regexp"
1110
"slices"
1211

1312
"github.com/Layr-Labs/eigenda/api"
@@ -528,33 +527,18 @@ func decode(data []byte, obj any) error {
528527
return nil
529528
}
530529

531-
func (s OperatorSocket) GetDispersalSocket() string {
532-
ip, port1, _, err := extractIPAndPorts(string(s))
530+
func (s OperatorSocket) GetV1DispersalSocket() string {
531+
ip, v1DispersalPort, _, _, err := ParseOperatorSocket(string(s))
533532
if err != nil {
534533
return ""
535534
}
536-
return fmt.Sprintf("%s:%s", ip, port1)
535+
return fmt.Sprintf("%s:%s", ip, v1DispersalPort)
537536
}
538537

539538
func (s OperatorSocket) GetRetrievalSocket() string {
540-
ip, _, port2, err := extractIPAndPorts(string(s))
539+
ip, _, retrievalPort, _, err := ParseOperatorSocket(string(s))
541540
if err != nil {
542541
return ""
543542
}
544-
return fmt.Sprintf("%s:%s", ip, port2)
545-
}
546-
547-
func extractIPAndPorts(s string) (string, string, string, error) {
548-
regex := regexp.MustCompile(`^([^:]+):([^;]+);([^;]+)$`)
549-
matches := regex.FindStringSubmatch(s)
550-
551-
if len(matches) != 4 {
552-
return "", "", "", errors.New("input string does not match expected format")
553-
}
554-
555-
ip := matches[1]
556-
port1 := matches[2]
557-
port2 := matches[3]
558-
559-
return ip, port1, port2, nil
543+
return fmt.Sprintf("%s:%s", ip, retrievalPort)
560544
}

core/serialization_test.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,20 +195,68 @@ func TestHashPubKeyG1(t *testing.T) {
195195
}
196196

197197
func TestParseOperatorSocket(t *testing.T) {
198-
operatorSocket := "localhost:1234;5678"
199-
host, dispersalPort, retrievalPort, err := core.ParseOperatorSocket(operatorSocket)
198+
operatorSocket := "localhost:1234;5678;9999"
199+
host, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(operatorSocket)
200200
assert.NoError(t, err)
201201
assert.Equal(t, "localhost", host)
202202
assert.Equal(t, "1234", dispersalPort)
203203
assert.Equal(t, "5678", retrievalPort)
204+
assert.Equal(t, "9999", v2DispersalPort)
204205

205-
_, _, _, err = core.ParseOperatorSocket("localhost:12345678")
206+
host, dispersalPort, retrievalPort, v2DispersalPort, err = core.ParseOperatorSocket("localhost:1234;5678")
207+
assert.NoError(t, err)
208+
assert.Equal(t, "localhost", host)
209+
assert.Equal(t, "1234", dispersalPort)
210+
assert.Equal(t, "5678", retrievalPort)
211+
assert.Equal(t, "", v2DispersalPort)
212+
213+
_, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678")
206214
assert.NotNil(t, err)
207-
assert.Equal(t, "invalid socket address format, missing retrieval port: localhost:12345678", err.Error())
215+
assert.ErrorContains(t, err, "invalid socket address format")
208216

209-
_, _, _, err = core.ParseOperatorSocket("localhost1234;5678")
217+
_, _, _, _, err = core.ParseOperatorSocket("localhost:12345678")
210218
assert.NotNil(t, err)
211-
assert.Equal(t, "invalid socket address format: localhost1234;5678", err.Error())
219+
assert.ErrorContains(t, err, "invalid socket address format")
220+
221+
_, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678")
222+
assert.NotNil(t, err)
223+
assert.ErrorContains(t, err, "invalid socket address format")
224+
}
225+
226+
func TestGetV1DispersalSocket(t *testing.T) {
227+
operatorSocket := core.OperatorSocket("localhost:1234;5678;9999")
228+
socket := operatorSocket.GetV1DispersalSocket()
229+
assert.Equal(t, "localhost:1234", socket)
230+
231+
operatorSocket = core.OperatorSocket("localhost:1234;5678")
232+
socket = operatorSocket.GetV1DispersalSocket()
233+
assert.Equal(t, "localhost:1234", socket)
234+
235+
operatorSocket = core.OperatorSocket("localhost:1234;5678;")
236+
socket = operatorSocket.GetV1DispersalSocket()
237+
assert.Equal(t, "localhost:1234", socket)
238+
239+
operatorSocket = core.OperatorSocket("localhost:1234")
240+
socket = operatorSocket.GetV1DispersalSocket()
241+
assert.Equal(t, "", socket)
242+
}
243+
244+
func TestGetRetrievalSocket(t *testing.T) {
245+
operatorSocket := core.OperatorSocket("localhost:1234;5678;9999")
246+
socket := operatorSocket.GetRetrievalSocket()
247+
assert.Equal(t, "localhost:5678", socket)
248+
249+
operatorSocket = core.OperatorSocket("localhost:1234;5678")
250+
socket = operatorSocket.GetRetrievalSocket()
251+
assert.Equal(t, "localhost:5678", socket)
252+
253+
operatorSocket = core.OperatorSocket("localhost:1234;5678;")
254+
socket = operatorSocket.GetRetrievalSocket()
255+
assert.Equal(t, "localhost:5678", socket)
256+
257+
operatorSocket = core.OperatorSocket("localhost:1234")
258+
socket = operatorSocket.GetRetrievalSocket()
259+
assert.Equal(t, "", socket)
212260
}
213261

214262
func TestSignatureBytes(t *testing.T) {

core/state.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,55 @@ import (
1212

1313
// Operators
1414

15+
// OperatorSocket is formatted as "host:dispersalPort;retrievalPort;v2DispersalPort"
1516
type OperatorSocket string
1617

1718
func (s OperatorSocket) String() string {
1819
return string(s)
1920
}
2021

21-
func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort string) OperatorSocket {
22-
return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort))
22+
func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort string) OperatorSocket {
23+
if v2DispersalPort == "" {
24+
return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort))
25+
}
26+
return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort))
2327
}
2428

2529
type StakeAmount = *big.Int
2630

27-
func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, err error) {
31+
func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, v2DispersalPort string, err error) {
2832
s := strings.Split(socket, ";")
29-
if len(s) != 2 {
30-
err = fmt.Errorf("invalid socket address format, missing retrieval port: %s", socket)
33+
34+
if len(s) == 2 {
35+
// no v2 dispersal port
36+
retrievalPort = s[1]
37+
s = strings.Split(s[0], ":")
38+
if len(s) != 2 {
39+
err = fmt.Errorf("invalid socket address format: %s", socket)
40+
return
41+
}
42+
host = s[0]
43+
dispersalPort = s[1]
44+
3145
return
3246
}
33-
retrievalPort = s[1]
3447

35-
s = strings.Split(s[0], ":")
36-
if len(s) != 2 {
37-
err = fmt.Errorf("invalid socket address format: %s", socket)
48+
if len(s) == 3 {
49+
// all ports specified
50+
v2DispersalPort = s[2]
51+
retrievalPort = s[1]
52+
53+
s = strings.Split(s[0], ":")
54+
if len(s) != 2 {
55+
err = fmt.Errorf("invalid socket address format: %s", socket)
56+
return
57+
}
58+
host = s[0]
59+
dispersalPort = s[1]
3860
return
3961
}
40-
host = s[0]
41-
dispersalPort = s[1]
4262

43-
return
63+
return "", "", "", "", fmt.Errorf("invalid socket address format %s: it must specify dispersal port, retrieval port, and/or v2 dispersal port (ex. 0.0.0.0:32004;32005;32006)", socket)
4464
}
4565

4666
// OperatorInfo contains information about an operator which is stored on the blockchain state,

disperser/batcher/grpc/dispatcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMe
114114
// TODO Add secure Grpc
115115

116116
conn, err := grpc.NewClient(
117-
core.OperatorSocket(op.Socket).GetDispersalSocket(),
117+
core.OperatorSocket(op.Socket).GetV1DispersalSocket(),
118118
grpc.WithTransportCredentials(insecure.NewCredentials()),
119119
)
120120
if err != nil {
121-
c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
121+
c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetV1DispersalSocket(), "err", err)
122122
return nil, err
123123
}
124124
defer conn.Close()

disperser/common/semver/semver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, oper
3333
if useRetrievalSocket {
3434
socket = operatorSocket.GetRetrievalSocket()
3535
} else {
36-
socket = operatorSocket.GetDispersalSocket()
36+
socket = operatorSocket.GetV1DispersalSocket()
3737
}
3838
semver := GetSemverInfo(context.Background(), socket, useRetrievalSocket, operatorId, logger, nodeInfoTimeout)
3939

disperser/controller/dispatcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,12 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
149149
for opID, op := range state.IndexedOperators {
150150
opID := opID
151151
op := op
152-
host, dispersalPort, _, err := core.ParseOperatorSocket(op.Socket)
152+
host, _, _, v2DispersalPort, err := core.ParseOperatorSocket(op.Socket)
153153
if err != nil {
154-
return nil, nil, fmt.Errorf("failed to parse operator socket: %w", err)
154+
return nil, nil, fmt.Errorf("failed to parse operator socket (%s): %w", op.Socket, err)
155155
}
156156

157-
client, err := d.nodeClientManager.GetClient(host, dispersalPort)
157+
client, err := d.nodeClientManager.GetClient(host, v2DispersalPort)
158158
if err != nil {
159159
d.logger.Error("failed to get node client", "operator", opID.Hex(), "err", err)
160160
continue

disperser/controller/dispatcher_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ func TestDispatcherHandleBatch(t *testing.T) {
7575
mockClient0 := clientsmock.NewNodeClient()
7676
sig0 := mockChainState.KeyPairs[opId0].SignMessage(bhh)
7777
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(sig0, nil)
78-
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
79-
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
80-
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
78+
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort
79+
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort
80+
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort
8181
require.NotEqual(t, op0Port, op1Port)
8282
require.NotEqual(t, op0Port, op2Port)
8383
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
@@ -150,9 +150,9 @@ func TestDispatcherInsufficientSignatures(t *testing.T) {
150150
// only op2 signs - quorum 0 will have 0 signing rate, quorum 1 will have 20%
151151
mockClient0 := clientsmock.NewNodeClient()
152152
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
153-
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
154-
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
155-
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
153+
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort
154+
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort
155+
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort
156156
require.NotEqual(t, op0Port, op1Port)
157157
require.NotEqual(t, op0Port, op2Port)
158158
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
@@ -223,9 +223,9 @@ func TestDispatcherInsufficientSignatures2(t *testing.T) {
223223
// no operators sign, all blobs will have insufficient signatures
224224
mockClient0 := clientsmock.NewNodeClient()
225225
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
226-
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
227-
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
228-
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
226+
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort
227+
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort
228+
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort
229229
require.NotEqual(t, op0Port, op1Port)
230230
require.NotEqual(t, op0Port, op2Port)
231231
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)

disperser/dataapi/operator_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
4646
retrievalSocket := operatorSocket.GetRetrievalSocket()
4747
retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, oh.logger)
4848

49-
dispersalSocket := operatorSocket.GetDispersalSocket()
49+
dispersalSocket := operatorSocket.GetV1DispersalSocket()
5050
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, oh.logger)
5151

5252
// Create the metadata regardless of online status

disperser/dataapi/subgraph_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ var (
330330
},
331331
SocketUpdates: []subgraph.SocketUpdates{
332332
{
333-
Socket: "localhost:32008;32009",
333+
Socket: "localhost:32008;32009;32010",
334334
},
335335
},
336336
}

0 commit comments

Comments
 (0)