Skip to content

Commit ffc81bf

Browse files
feat(telemetry): send txpool.import telemetry msg (#1966)
From this commit, a telemetry message of type `txpool.import` will be sent when a new transaction is imported in the transaction pool. Added tests for `txpool.import` and improved TestHandler_SendMulti test.
1 parent dde989d commit ffc81bf

File tree

6 files changed

+105
-66
lines changed

6 files changed

+105
-66
lines changed

dot/state/block_finalisation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
191191
),
192192
)
193193
if err != nil {
194-
return fmt.Errorf("could not send 'notify.finalized' telemetry message, error: %s", err)
194+
logger.Debugf("could not send 'notify.finalized' telemetry message, error: %s", err)
195195
}
196196

197197
bs.lastFinalised = hash

dot/state/transaction.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package state
33
import (
44
"sync"
55

6+
"github.com/ChainSafe/gossamer/dot/telemetry"
7+
68
"github.com/ChainSafe/gossamer/dot/types"
79
"github.com/ChainSafe/gossamer/lib/common"
810
"github.com/ChainSafe/gossamer/lib/transaction"
@@ -68,7 +70,16 @@ func (s *TransactionState) RemoveExtrinsicFromPool(ext types.Extrinsic) {
6870
// AddToPool adds a transaction to the pool
6971
func (s *TransactionState) AddToPool(vt *transaction.ValidTransaction) common.Hash {
7072
s.notifyStatus(vt.Extrinsic, transaction.Future)
71-
return s.pool.Insert(vt)
73+
74+
hash := s.pool.Insert(vt)
75+
76+
if err := telemetry.GetInstance().SendMessage(
77+
telemetry.NewTxpoolImportTM(uint(s.queue.Len()), uint(s.pool.Len())),
78+
); err != nil {
79+
logger.Debugf("problem sending txpool.import telemetry message, error: %s", err)
80+
}
81+
82+
return hash
7283
}
7384

7485
// GetStatusNotifierChannel creates and returns a status notifier channel.

dot/telemetry/prepared_block_for_proposing.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,9 @@ func NewPreparedBlockForProposingTM(hash common.Hash, number string) Message {
3434
return &preparedBlockForProposingTM{
3535
Hash: hash,
3636
Number: number,
37-
Msg: "prepared_block_for_proposing",
3837
}
3938
}
4039

41-
func (tm *preparedBlockForProposingTM) messageType() string {
42-
return tm.Msg
40+
func (preparedBlockForProposingTM) messageType() string {
41+
return preparedBlockForProposingMsg
4342
}

dot/telemetry/telemetry.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ import (
2929

3030
// telemetry message types
3131
const (
32-
notifyFinalizedMsg = "notify.finalized"
33-
blockImportMsg = "block.import"
34-
systemNetworkStateMsg = "system.network_state"
35-
systemConnectedMsg = "system.connected"
36-
systemIntervalMsg = "system.interval"
32+
systemConnectedMsg = "system.connected"
33+
systemIntervalMsg = "system.interval"
34+
systemNetworkStateMsg = "system.network_state"
35+
blockImportMsg = "block.import"
36+
notifyFinalizedMsg = "notify.finalized"
37+
txPoolImportMsg = "txpool.import"
38+
preparedBlockForProposingMsg = "prepared_block_for_proposing"
3739
)
3840

3941
type telemetryConnection struct {

dot/telemetry/telemetry_test.go

Lines changed: 47 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -42,75 +42,66 @@ func TestMain(m *testing.M) {
4242
}
4343

4444
func TestHandler_SendMulti(t *testing.T) {
45-
var wg sync.WaitGroup
46-
wg.Add(6)
47-
48-
resultCh = make(chan []byte)
49-
50-
go func() {
51-
genesisHash := common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3")
52-
53-
GetInstance().SendMessage(NewSystemConnectedTM(false, "chain", &genesisHash,
54-
"systemName", "nodeName", "netID", "startTime", "0.1"))
55-
56-
wg.Done()
57-
}()
58-
59-
go func() {
60-
bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
61-
GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync"))
62-
63-
wg.Done()
64-
}()
65-
66-
go func() {
67-
GetInstance().SendMessage(NewBandwidthTM(2, 3, 1))
68-
69-
wg.Done()
70-
}()
71-
72-
go func() {
73-
bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
74-
finalisedHash := common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2")
75-
GetInstance().SendMessage(NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash,
76-
big.NewInt(32256), big.NewInt(0), big.NewInt(1234)))
77-
78-
wg.Done()
79-
}()
80-
81-
go func() {
82-
bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
83-
GetInstance().SendMessage(NewNotifyFinalizedTM(bestHash, "32375"))
45+
expected := [][]byte{
46+
[]byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`),
47+
[]byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`),
48+
[]byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`),
49+
[]byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`), //nolint
50+
[]byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`),
51+
[]byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`),
52+
[]byte(`{"future":2,"msg":"txpool.import","ready":1,"ts":`),
53+
}
8454

85-
wg.Done()
86-
}()
55+
messages := []Message{
56+
NewBandwidthTM(2, 3, 1),
57+
NewTxpoolImportTM(1, 2),
58+
59+
func(genesisHash common.Hash) Message {
60+
return NewSystemConnectedTM(false, "chain", &genesisHash,
61+
"systemName", "nodeName", "netID", "startTime", "0.1")
62+
}(common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3")),
63+
64+
func(bh common.Hash) Message {
65+
return NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync")
66+
}(common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")),
67+
68+
func(bestHash, finalisedHash common.Hash) Message {
69+
return NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash,
70+
big.NewInt(32256), big.NewInt(0), big.NewInt(1234))
71+
}(
72+
common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"),
73+
common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"),
74+
),
75+
76+
NewNotifyFinalizedTM(common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), "32375"),
77+
NewPreparedBlockForProposingTM(common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c"), "1"),
78+
}
8779

88-
go func() {
89-
bestHash := common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c")
90-
GetInstance().SendMessage(NewPreparedBlockForProposingTM(bestHash, "1"))
80+
resultCh = make(chan []byte)
9181

92-
wg.Done()
93-
}()
82+
var wg sync.WaitGroup
83+
for _, message := range messages {
84+
wg.Add(1)
85+
go func(msg Message) {
86+
GetInstance().SendMessage(msg)
87+
wg.Done()
88+
}(message)
89+
}
9490

9591
wg.Wait()
9692

97-
expected1 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`)
98-
expected2 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`)
99-
expected3 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`)
100-
expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint
101-
expected5 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`)
102-
expected6 := []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`)
103-
104-
expected := [][]byte{expected1, expected3, expected4, expected5, expected2, expected6}
105-
10693
var actual [][]byte
10794
for data := range resultCh {
10895
actual = append(actual, data)
109-
if len(actual) == 6 {
96+
if len(actual) == len(expected) {
11097
break
11198
}
11299
}
113100

101+
sort.Slice(expected, func(i, j int) bool {
102+
return bytes.Compare(expected[i], expected[j]) < 0
103+
})
104+
114105
sort.Slice(actual, func(i, j int) bool {
115106
return bytes.Compare(actual[i], actual[j]) < 0
116107
})

dot/telemetry/txpool_import.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2021 ChainSafe Systems (ON) Corp.
2+
// This file is part of gossamer.
3+
//
4+
// The gossamer library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The gossamer library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package telemetry
18+
19+
// txpoolImportTM holds `txpool.import` telemetry message, which is supposed to be
20+
// sent when a new transaction gets imported in the transaction pool.
21+
type txpoolImportTM struct {
22+
Ready uint `json:"ready"`
23+
Future uint `json:"future"`
24+
}
25+
26+
// NewTxpoolImportTM creates a new txpoolImportTM struct
27+
func NewTxpoolImportTM(ready, future uint) Message {
28+
return &txpoolImportTM{
29+
Ready: ready,
30+
Future: future,
31+
}
32+
}
33+
34+
func (txpoolImportTM) messageType() string {
35+
return txPoolImportMsg
36+
}

0 commit comments

Comments
 (0)