Skip to content

Commit 0639451

Browse files
edwardmacktimwu20
authored andcommitted
chore(lib/common) implement byte pool to improve websocket subscription efficiency (ChainSafe#1693)
* implement bufferpool * implement byte pool for subscription setup * lint * fix lint * address comments * refactor UnregisterFinalizedChannel to UnregisterFinalisedChannel * lint * change error handling * refactor NumPooled to Len
1 parent 281eef2 commit 0639451

File tree

12 files changed

+168
-45
lines changed

12 files changed

+168
-45
lines changed

dot/core/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type BlockState interface {
4444
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
4545
UnregisterImportedChannel(id byte)
4646
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
47-
UnregisterFinalizedChannel(id byte)
47+
UnregisterFinalisedChannel(id byte)
4848
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
4949
SubChain(start, end common.Hash) ([]common.Hash, error)
5050
GetBlockBody(hash common.Hash) (*types.Body, error)

dot/digest/digest.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (h *Handler) Start() error {
112112
func (h *Handler) Stop() error {
113113
h.cancel()
114114
h.blockState.UnregisterImportedChannel(h.importedID)
115-
h.blockState.UnregisterFinalizedChannel(h.finalisedID)
115+
h.blockState.UnregisterFinalisedChannel(h.finalisedID)
116116
close(h.imported)
117117
close(h.finalised)
118118
return nil

dot/digest/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type BlockState interface {
2929
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
3030
UnregisterImportedChannel(id byte)
3131
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
32-
UnregisterFinalizedChannel(id byte)
32+
UnregisterFinalisedChannel(id byte)
3333
}
3434

3535
// EpochState is the interface for state.EpochState

dot/rpc/modules/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type BlockAPI interface {
3636
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
3737
UnregisterImportedChannel(id byte)
3838
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
39-
UnregisterFinalizedChannel(id byte)
39+
UnregisterFinalisedChannel(id byte)
4040
SubChain(start, end common.Hash) ([]common.Hash, error)
4141
}
4242

dot/rpc/modules/mocks/block_api.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dot/state/block.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ type BlockState struct {
4646
lastFinalised common.Hash
4747

4848
// block notifiers
49-
imported map[byte]chan<- *types.Block
50-
finalised map[byte]chan<- *types.FinalisationInfo
51-
importedLock sync.RWMutex
52-
finalisedLock sync.RWMutex
49+
imported map[byte]chan<- *types.Block
50+
finalised map[byte]chan<- *types.FinalisationInfo
51+
importedLock sync.RWMutex
52+
finalisedLock sync.RWMutex
53+
importedBytePool *common.BytePool
54+
finalisedBytePool *common.BytePool
5355

5456
pruneKeyCh chan *types.Header
5557
}
@@ -80,6 +82,10 @@ func NewBlockState(db chaindb.Database, bt *blocktree.BlockTree) (*BlockState, e
8082
return nil, fmt.Errorf("failed to get last finalised hash: %w", err)
8183
}
8284

85+
bs.importedBytePool = common.NewBytePool256()
86+
87+
bs.finalisedBytePool = common.NewBytePool256()
88+
8389
return bs, nil
8490
}
8591

@@ -117,6 +123,10 @@ func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header) (*Block
117123
return nil, err
118124
}
119125

126+
bs.importedBytePool = common.NewBytePool256()
127+
128+
bs.finalisedBytePool = common.NewBytePool256()
129+
120130
return bs, nil
121131
}
122132

dot/state/block_notify.go

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
package state
1818

1919
import (
20-
"errors"
21-
"math/rand"
22-
2320
"github.com/ChainSafe/gossamer/dot/types"
2421
"github.com/ChainSafe/gossamer/lib/common"
2522
)
@@ -29,16 +26,9 @@ import (
2926
func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, error) {
3027
bs.importedLock.RLock()
3128

32-
if len(bs.imported) == 256 {
33-
return 0, errors.New("channel limit reached")
34-
}
35-
36-
var id byte
37-
for {
38-
id = generateID()
39-
if bs.imported[id] == nil {
40-
break
41-
}
29+
id, err := bs.importedBytePool.Get()
30+
if err != nil {
31+
return 0, err
4232
}
4333

4434
bs.importedLock.RUnlock()
@@ -54,16 +44,9 @@ func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, err
5444
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
5545
bs.finalisedLock.RLock()
5646

57-
if len(bs.finalised) == 256 {
58-
return 0, errors.New("channel limit reached")
59-
}
60-
61-
var id byte
62-
for {
63-
id = generateID()
64-
if bs.finalised[id] == nil {
65-
break
66-
}
47+
id, err := bs.finalisedBytePool.Get()
48+
if err != nil {
49+
return 0, err
6750
}
6851

6952
bs.finalisedLock.RUnlock()
@@ -81,15 +64,23 @@ func (bs *BlockState) UnregisterImportedChannel(id byte) {
8164
defer bs.importedLock.Unlock()
8265

8366
delete(bs.imported, id)
67+
err := bs.importedBytePool.Put(id)
68+
if err != nil {
69+
logger.Error("failed to unregister imported channel", "error", err)
70+
}
8471
}
8572

86-
// UnregisterFinalizedChannel removes the block finalisation notification channel with the given ID.
73+
// UnregisterFinalisedChannel removes the block finalisation notification channel with the given ID.
8774
// A channel must be unregistered before closing it.
88-
func (bs *BlockState) UnregisterFinalizedChannel(id byte) {
75+
func (bs *BlockState) UnregisterFinalisedChannel(id byte) {
8976
bs.finalisedLock.Lock()
9077
defer bs.finalisedLock.Unlock()
9178

9279
delete(bs.finalised, id)
80+
err := bs.finalisedBytePool.Put(id)
81+
if err != nil {
82+
logger.Error("failed to unregister finalised channel", "error", err)
83+
}
9384
}
9485

9586
func (bs *BlockState) notifyImported(block *types.Block) {
@@ -141,9 +132,3 @@ func (bs *BlockState) notifyFinalized(hash common.Hash, round, setID uint64) {
141132
}(ch)
142133
}
143134
}
144-
145-
func generateID() byte {
146-
// skipcq: GSC-G404
147-
id := rand.Intn(256) //nolint
148-
return byte(id)
149-
}

dot/state/block_notify_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestFinalizedChannel(t *testing.T) {
5656
id, err := bs.RegisterFinalizedChannel(ch)
5757
require.NoError(t, err)
5858

59-
defer bs.UnregisterFinalizedChannel(id)
59+
defer bs.UnregisterFinalisedChannel(id)
6060

6161
chain, _ := AddBlocksToState(t, bs, 3)
6262

@@ -150,6 +150,6 @@ func TestFinalizedChannel_Multi(t *testing.T) {
150150
wg.Wait()
151151

152152
for _, id := range ids {
153-
bs.UnregisterFinalizedChannel(id)
153+
bs.UnregisterFinalisedChannel(id)
154154
}
155155
}

lib/common/bytepool.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright 2019 ChainSafe Systems (ON) Corp.
2+
// This file is part of gossamer.
3+
//
4+
// The gossamer library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The gossamer library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package common
18+
19+
import "fmt"
20+
21+
// BytePool struct to hold byte objects that will be contained in pool
22+
type BytePool struct {
23+
c chan byte
24+
}
25+
26+
// NewBytePool256 creates and initialises pool with 256 entries
27+
func NewBytePool256() *BytePool {
28+
bp := NewBytePool(256)
29+
for i := 0; i < 256; i++ {
30+
_ = bp.Put(byte(i))
31+
}
32+
return bp
33+
}
34+
35+
// NewBytePool creates a new empty byte pool with capacity of size
36+
func NewBytePool(size int) (bp *BytePool) {
37+
return &BytePool{
38+
c: make(chan byte, size),
39+
}
40+
}
41+
42+
// Get gets a Buffer from the BytePool, or creates a new one if none are
43+
// available in the pool.
44+
func (bp *BytePool) Get() (b byte, err error) {
45+
select {
46+
case b = <-bp.c:
47+
default:
48+
err = fmt.Errorf("all slots used")
49+
}
50+
return
51+
}
52+
53+
// Put returns the given Buffer to the BytePool.
54+
func (bp *BytePool) Put(b byte) error {
55+
select {
56+
case bp.c <- b:
57+
return nil
58+
default:
59+
return fmt.Errorf("pool is full")
60+
}
61+
}
62+
63+
// Len returns the number of items currently pooled.
64+
func (bp *BytePool) Len() int {
65+
return len(bp.c)
66+
}

lib/common/bytepool_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2019 ChainSafe Systems (ON) Corp.
2+
// This file is part of gossamer.
3+
//
4+
// The gossamer library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The gossamer library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package common
18+
19+
import (
20+
"math/rand"
21+
"testing"
22+
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
func TestBytePool(t *testing.T) {
27+
bp := NewBytePool(5)
28+
require.Equal(t, 0, bp.Len())
29+
30+
for i := 0; i < 5; i++ {
31+
err := bp.Put(generateID())
32+
require.NoError(t, err)
33+
}
34+
err := bp.Put(generateID())
35+
require.EqualError(t, err, "pool is full")
36+
require.Equal(t, 5, bp.Len())
37+
38+
for i := 0; i < 5; i++ {
39+
_, err := bp.Get() // nolint
40+
require.NoError(t, err)
41+
}
42+
_, err = bp.Get()
43+
require.EqualError(t, err, "all slots used")
44+
}
45+
46+
func TestBytePool256(t *testing.T) {
47+
bp := NewBytePool256()
48+
require.Equal(t, 256, bp.Len())
49+
50+
for i := 0; i < 256; i++ {
51+
_, err := bp.Get() // nolint
52+
require.NoError(t, err)
53+
}
54+
_, err := bp.Get()
55+
require.EqualError(t, err, "all slots used")
56+
}
57+
58+
func generateID() byte {
59+
// skipcq: GSC-G404
60+
id := rand.Intn(256) //nolint
61+
return byte(id)
62+
}

0 commit comments

Comments
 (0)