Skip to content

Commit 3d209a3

Browse files
arijitADnoot
authored andcommitted
fix(dot/core): Batch process transaction message. (ChainSafe#1780)
* Batch process transaction message. Co-authored-by: noot <[email protected]>
1 parent 09ca34a commit 3d209a3

File tree

9 files changed

+198
-22
lines changed

9 files changed

+198
-22
lines changed

dot/network/notifications.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,16 @@ type (
5555

5656
// NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream.
5757
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)
58+
59+
// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
60+
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*batchMessage, err error)
5861
)
5962

63+
type batchMessage struct {
64+
msg NotificationsMessage
65+
peer peer.ID
66+
}
67+
6068
type handshakeReader struct {
6169
hs Handshake
6270
err error
@@ -141,7 +149,7 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode
141149
}
142150
}
143151

144-
func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler) messageHandler {
152+
func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler, batchHandler NotificationsMessageBatchHandler) messageHandler {
145153
return func(stream libp2pnetwork.Stream, m Message) error {
146154
if m == nil || info == nil || info.handshakeValidator == nil || messageHandler == nil {
147155
return nil
@@ -210,18 +218,38 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
210218
"peer", stream.Conn().RemotePeer(),
211219
)
212220

213-
propagate, err := messageHandler(peer, msg)
214-
if err != nil {
215-
return err
221+
var (
222+
propagate bool
223+
err error
224+
msgs []*batchMessage
225+
)
226+
if batchHandler != nil {
227+
msgs, err = batchHandler(peer, msg)
228+
if err != nil {
229+
return err
230+
}
231+
232+
propagate = len(msgs) > 0
233+
} else {
234+
propagate, err = messageHandler(peer, msg)
235+
if err != nil {
236+
return err
237+
}
238+
msgs = append(msgs, &batchMessage{
239+
msg: msg,
240+
peer: peer,
241+
})
216242
}
217243

218244
if !propagate || s.noGossip {
219245
return nil
220246
}
221247

222-
seen := s.gossip.hasSeen(msg)
223-
if !seen {
224-
s.broadcastExcluding(info, peer, msg)
248+
for _, data := range msgs {
249+
seen := s.gossip.hasSeen(data.msg)
250+
if !seen {
251+
s.broadcastExcluding(info, data.peer, data.msg)
252+
}
225253
}
226254

227255
return nil

dot/network/notifications_test.go

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/ChainSafe/gossamer/lib/utils"
3131
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
3232
"github.com/libp2p/go-libp2p-core/peer"
33+
"github.com/stretchr/testify/mock"
3334
"github.com/stretchr/testify/require"
3435
)
3536

@@ -143,7 +144,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounce(t *testing.T) {
143144
inboundHandshakeData: new(sync.Map),
144145
outboundHandshakeData: new(sync.Map),
145146
}
146-
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage)
147+
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil)
147148

148149
// set handshake data to received
149150
info.inboundHandshakeData.Store(testPeerID, handshakeData{
@@ -176,7 +177,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
176177
inboundHandshakeData: new(sync.Map),
177178
outboundHandshakeData: new(sync.Map),
178179
}
179-
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage)
180+
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil)
180181

