Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (s *ServerV2) GetNodeInfo(ctx context.Context, in *pb.GetNodeInfoRequest) (
Os: runtime.GOOS,
Arch: runtime.GOARCH,
NumCpu: uint32(runtime.GOMAXPROCS(0)),
MemBytes: memBytes}, nil
MemBytes: memBytes,
}, nil
}

func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
Expand All @@ -103,19 +104,6 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
return nil, api.NewErrorInvalidArg("v2 API is disabled")
}

if s.authenticator != nil {
disperserPeer, ok := peer.FromContext(ctx)
if !ok {
return nil, errors.New("could not get peer information")
}
disperserAddress := disperserPeer.Addr.String()

err := s.authenticator.AuthenticateStoreChunksRequest(ctx, disperserAddress, in, time.Now())
if err != nil {
return nil, fmt.Errorf("failed to authenticate request: %w", err)
}
}

if s.node.StoreV2 == nil {
return nil, api.NewErrorInternal("v2 store not initialized")
}
Expand All @@ -124,26 +112,41 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
return nil, api.NewErrorInternal("missing bls signer")
}

// Validate the request parameters (which is cheap) before starting any further
// processing of the request.
batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate store chunk request: %v", err))
}

batchHeaderHash, err := batch.BatchHeader.Hash()
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("invalid batch header: %v", err))
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to serialize batch header hash: %v", err))
}

if s.authenticator != nil {
disperserPeer, ok := peer.FromContext(ctx)
if !ok {
return nil, api.NewErrorInvalidArg("could not get peer information from request context")
}
disperserAddress := disperserPeer.Addr.String()

err := s.authenticator.AuthenticateStoreChunksRequest(ctx, disperserAddress, in, time.Now())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to authenticate request: %v", err))
}
}

s.logger.Info("new StoreChunks request", "batchHeaderHash", hex.EncodeToString(batchHeaderHash[:]), "numBlobs", len(batch.BlobCertificates), "referenceBlockNumber", batch.BatchHeader.ReferenceBlockNumber)
operatorState, err := s.node.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), s.node.Config.ID)
if err != nil {
return nil, err
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get the operator state: %v", err))
}

stageTimer := time.Now()
blobShards, rawBundles, err := s.node.DownloadBundles(ctx, batch, operatorState)
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to download batch: %v", err))
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get the operator state: %v", err))
}
s.metrics.ReportStoreChunksLatency("download", time.Since(stageTimer))

Expand All @@ -158,7 +161,7 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
if err != nil {
storeChan <- storeResult{
keys: nil,
err: fmt.Errorf("failed to store batch: %v", err),
err: err,
}
return
}
Expand Down Expand Up @@ -203,13 +206,20 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (

// validateStoreChunksRequest validates the StoreChunksRequest and returns deserialized batch in the request
func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*corev2.Batch, error) {
// The signature is created by go-ethereum library, which contains 1 additional byte (for
// recovering the public key from signature), so it's 65 bytes.
if len(req.GetSignature()) != 65 {
return nil, fmt.Errorf("signature must be 65 bytes, found %d bytes", len(req.GetSignature()))
}

if req.GetBatch() == nil {
return nil, api.NewErrorInvalidArg("missing batch in request")
return nil, errors.New("missing batch in request")
}

// BatchFromProtobuf internally validates the Batch while deserializing
batch, err := corev2.BatchFromProtobuf(req.GetBatch())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to deserialize batch: %v", err))
return nil, fmt.Errorf("failed to deserialize batch: %v", err)
}

return batch, nil
Expand Down
55 changes: 46 additions & 9 deletions node/grpc/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"os"
"strings"
"sync/atomic"
"testing"

Expand Down Expand Up @@ -43,6 +44,17 @@ var (
blobParamsMap = map[v2.BlobVersion]*core.BlobVersionParameters{
0: blobParams,
}
ecdsaSig = []byte{
0x2a, 0x3b, 0x4c, 0x5d, 0x6e, 0x7f, 0x8a, 0x9b,
0x0c, 0x1d, 0x2e, 0x3f, 0x4a, 0x5b, 0x6c, 0x7d,
0x8e, 0x9f, 0x0a, 0x1b, 0x2c, 0x3d, 0x4e, 0x5f,
0x6a, 0x7b, 0x8c, 0x9d, 0x0e, 0x1f, 0x2a, 0x3b,
0x4c, 0x5d, 0x6e, 0x7f, 0x8a, 0x9b, 0x0c, 0x1d,
0x2e, 0x3f, 0x4a, 0x5b, 0x6c, 0x7d, 0x8e, 0x9f,
0x0a, 0x1b, 0x2c, 0x3d, 0x4e, 0x5f, 0x6a, 0x7b,
0x8c, 0x9d, 0x0e, 0x1f, 0x2a, 0x3b, 0x4c, 0x5d,
0x66,
}
)

