-
Notifications
You must be signed in to change notification settings - Fork 144
feat(dot/network): Add cache for network message. #1511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3e0be21
1838c6a
6494e22
8b67282
c117667
f3a8f3b
d7f04c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package network | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
|
||
"github.com/ChainSafe/gossamer/lib/common" | ||
"github.com/dgraph-io/ristretto" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
// msgCacheTTL is default duration a key-value will be stored in messageCache. | ||
var msgCacheTTL = 5 * time.Minute | ||
|
||
// messageCache is used to detect duplicated messages per peer. | ||
type messageCache struct { | ||
cache *ristretto.Cache | ||
ttl time.Duration | ||
} | ||
|
||
// newMessageCache creates a new messageCache which takes config and TTL duration. | ||
func newMessageCache(config ristretto.Config, ttl time.Duration) (*messageCache, error) { | ||
cache, err := ristretto.NewCache(&config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if ttl == 0 { | ||
ttl = msgCacheTTL | ||
} | ||
|
||
return &messageCache{cache: cache, ttl: ttl}, nil | ||
} | ||
|
||
// put appends peer ID and message data and stores it in cache with TTL. | ||
func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error) { | ||
key, err := generateCacheKey(peer, msg) | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
_, ok := m.cache.Get(key) | ||
if ok { | ||
return false, nil | ||
} | ||
|
||
ok = m.cache.SetWithTTL(key, "", 1, m.ttl) | ||
return ok, nil | ||
} | ||
|
||
// exists checks if <peer ID, message> exist in cache. | ||
func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool { | ||
key, err := generateCacheKey(peer, msg) | ||
if err != nil { | ||
return false | ||
} | ||
|
||
_, ok := m.cache.Get(key) | ||
return ok | ||
} | ||
|
||
func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) { | ||
if msg.IsHandshake() { | ||
return nil, errors.New("cache does not support handshake messages") | ||
} | ||
|
||
peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return peerMsgHash.ToBytes(), nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package network | ||
|
||
import ( | ||
"math/big" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ChainSafe/gossamer/dot/types" | ||
"github.com/ChainSafe/gossamer/lib/common" | ||
"github.com/dgraph-io/ristretto" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestMessageCache(t *testing.T) { | ||
cacheSize := 64 << 20 // 64 MB | ||
msgCache, err := newMessageCache(ristretto.Config{ | ||
NumCounters: int64(float64(cacheSize) * 0.05 * 2), | ||
MaxCost: int64(float64(cacheSize) * 0.95), | ||
BufferItems: 64, | ||
Cost: func(value interface{}) int64 { | ||
return int64(1) | ||
}, | ||
}, 800*time.Millisecond) | ||
require.NoError(t, err) | ||
|
||
peerID := peer.ID("gossamer") | ||
msg := &BlockAnnounceMessage{ | ||
ParentHash: common.Hash{1}, | ||
Number: big.NewInt(77), | ||
StateRoot: common.Hash{2}, | ||
ExtrinsicsRoot: common.Hash{3}, | ||
Digest: types.Digest{}, | ||
} | ||
|
||
ok, err := msgCache.put(peerID, msg) | ||
require.NoError(t, err) | ||
require.True(t, ok) | ||
|
||
time.Sleep(750 * time.Millisecond) | ||
|
||
ok = msgCache.exists(peerID, msg) | ||
require.True(t, ok) | ||
|
||
time.Sleep(50 * time.Millisecond) | ||
|
||
ok = msgCache.exists(peerID, msg) | ||
require.False(t, ok) | ||
} | ||
|
||
func TestMessageCacheError(t *testing.T) { | ||
cacheSize := 64 << 20 // 64 MB | ||
msgCache, err := newMessageCache(ristretto.Config{ | ||
NumCounters: int64(float64(cacheSize) * 0.05 * 2), | ||
MaxCost: int64(float64(cacheSize) * 0.95), | ||
BufferItems: 64, | ||
Cost: func(value interface{}) int64 { | ||
return int64(1) | ||
}, | ||
}, 800*time.Millisecond) | ||
require.NoError(t, err) | ||
|
||
peerID := peer.ID("gossamer") | ||
msg := &BlockAnnounceHandshake{ | ||
Roles: 4, | ||
BestBlockNumber: 77, | ||
BestBlockHash: common.Hash{1}, | ||
GenesisHash: common.Hash{2}, | ||
} | ||
|
||
ok, err := msgCache.put(peerID, msg) | ||
require.Error(t, err, "cache does not support handshake messages") | ||
require.False(t, ok) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -225,19 +225,14 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer | |
peers := s.host.peers() | ||
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) | ||
|
||
for i, peer := range peers { // TODO: check if stream is open, if not, open and send handshake | ||
// TODO: configure this and determine ideal ratio, as well as when to use broadcast vs gossip | ||
if i > len(peers)/3 { | ||
return | ||
} | ||
info.mapMu.RLock() | ||
defer info.mapMu.RUnlock() | ||
|
||
for _, peer := range peers { // TODO: check if stream is open, if not, open and send handshake | ||
if peer == excluding { | ||
continue | ||
} | ||
|
||
info.mapMu.RLock() | ||
defer info.mapMu.RUnlock() | ||
|
||
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { | ||
info.handshakeData.Store(peer, &handshakeData{ | ||
validated: false, | ||
|
@@ -247,6 +242,19 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer | |
logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs) | ||
err = s.host.send(peer, info.protocolID, hs) | ||
} else { | ||
if s.host.messageCache != nil { | ||
Comment on lines
244
to
+245
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put this into one line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may lead to code duplication. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do you mean? |
||
var added bool | ||
added, err = s.host.messageCache.put(peer, msg) | ||
if err != nil { | ||
logger.Error("failed to add message to cache", "peer", peer, "error", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. It will return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this case, it will log the error if the message is already in the cache, maybe remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
continue | ||
} | ||
|
||
if !added { | ||
continue | ||
} | ||
} | ||
|
||
// we've already completed the handshake with the peer, send message directly | ||
logger.Trace("sending message", "protocol", info.protocolID, "peer", peer, "message", msg) | ||
err = s.host.send(peer, info.protocolID, msg) | ||
|
Uh oh!
There was an error while loading. Please reload this page.