181182
configB := &Config{
182183
BasePath: utils.NewTestBasePath(t, "nodeB"),
@@ -316,6 +317,108 @@ func Test_HandshakeTimeout(t *testing.T) {
316317
require.Len(t, connAToB[0].GetStreams(), 0)
317318
}
318319

320+
func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
321+
basePath := utils.NewTestBasePath(t, "nodeA")
322+
mockhandler := &MockTransactionHandler{}
323+
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
324+
mockhandler.On("TransactionsCount").Return(0)
325+
config := &Config{
326+
BasePath: basePath,
327+
Port: 7001,
328+
NoBootstrap: true,
329+
NoMDNS: true,
330+
TransactionHandler: mockhandler,
331+
}
332+
333+
s := createTestService(t, config)
334+
s.batchSize = 5
335+
336+
configB := &Config{
337+
BasePath: utils.NewTestBasePath(t, "nodeB"),
338+
Port: 7002,
339+
NoBootstrap: true,
340+
NoMDNS: true,
341+
}
342+
343+
b := createTestService(t, configB)
344+
345+
txnBatch := make(chan *batchMessage, s.batchSize)
346+
txnBatchHandler := s.createBatchMessageHandler(txnBatch)
347+
348+
// don't set handshake data ie. this stream has just been opened
349+
testPeerID := b.host.id()
350+
351+
// connect nodes
352+
addrInfoB := b.host.addrInfo()
353+
err := s.host.connect(addrInfoB)
354+
if failedToDial(err) {
355+
time.Sleep(TestBackoffTimeout)
356+
err = s.host.connect(addrInfoB)
357+
}
358+
require.NoError(t, err)
359+
360+
stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
361+
require.NoError(t, err)
362+
require.Len(t, txnBatch, 0)
363+
364+
// create info and handler
365+
info := &notificationsProtocol{
366+
protocolID: s.host.protocolID + transactionsID,
367+
getHandshake: s.getTransactionHandshake,
368+
handshakeValidator: validateTransactionHandshake,
369+
inboundHandshakeData: new(sync.Map),
370+
outboundHandshakeData: new(sync.Map),
371+
}
372+
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)
373+
374+
// set handshake data to received
375+
info.inboundHandshakeData.Store(testPeerID, handshakeData{
376+
received: true,
377+
validated: true,
378+
})
379+
msg := &TransactionMessage{
380+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
381+
}
382+
err = handler(stream, msg)
383+
require.NoError(t, err)
384+
require.Len(t, txnBatch, 1)
385+
386+
msg = &TransactionMessage{
387+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
388+
}
389+
err = handler(stream, msg)
390+
require.NoError(t, err)
391+
require.Len(t, txnBatch, 2)
392+
393+
msg = &TransactionMessage{
394+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
395+
}
396+
err = handler(stream, msg)
397+
require.NoError(t, err)
398+
require.Len(t, txnBatch, 3)
399+
400+
msg = &TransactionMessage{
401+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
402+
}
403+
err = handler(stream, msg)
404+
require.NoError(t, err)
405+
require.Len(t, txnBatch, 4)
406+
407+
msg = &TransactionMessage{
408+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
409+
}
410+
err = handler(stream, msg)
411+
require.NoError(t, err)
412+
require.Len(t, txnBatch, 0)
413+
414+
msg = &TransactionMessage{
415+
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
416+
}
417+
err = handler(stream, msg)
418+
require.NoError(t, err)
419+
require.Len(t, txnBatch, 1)
420+
}
421+
319422
func TestBlockAnnounceHandshakeSize(t *testing.T) {
320423
require.Equal(t, unsafe.Sizeof(BlockAnnounceHandshake{}), reflect.TypeOf(BlockAnnounceHandshake{}).Size())
321424
}

dot/network/service.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ type Service struct {
9898
// telemetry
9999
telemetryInterval time.Duration
100100
closeCh chan interface{}
101+
102+
batchSize int
101103
}
102104

103105
// NewService creates a new network service from the configuration and message channels
@@ -171,6 +173,7 @@ func NewService(cfg *Config) (*Service, error) {
171173
closeCh: make(chan interface{}),
172174
bufPool: bufPool,
173175
streamManager: newStreamManager(ctx),
176+
batchSize: 100,
174177
}
175178

176179
network.syncQueue = newSyncQueue(network)
@@ -218,11 +221,15 @@ func (s *Service) Start() error {
218221
s.validateBlockAnnounceHandshake,
219222
decodeBlockAnnounceMessage,
220223
s.handleBlockAnnounceMessage,
224+
nil,
221225
)
222226
if err != nil {
223227
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
224228
}
225229