type testComponents struct {
Expand Down Expand Up @@ -143,28 +155,40 @@ func TestV2StoreChunksInputValidation(t *testing.T) {
require.NoError(t, err)

req := &validator.StoreChunksRequest{
Batch: &pbcommon.Batch{},
DisperserID: 0,
}
_, err = c.server.StoreChunks(context.Background(), req)
requireErrorStatus(t, err, codes.InvalidArgument)
requireErrorStatusAndMsg(t, err, codes.InvalidArgument, "signature must be 65 bytes")

req = &validator.StoreChunksRequest{
DisperserID: 0,
Signature: ecdsaSig,
Batch: &pbcommon.Batch{},
}
_, err = c.server.StoreChunks(context.Background(), req)
requireErrorStatusAndMsg(t, err, codes.InvalidArgument, "failed to deserialize batch")

req = &validator.StoreChunksRequest{
DisperserID: 0,
Signature: ecdsaSig,
Batch: &pbcommon.Batch{
Header: &pbcommon.BatchHeader{},
BlobCertificates: batchProto.BlobCertificates,
},
}
_, err = c.server.StoreChunks(context.Background(), req)
requireErrorStatus(t, err, codes.InvalidArgument)
requireErrorStatusAndMsg(t, err, codes.InvalidArgument, "failed to deserialize batch")

req = &validator.StoreChunksRequest{
DisperserID: 0,
Signature: ecdsaSig,
Batch: &pbcommon.Batch{
Header: batchProto.Header,
BlobCertificates: []*pbcommon.BlobCertificate{},
},
}
_, err = c.server.StoreChunks(context.Background(), req)
requireErrorStatus(t, err, codes.InvalidArgument)
requireErrorStatusAndMsg(t, err, codes.InvalidArgument, "failed to deserialize batch")
}

func TestV2StoreChunksSuccess(t *testing.T) {
Expand Down Expand Up @@ -207,7 +231,9 @@ func TestV2StoreChunksSuccess(t *testing.T) {
})
c.store.On("StoreBatch", batch, mock.Anything).Return(nil, nil)
reply, err := c.server.StoreChunks(context.Background(), &validator.StoreChunksRequest{
Batch: batchProto,
DisperserID: 0,
Signature: ecdsaSig,
Batch: batchProto,
})
require.NoError(t, err)
require.NotNil(t, reply.GetSignature())
Expand Down Expand Up @@ -235,7 +261,9 @@ func TestV2StoreChunksDownloadFailure(t *testing.T) {
c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{}, relayErr)
c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return([][]byte{}, relayErr)
reply, err := c.server.StoreChunks(context.Background(), &validator.StoreChunksRequest{
Batch: batchProto,
DisperserID: 0,
Signature: ecdsaSig,
Batch: batchProto,
})
require.Nil(t, reply.GetSignature())
requireErrorStatus(t, err, codes.Internal)
Expand Down Expand Up @@ -281,10 +309,12 @@ func TestV2StoreChunksStorageFailure(t *testing.T) {
})
c.store.On("StoreBatch", batch, mock.Anything).Return(nil, errors.New("error"))
reply, err := c.server.StoreChunks(context.Background(), &validator.StoreChunksRequest{
Batch: batchProto,
DisperserID: 0,
Signature: ecdsaSig,
Batch: batchProto,
})
require.Nil(t, reply.GetSignature())
requireErrorStatus(t, err, codes.Internal)
requireErrorStatusAndMsg(t, err, codes.Internal, "failed to store batch")
}

func TestV2StoreChunksValidationFailure(t *testing.T) {
Expand Down Expand Up @@ -328,7 +358,9 @@ func TestV2StoreChunksValidationFailure(t *testing.T) {
c.store.On("StoreBatch", batch, mock.Anything).Return([]kvstore.Key{mockKey{}}, nil)
c.store.On("DeleteKeys", mock.Anything, mock.Anything).Return(nil)
reply, err := c.server.StoreChunks(context.Background(), &validator.StoreChunksRequest{
Batch: batchProto,
DisperserID: 0,
Signature: ecdsaSig,
Batch: batchProto,
})
require.Nil(t, reply.GetSignature())
requireErrorStatus(t, err, codes.Internal)
Expand Down Expand Up @@ -364,6 +396,11 @@ func requireErrorStatus(t *testing.T, err error, code codes.Code) {
assert.Equal(t, s.Code(), code)
}

func requireErrorStatusAndMsg(t *testing.T, err error, code codes.Code, substring string) {
requireErrorStatus(t, err, code)
assert.True(t, strings.Contains(err.Error(), substring))
}

type mockKey struct{}
type mockKeyBuilder struct{}

Expand Down
Loading