Skip to content

Commit 3445d84

Browse files
edwardmack, timwu20
authored and committed
fix(dot/telemetry): refactor telemetry to reduce CPU usage (ChainSafe#1597)
1 parent a00f881 commit 3445d84

File tree

5 files changed

+181
-160
lines changed

5 files changed

+181
-160
lines changed

dot/network/service.go

Lines changed: 19 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -313,9 +313,15 @@ main:
313313

314314
case <-ticker.C:
315315
o := s.host.bwc.GetBandwidthTotals()
316-
telemetry.GetInstance().SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), o.RateIn, o.RateOut))
316+
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
317+
telemetry.NewKeyValue("bandwidth_download", o.RateIn),
318+
telemetry.NewKeyValue("bandwidth_upload", o.RateOut),
319+
telemetry.NewKeyValue("msg", "system.interval"),
320+
telemetry.NewKeyValue("peers", s.host.peerCount())))
321+
if err != nil {
322+
logger.Debug("problem sending system.interval telemetry message", "error", err)
323+
}
317324
}
318-
319325
}
320326
}
321327

@@ -330,14 +336,17 @@ func (s *Service) sentBlockIntervalTelemetry() {
330336
continue
331337
}
332338

333-
telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{
334-
BestHash: best.Hash(),
335-
BestHeight: best.Number,
336-
FinalizedHash: finalized.Hash(),
337-
FinalizedHeight: finalized.Number,
338-
TXCount: 0, // todo (ed) determine where to get tx count
339-
UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size
340-
})
339+
err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
340+
telemetry.NewKeyValue("best", best.Hash().String()),
341+
telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint
342+
telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint
343+
telemetry.NewKeyValue("height", best.Number),
344+
telemetry.NewKeyValue("msg", "system.interval"),
345+
telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count
346+
telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size
347+
if err != nil {
348+
logger.Debug("problem sending system.interval telemetry message", "error", err)
349+
}
341350
time.Sleep(s.telemetryInterval)
342351
}
343352
}

dot/node.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -350,18 +350,20 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
350350
}
351351

352352
telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
353-
data := &telemetry.ConnectionData{
354-
Authority: cfg.Core.GrandpaAuthority,
355-
Chain: sysSrvc.ChainName(),
356-
GenesisHash: stateSrvc.Block.GenesisHash().String(),
357-
SystemName: sysSrvc.SystemName(),
358-
NodeName: cfg.Global.Name,
359-
SystemVersion: sysSrvc.SystemVersion(),
360-
NetworkID: networkSrvc.NetworkState().PeerID,
361-
StartTime: strconv.FormatInt(time.Now().UnixNano(), 10),
362-
}
363-
telemetry.GetInstance().SendConnection(data)
364353

354+
err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
355+
telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority),
356+
telemetry.NewKeyValue("chain", sysSrvc.ChainName()),
357+
telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()),
358+
telemetry.NewKeyValue("implementation", sysSrvc.SystemName()),
359+
telemetry.NewKeyValue("msg", "system.connected"),
360+
telemetry.NewKeyValue("name", cfg.Global.Name),
361+
telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID),
362+
telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)),
363+
telemetry.NewKeyValue("version", sysSrvc.SystemVersion())))
364+
if err != nil {
365+
logger.Debug("problem sending system.connected telemetry message", "err", err)
366+
}
365367
return node, nil
366368
}
367369

dot/sync/syncer.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,14 @@ func (s *Service) handleBlock(block *types.Block) error {
364364
}
365365
} else {
366366
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
367-
telemetry.GetInstance().SendBlockImport(block.Header.Hash().String(), block.Header.Number)
367+
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
368+
telemetry.NewKeyValue("best", block.Header.Hash().String()),
369+
telemetry.NewKeyValue("height", block.Header.Number.Uint64()),
370+
telemetry.NewKeyValue("msg", "block.import"),
371+
telemetry.NewKeyValue("origin", "NetworkInitialSync")))
372+
if err != nil {
373+
logger.Debug("problem sending block.import telemetry message", "error", err)
374+
}
368375
}
369376

