
Commit e4151dc

added cache, gate and stringsutil packages
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
1 parent 3e308a4 commit e4151dc

28 files changed: +2373 −288 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -47,6 +47,7 @@
 * [ENHANCEMENT] Replace go-kit/kit/log with go-kit/log. #52
 * [ENHANCEMENT] Add spanlogger package. #42
 * [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
+* [ENHANCEMENT] Add cache, gate and stringsutil packages. #239
 * [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91 #93
 * [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89
 * [ENHANCEMENT] Memberlist: parallelize processing of messages received by memberlist. #110

cache/cache.go

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
package cache

import (
    "context"
    "fmt"
    "time"

    "github.com/go-kit/log"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
)

// Cache is a generic interface.
type Cache interface {
    // Store data into the cache.
    //
    // Note that individual byte buffers may be retained by the cache!
    Store(ctx context.Context, data map[string][]byte, ttl time.Duration)

    // Fetch multiple keys from the cache. Returns a map of input keys to data.
    // If a key isn't in the map, data for that key was not found.
    Fetch(ctx context.Context, keys []string) map[string][]byte

    Name() string
}

const (
    BackendMemcached = "memcached"
)

type BackendConfig struct {
    Backend   string          `yaml:"backend"`
    Memcached MemcachedConfig `yaml:"memcached"`
}

// Validate the config.
func (cfg *BackendConfig) Validate() error {
    if cfg.Backend != "" && cfg.Backend != BackendMemcached {
        return fmt.Errorf("unsupported cache backend: %s", cfg.Backend)
    }

    if cfg.Backend == BackendMemcached {
        if err := cfg.Memcached.Validate(); err != nil {
            return err
        }
    }

    return nil
}

func CreateClient(cacheName string, cfg BackendConfig, logger log.Logger, reg prometheus.Registerer) (Cache, error) {
    switch cfg.Backend {
    case "":
        // No caching.
        return nil, nil

    case BackendMemcached:
        client, err := NewMemcachedClientWithConfig(logger, cacheName, cfg.Memcached.ToMemcachedClientConfig(), reg)
        if err != nil {
            return nil, errors.Wrapf(err, "failed to create memcached client")
        }
        return NewMemcachedCache(cacheName, logger, client, reg), nil

    default:
        return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cfg.Backend)
    }
}
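
For illustration, a minimal wiring sketch (not part of this commit), assuming the package is imported as github.com/grafana/dskit/cache. Note that CreateClient deliberately returns a nil Cache when no backend is configured, so callers must treat nil as "caching disabled":

package main

import (
    "context"
    "time"

    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"

    "github.com/grafana/dskit/cache" // assumed import path
)

func main() {
    // An empty backend means "no caching": Validate() passes and CreateClient
    // returns (nil, nil). Selecting BackendMemcached additionally requires a
    // populated Memcached sub-config (not shown in this diff hunk).
    cfg := cache.BackendConfig{Backend: ""}
    if err := cfg.Validate(); err != nil {
        panic(err)
    }

    c, err := cache.CreateClient("example", cfg, log.NewNopLogger(), prometheus.NewRegistry())
    if err != nil {
        panic(err)
    }
    if c == nil {
        return // caching disabled
    }

    c.Store(context.Background(), map[string][]byte{"key": []byte("value")}, time.Hour)
    _ = c.Fetch(context.Background(), []string{"key"})
}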

cache/compression.go

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
package cache

import (
    "context"
    "errors"
    "flag"
    "fmt"
    "strings"
    "time"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/golang/snappy"

    "github.com/grafana/dskit/util/stringsutil"
)

const (
    // CompressionSnappy is the value of the snappy compression.
    CompressionSnappy = "snappy"
)

var (
    supportedCompressions     = []string{CompressionSnappy}
    errUnsupportedCompression = errors.New("unsupported compression")
)

type CompressionConfig struct {
    Compression string `yaml:"compression"`
}

// RegisterFlagsWithPrefix registers flags with provided prefix.
func (cfg *CompressionConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
    f.StringVar(&cfg.Compression, prefix+"compression", "", fmt.Sprintf("Enable cache compression, if not empty. Supported values are: %s.", strings.Join(supportedCompressions, ", ")))
}

func (cfg *CompressionConfig) Validate() error {
    if cfg.Compression != "" && !stringsutil.SliceContains(supportedCompressions, cfg.Compression) {
        return errUnsupportedCompression
    }
    return nil
}

func NewCompression(cfg CompressionConfig, next Cache, logger log.Logger) Cache {
    switch cfg.Compression {
    case CompressionSnappy:
        return NewSnappy(next, logger)
    default:
        // No compression.
        return next
    }
}

type snappyCache struct {
    next   Cache
    logger log.Logger
}

// NewSnappy makes a new snappy encoding cache wrapper.
func NewSnappy(next Cache, logger log.Logger) Cache {
    return &snappyCache{
        next:   next,
        logger: logger,
    }
}

// Store implements Cache.
func (s *snappyCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
    encoded := make(map[string][]byte, len(data))
    for key, value := range data {
        encoded[key] = snappy.Encode(nil, value)
    }

    s.next.Store(ctx, encoded, ttl)
}

// Fetch implements Cache.
func (s *snappyCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
    found := s.next.Fetch(ctx, keys)
    decoded := make(map[string][]byte, len(found))

    for key, encodedValue := range found {
        decodedValue, err := snappy.Decode(nil, encodedValue)
        if err != nil {
            level.Error(s.logger).Log("msg", "failed to decode cache entry", "err", err)
            continue
        }

        decoded[key] = decodedValue
    }

    return decoded
}

func (s *snappyCache) Name() string {
    return s.next.Name()
}
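
As a rough sketch (not part of this commit), the compression layer composes with any Cache implementation; the helper name below is hypothetical:

package cache

import "github.com/go-kit/log"

// newSnappyBackend is a hypothetical helper (not in this commit) showing how the
// compression wrapper composes with an arbitrary Cache: values are snappy-encoded
// before reaching `backend` and transparently decoded on Fetch.
func newSnappyBackend(backend Cache, logger log.Logger) (Cache, error) {
    cfg := CompressionConfig{Compression: CompressionSnappy}
    if err := cfg.Validate(); err != nil {
        return nil, err
    }
    // With an empty cfg.Compression, NewCompression would return `backend` unchanged.
    return NewCompression(cfg, backend, logger), nil
}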

cache/compression_test.go

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
package cache

import (
    "context"
    "testing"
    "time"

    "github.com/go-kit/log"
    "github.com/stretchr/testify/assert"
)

func TestCompressionConfig_Validate(t *testing.T) {
    tests := map[string]struct {
        cfg      CompressionConfig
        expected error
    }{
        "should pass with default config": {
            cfg: CompressionConfig{},
        },
        "should pass with snappy compression": {
            cfg: CompressionConfig{
                Compression: "snappy",
            },
        },
        "should fail with unsupported compression": {
            cfg: CompressionConfig{
                Compression: "unsupported",
            },
            expected: errUnsupportedCompression,
        },
    }

    for testName, testData := range tests {
        t.Run(testName, func(t *testing.T) {
            assert.Equal(t, testData.expected, testData.cfg.Validate())
        })
    }
}

func TestSnappyCache(t *testing.T) {
    ctx := context.Background()
    backend := NewMockCache()
    c := NewSnappy(backend, log.NewNopLogger())

    t.Run("Fetch() should return empty results if no key has been found", func(t *testing.T) {
        assert.Empty(t, c.Fetch(ctx, []string{"a", "b", "c"}))
    })

    t.Run("Fetch() should return previously set keys", func(t *testing.T) {
        expected := map[string][]byte{
            "a": []byte("value-a"),
            "b": []byte("value-b"),
        }

        c.Store(ctx, expected, time.Hour)
        assert.Equal(t, expected, c.Fetch(ctx, []string{"a", "b", "c"}))
    })

    t.Run("Fetch() should skip entries failing to decode", func(t *testing.T) {
        c.Store(ctx, map[string][]byte{"a": []byte("value-a")}, time.Hour)
        backend.Store(ctx, map[string][]byte{"b": []byte("value-b")}, time.Hour)

        expected := map[string][]byte{
            "a": []byte("value-a"),
        }
        assert.Equal(t, expected, c.Fetch(ctx, []string{"a", "b", "c"}))
    })
}

cache/jump_hash.go

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
package cache

// jumpHash consistently chooses a hash bucket number in the range
// [0, numBuckets) for the given key. numBuckets must be >= 1.
//
// Copied from github.com/dgryski/go-jump/blob/master/jump.go (MIT license).
func jumpHash(key uint64, numBuckets int) int32 {
    var b int64 = -1
    var j int64

    for j < int64(numBuckets) {
        b = j
        key = key*2862933555777941757 + 1
        j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
    }

    return int32(b)
}
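
A hypothetical in-package illustration (not part of this commit) of how jumpHash could drive consistent server selection; the actual memcached client may hash keys differently:

package cache

import "hash/fnv"

// pickServerIndex is a hypothetical example of using jumpHash for consistent
// selection: the same key always maps to the same index, and keys move
// minimally when numServers changes.
func pickServerIndex(key string, numServers int) int32 {
    h := fnv.New64a()
    _, _ = h.Write([]byte(key))
    return jumpHash(h.Sum64(), numServers)
}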

cache/lru.go

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
package cache

import (
    "context"
    "sync"
    "time"

    lru "github.com/hashicorp/golang-lru/simplelru"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

type LRUCache struct {
    c          Cache
    defaultTTL time.Duration
    name       string

    mtx sync.Mutex
    lru *lru.LRU

    requests prometheus.Counter
    hits     prometheus.Counter
    items    prometheus.GaugeFunc
}

type Item struct {
    Data      []byte
    ExpiresAt time.Time
}

// WrapWithLRUCache wraps a given `Cache` c with an LRU cache. Stores always write to both caches.
// However, items are only fetched from the underlying cache when the LRU cache doesn't hold them,
// and items fetched from the underlying cache are stored in the LRU cache with the default TTL.
// Expired items are evicted from the LRU cache on fetch and re-requested from the underlying cache.
// The LRU cache is limited to `lruSize` items, so it is not tailored for large items or items that
// vary a lot in size.
func WrapWithLRUCache(c Cache, name string, reg prometheus.Registerer, lruSize int, defaultTTL time.Duration) (*LRUCache, error) {
    lru, err := lru.NewLRU(lruSize, nil)
    if err != nil {
        return nil, err
    }

    cache := &LRUCache{
        c:          c,
        lru:        lru,
        name:       name,
        defaultTTL: defaultTTL,

        requests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
            Name:        "cache_memory_requests_total",
            Help:        "Total number of requests to the in-memory cache.",
            ConstLabels: map[string]string{"name": name},
        }),
        hits: promauto.With(reg).NewCounter(prometheus.CounterOpts{
            Name:        "cache_memory_hits_total",
            Help:        "Total number of requests to the in-memory cache that were a hit.",
            ConstLabels: map[string]string{"name": name},
        }),
    }

    cache.items = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
        Name:        "cache_memory_items_count",
        Help:        "Total number of items currently in the in-memory cache.",
        ConstLabels: map[string]string{"name": name},
    }, func() float64 {
        cache.mtx.Lock()
        defer cache.mtx.Unlock()

        return float64(cache.lru.Len())
    })

    return cache, nil
}

func (l *LRUCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
    // Store the data in the shared cache.
    l.c.Store(ctx, data, ttl)

    l.mtx.Lock()
    defer l.mtx.Unlock()

    for k, v := range data {
        l.lru.Add(k, &Item{
            Data:      v,
            ExpiresAt: time.Now().Add(ttl),
        })
    }
}

func (l *LRUCache) Fetch(ctx context.Context, keys []string) (result map[string][]byte) {
    l.requests.Add(float64(len(keys)))
    l.mtx.Lock()
    defer l.mtx.Unlock()
    var (
        found = make(map[string][]byte, len(keys))
        miss  = make([]string, 0, len(keys))
        now   = time.Now()
    )

    for _, k := range keys {
        val, ok := l.lru.Get(k)
        if !ok {
            miss = append(miss, k)
            continue
        }
        item := val.(*Item)
        if item.ExpiresAt.After(now) {
            found[k] = item.Data
            continue
        }
        l.lru.Remove(k)
        miss = append(miss, k)
    }
    l.hits.Add(float64(len(found)))

    if len(miss) > 0 {
        result = l.c.Fetch(ctx, miss)
        for k, v := range result {
            // We don't know the TTL of the result, so we use the default one.
            l.lru.Add(k, &Item{
                Data:      v,
                ExpiresAt: now.Add(l.defaultTTL),
            })
            found[k] = v
        }
    }

    return found
}

func (l *LRUCache) Name() string {
    return "in-memory-" + l.name
}
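
A hedged in-package sketch (not part of this commit) of fronting a shared cache with the LRU layer; the function name, size, and TTL values are illustrative only:

package cache

import (
    "context"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// lruExample is a hypothetical sketch of wrapping a shared cache with the
// in-memory LRU layer: writes go to both caches, reads hit the LRU first and
// fall back to the shared cache, re-populating the LRU with the default TTL.
func lruExample(shared Cache) (map[string][]byte, error) {
    wrapped, err := WrapWithLRUCache(shared, "example", prometheus.NewRegistry(), 10000, 5*time.Minute)
    if err != nil {
        return nil, err
    }

    ctx := context.Background()
    wrapped.Store(ctx, map[string][]byte{"k": []byte("v")}, time.Minute)
    return wrapped.Fetch(ctx, []string{"k", "missing"}), nil
}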