230+
txnBatch := make(chan *batchMessage, s.batchSize)
231+
txnBatchHandler := s.createBatchMessageHandler(txnBatch)
232+
226233
// register transactions protocol
227234
err = s.RegisterNotificationsProtocol(
228235
s.host.protocolID+transactionsID,
@@ -232,6 +239,7 @@ func (s *Service) Start() error {
232239
validateTransactionHandshake,
233240
decodeTransactionMessage,
234241
s.handleTransactionMessage,
242+
txnBatchHandler,
235243
)
236244
if err != nil {
237245
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
@@ -420,6 +428,7 @@ func (s *Service) RegisterNotificationsProtocol(
420428
handshakeValidator HandshakeValidator,
421429
messageDecoder MessageDecoder,
422430
messageHandler NotificationsMessageHandler,
431+
batchHandler NotificationsMessageBatchHandler,
423432
) error {
424433
s.notificationsMu.Lock()
425434
defer s.notificationsMu.Unlock()
@@ -462,7 +471,7 @@ func (s *Service) RegisterNotificationsProtocol(
462471
info := s.notificationsProtocols[messageID]
463472

464473
decoder := createDecoder(info, handshakeDecoder, messageDecoder)
465-
handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler)
474+
handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler, batchHandler)
466475

467476
s.host.registerStreamHandler(protocolID, func(stream libp2pnetwork.Stream) {
468477
logger.Trace("received stream", "sub-protocol", protocolID)

dot/network/transaction.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,39 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) {
119119
return &transactionHandshake{}, nil
120120
}
121121

122+
func (s *Service) createBatchMessageHandler(txnBatch chan *batchMessage) NotificationsMessageBatchHandler {
123+
return func(peer peer.ID, msg NotificationsMessage) (msgs []*batchMessage, err error) {
124+
data := &batchMessage{
125+
msg: msg,
126+
peer: peer,
127+
}
128+
txnBatch <- data
129+
130+
if len(txnBatch) < s.batchSize {
131+
return nil, nil
132+
}
133+
134+
var propagateMsgs []*batchMessage
135+
for txnData := range txnBatch {
136+
propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg)
137+
if err != nil {
138+
continue
139+
}
140+
if propagate {
141+
propagateMsgs = append(propagateMsgs, &batchMessage{
142+
msg: txnData.msg,
143+
peer: txnData.peer,
144+
})
145+
}
146+
if len(txnBatch) == 0 {
147+
break
148+
}
149+
}
150+
// May be use error to compute peer score.
151+
return propagateMsgs, nil
152+
}
153+
}
154+
122155
func validateTransactionHandshake(_ peer.ID, _ Handshake) error {
123156
return nil
124157
}

dot/sync/interface.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package sync
1818

1919
import (
2020
"math/big"
21+
"sync"
2122

2223
"github.com/ChainSafe/gossamer/dot/types"
2324
"github.com/ChainSafe/gossamer/lib/common"
@@ -56,8 +57,7 @@ type StorageState interface {
5657
TrieState(root *common.Hash) (*rtstorage.TrieState, error)
5758
LoadCodeHash(*common.Hash) (common.Hash, error)
5859
SetSyncing(bool)
59-
Lock()
60-
Unlock()
60+
sync.Locker
6161
}
6262

6363
// CodeSubstitutedState interface to handle storage of code substitute state

lib/babe/state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package babe
1818

1919
import (
2020
"math/big"
21+
"sync"
2122
"time"
2223

2324
"github.com/ChainSafe/gossamer/dot/types"
@@ -52,8 +53,7 @@ type BlockState interface {
5253
// StorageState interface for storage state methods
5354
type StorageState interface {
5455
TrieState(hash *common.Hash) (*rtstorage.TrieState, error)
55-
Lock()
56-
Unlock()
56+
sync.Locker
5757
}
5858

5959
// TransactionState is the interface for transaction queue methods

lib/grandpa/network.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func (s *Service) registerProtocol() error {
102102
s.validateHandshake,
103103
s.decodeMessage,
104104
s.handleNetworkMessage,
105+
nil,
105106
)
106107
}
107108

lib/grandpa/round_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,15 @@ func (n *testNetwork) SendJustificationRequest(to peer.ID, num uint32) {
8282
}
8383
}
8484

85-
func (n *testNetwork) RegisterNotificationsProtocol(
86-
pid protocol.ID,
87-
messageID byte,
88-
handshakeGetter network.HandshakeGetter,
89-
handshakeDecoder network.HandshakeDecoder,
90-
handshakeValidator network.HandshakeValidator,
91-
messageDecoder network.MessageDecoder,
92-
messageHandler network.NotificationsMessageHandler,
85+
func (*testNetwork) RegisterNotificationsProtocol(
86+
_ protocol.ID,
87+
_ byte,
88+
_ network.HandshakeGetter,
89+
_ network.HandshakeDecoder,
90+
_ network.HandshakeValidator,
91+
_ network.MessageDecoder,
92+
_ network.NotificationsMessageHandler,
93+
_ network.NotificationsMessageBatchHandler,
9394
) error {
9495
return nil
9596
}

lib/grandpa/state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,6 @@ type Network interface {
8484
handshakeValidator network.HandshakeValidator,
8585
messageDecoder network.MessageDecoder,
8686
messageHandler network.NotificationsMessageHandler,
87+
batchHandler network.NotificationsMessageBatchHandler,
8788
) error
8889
}

0 commit comments

Comments
 (0)