370377
// handle consensus digest for authority changes

dot/telemetry/telemetry.go

Lines changed: 86 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,38 @@
1717
package telemetry
1818

1919
import (
20-
"bytes"
2120
"encoding/json"
22-
"fmt"
23-
"math/big"
21+
"errors"
2422
"sync"
2523
"time"
2624

27-
"github.com/ChainSafe/gossamer/lib/common"
2825
"github.com/ChainSafe/gossamer/lib/genesis"
26+
log "github.com/ChainSafe/log15"
2927
"github.com/gorilla/websocket"
30-
log "github.com/sirupsen/logrus"
3128
)
3229

33-
// Handler struct for holding telemetry related things
34-
type Handler struct {
35-
buf bytes.Buffer
36-
wsConn []*websocket.Conn
37-
sync.RWMutex
30+
type telemetryConnection struct {
31+
wsconn *websocket.Conn
32+
verbosity int
33+
sync.Mutex
3834
}
3935

40-
// MyJSONFormatter struct for defining JSON Formatter
41-
type MyJSONFormatter struct {
36+
// Message struct to hold telemetry message data
37+
type Message struct {
38+
values map[string]interface{}
4239
}
4340

44-
// Format function for handling JSON formatting, this overrides default logging formatter to remove
45-
// log level, line number and timestamp
46-
func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) {
47-
serialised, err := json.Marshal(entry.Data)
48-
if err != nil {
49-
return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err)
50-
}
51-
return append(serialised, '\n'), nil
41+
// Handler struct for holding telemetry related things
42+
type Handler struct {
43+
msg chan Message
44+
connections []*telemetryConnection
45+
log log.Logger
46+
}
47+
48+
// KeyValue object to hold key value pairs used in telemetry messages
49+
type KeyValue struct {
50+
key string
51+
value interface{}
5252
}
5353

5454
var (
@@ -57,126 +57,98 @@ var (
5757
)
5858

5959
// GetInstance singleton pattern for accessing TelemetryHandler
60-
func GetInstance() *Handler {
60+
func GetInstance() *Handler { //nolint
6161
if handlerInstance == nil {
6262
once.Do(
6363
func() {
6464
handlerInstance = &Handler{
65-
buf: bytes.Buffer{},
65+
msg: make(chan Message, 256),
66+
log: log.New("pkg", "telemetry"),
6667
}
67-
log.SetOutput(&handlerInstance.buf)
68-
log.SetFormatter(new(MyJSONFormatter))
69-
go handlerInstance.sender()
68+
go handlerInstance.startListening()
7069
})
7170
}
7271
return handlerInstance
7372
}
7473

75-
// AddConnections adds connections to telemetry sever
74+
// NewTelemetryMessage builds a telemetry message
75+
func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint
76+
mvals := make(map[string]interface{})
77+
for _, v := range values {
78+
mvals[v.key] = v.value
79+
}
80+
return &Message{
81+
values: mvals,
82+
}
83+
}
84+
85+
// NewKeyValue builds a key value pair for telemetry messages
86+
func NewKeyValue(key string, value interface{}) *KeyValue { //nolint
87+
return &KeyValue{
88+
key: key,
89+
value: value,
90+
}
91+
}
92+
93+
// AddConnections adds the given telemetry endpoints as listeners that will receive telemetry data
7694
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
7795
for _, v := range conns {
7896
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
7997
if err != nil {
80-
fmt.Printf("Error %v\n", err)
98+
// todo (ed) try reconnecting if there is an error connecting
99+
h.log.Debug("issue adding telemetry connection", "error", err)
81100
continue
82101
}
83-
h.wsConn = append(h.wsConn, c)
102+
tConn := &telemetryConnection{
103+
wsconn: c,
104+
verbosity: v.Verbosity,
105+
}
106+
h.connections = append(h.connections, tConn)
84107
}
85108
}
86109

87-
// ConnectionData struct to hold connection data
88-
type ConnectionData struct {
89-
Authority bool
90-
Chain string
91-
GenesisHash string
92-
SystemName string
93-
NodeName string
94-
SystemVersion string
95-
NetworkID string
96-
StartTime string
97-
}
98-
99-
// SendConnection sends connection request message to telemetry connection
100-
func (h *Handler) SendConnection(data *ConnectionData) {
101-
h.Lock()
102-
defer h.Unlock()
103-
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
104-
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
105-
"version": data.SystemVersion}
106-
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
107-
telemetryLogger.Print()
108-
}
109-
110-
// SendBlockImport sends block imported message to telemetry connection
111-
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
112-
h.Lock()
113-
defer h.Unlock()
114-
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
115-
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
116-
telemetryLogger.Print()
117-
}
110+
// SendMessage sends Message to connected telemetry listeners
111+
func (h *Handler) SendMessage(msg *Message) error {
112+
select {
113+
case h.msg <- *msg:
118114

119-
// NetworkData struct to hold network data telemetry information
120-
type NetworkData struct {
121-
peers int
122-
rateIn float64
123-
rateOut float64
124-
}
125-
126-
// NewNetworkData creates networkData struct
127-
func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData {
128-
return &NetworkData{
129-
peers: peers,
130-
rateIn: rateIn,
131-
rateOut: rateOut,
115+
case <-time.After(time.Second * 1):
116+
return errors.New("timeout sending message")
132117
}
118+
return nil
133119
}
134120

135-
// SendNetworkData send network data system.interval message to telemetry connection
136-
func (h *Handler) SendNetworkData(data *NetworkData) {
137-
h.Lock()
138-
defer h.Unlock()
139-
payload := log.Fields{"bandwidth_download": data.rateIn, "bandwidth_upload": data.rateOut, "msg": "system.interval", "peers": data.peers}
140-
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
141-
telemetryLogger.Print()
142-
}
143-
144-
// BlockIntervalData struct to hold data for block system.interval message
145-
type BlockIntervalData struct {
146-
BestHash common.Hash
147-
BestHeight *big.Int
148-
FinalizedHash common.Hash
149-
FinalizedHeight *big.Int
150-
TXCount int
151-
UsedStateCacheSize int
121+
func (h *Handler) startListening() {
122+
for {
123+
msg := <-h.msg
124+
go func() {
125+
for _, conn := range h.connections {
126+
conn.Lock()
127+
err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg))
128+
if err != nil {
129+
h.log.Warn("issue while sending telemetry message", "error", err)
130+
}
131+
conn.Unlock()
132+
}
133+
}()
134+
}
152135
}
153136

154-
// SendBlockIntervalData send block data system interval information to telemetry connection
155-
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) {
156-
h.Lock()
157-
defer h.Unlock()
158-
payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(), // nolint
159-
"finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount, // nolint
160-
"used_state_cache_size": data.UsedStateCacheSize}
161-
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
162-
telemetryLogger.Print()
137+
type response struct {
138+
ID int `json:"id"`
139+
Payload map[string]interface{} `json:"payload"`
140+
Timestamp time.Time `json:"ts"`
163141
}
164142

165-
func (h *Handler) sender() {
166-
for {
167-
h.RLock()
168-
line, err := h.buf.ReadBytes(byte(10)) // byte 10 is newline character, used as delimiter
169-
h.RUnlock()
170-
if err != nil {
171-
continue
172-
}
173-
174-
for _, c := range h.wsConn {
175-
err := c.WriteMessage(websocket.TextMessage, line)
176-
if err != nil {
177-
// TODO (ed) determine how to handle this error
178-
fmt.Printf("ERROR connecting to telemetry %v\n", err)
179-
}
180-
}
143+
func msgToBytes(message Message) []byte {
144+
res := response{
145+
ID: 1, // todo (ed) determine how this is used
146+
Payload: message.values,
147+
Timestamp: time.Now(),
148+
}
149+
resB, err := json.Marshal(res)
150+
if err != nil {
151+
return nil
181152
}
153+
return resB
182154
}

0 commit comments

Comments
 (0)