Skip to content

Commit a2ec23f

Browse files
authored
cache: use redis.ParseClusterURL for Redis cluster (#925)
1 parent 310a28f commit a2ec23f

File tree

6 files changed

+42
-200
lines changed

6 files changed

+42
-200
lines changed

.github/workflows/build_test.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,15 @@ jobs:
273273
env:
274274
MYSQL_URI: mysql://root:password@tcp(localhost:${{ job.services.mariadb.ports[3306] }})/
275275

276-
- name: Test Redis cluster
276+
- name: Test Redis cluster (1 address)
277277
run: go test ./storage/cache -run ^TestRedis
278278
env:
279-
REDIS_URI: redis+cluster://localhost:7005
279+
REDIS_URI: redis+cluster://localhost:7000
280+
281+
- name: Test Redis cluster (6 addresses)
282+
run: go test ./storage/cache -run ^TestRedis
283+
env:
284+
REDIS_URI: redis+cluster://localhost:7000?addr=localhost:7001&addr=localhost:7002&addr=localhost:7003&addr=localhost:7004&addr=localhost:7005
280285

281286
golangci:
282287
name: lint

config/config.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
# The database for caching, support Redis, MySQL, Postgres and MongoDB:
44
# redis://<user>:<password>@<host>:<port>/<db_number>
55
# rediss://<user>:<password>@<host>:<port>/<db_number>
6-
# redis+cluster://<user>:<password>@<host1>:<port1>,<host2>:<port2>,...,<hostN>:<portN>
6+
# redis+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
7+
# rediss+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
78
# mysql://[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
89
# postgres://bob:[email protected]:5432/mydb?sslmode=verify-full
910
# postgresql://bob:[email protected]:5432/mydb?sslmode=verify-full

storage/cache/database.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,14 @@ func Open(path, tablePrefix string, opts ...storage.Option) (Database, error) {
311311
return nil, errors.Trace(err)
312312
}
313313
return database, nil
314-
} else if strings.HasPrefix(path, storage.RedisClusterPrefix) {
315-
opt, err := ParseRedisClusterURL(path)
314+
} else if strings.HasPrefix(path, storage.RedisClusterPrefix) || strings.HasPrefix(path, storage.RedissClusterPrefix) {
315+
var newURL string
316+
if strings.HasPrefix(path, storage.RedisClusterPrefix) {
317+
newURL = strings.Replace(path, storage.RedisClusterPrefix, storage.RedisPrefix, 1)
318+
} else if strings.HasPrefix(path, storage.RedissClusterPrefix) {
319+
newURL = strings.Replace(path, storage.RedissClusterPrefix, storage.RedissPrefix, 1)
320+
}
321+
opt, err := redis.ParseClusterURL(newURL)
316322
if err != nil {
317323
return nil, err
318324
}

storage/cache/redis.go

Lines changed: 0 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"encoding/base64"
2020
"fmt"
2121
"io"
22-
"net/url"
2322
"strconv"
2423
"strings"
2524
"time"
@@ -513,159 +512,3 @@ func escape(s string) string {
513512
)
514513
return r.Replace(s)
515514
}
516-
517-
func ParseRedisClusterURL(redisURL string) (*redis.ClusterOptions, error) {
518-
options := &redis.ClusterOptions{}
519-
uri := redisURL
520-
521-
var err error
522-
if strings.HasPrefix(redisURL, storage.RedisClusterPrefix) {
523-
uri = uri[len(storage.RedisClusterPrefix):]
524-
} else {
525-
return nil, fmt.Errorf("scheme must be \"redis+cluster\"")
526-
}
527-
528-
if idx := strings.Index(uri, "@"); idx != -1 {
529-
userInfo := uri[:idx]
530-
uri = uri[idx+1:]
531-
532-
username := userInfo
533-
var password string
534-
535-
if idx := strings.Index(userInfo, ":"); idx != -1 {
536-
username = userInfo[:idx]
537-
password = userInfo[idx+1:]
538-
}
539-
540-
// Validate and process the username.
541-
if strings.Contains(username, "/") {
542-
return nil, fmt.Errorf("unescaped slash in username")
543-
}
544-
options.Username, err = url.PathUnescape(username)
545-
if err != nil {
546-
return nil, errors.Wrap(err, fmt.Errorf("invalid username"))
547-
}
548-
549-
// Validate and process the password.
550-
if strings.Contains(password, ":") {
551-
return nil, fmt.Errorf("unescaped colon in password")
552-
}
553-
if strings.Contains(password, "/") {
554-
return nil, fmt.Errorf("unescaped slash in password")
555-
}
556-
options.Password, err = url.PathUnescape(password)
557-
if err != nil {
558-
return nil, errors.Wrap(err, fmt.Errorf("invalid password"))
559-
}
560-
}
561-
562-
// fetch the hosts field
563-
hosts := uri
564-
if idx := strings.IndexAny(uri, "/?@"); idx != -1 {
565-
if uri[idx] == '@' {
566-
return nil, fmt.Errorf("unescaped @ sign in user info")
567-
}
568-
hosts = uri[:idx]
569-
}
570-
571-
options.Addrs = strings.Split(hosts, ",")
572-
uri = uri[len(hosts):]
573-
if len(uri) > 0 && uri[0] == '/' {
574-
uri = uri[1:]
575-
}
576-
577-
// grab connection arguments from URI
578-
connectionArgsFromQueryString, err := extractQueryArgsFromURI(uri)
579-
if err != nil {
580-
return nil, err
581-
}
582-
for _, pair := range connectionArgsFromQueryString {
583-
err = addOption(options, pair)
584-
if err != nil {
585-
return nil, err
586-
}
587-
}
588-
589-
return options, nil
590-
}
591-
592-
func extractQueryArgsFromURI(uri string) ([]string, error) {
593-
if len(uri) == 0 {
594-
return nil, nil
595-
}
596-
597-
if uri[0] != '?' {
598-
return nil, errors.New("must have a ? separator between path and query")
599-
}
600-
601-
uri = uri[1:]
602-
if len(uri) == 0 {
603-
return nil, nil
604-
}
605-
return strings.FieldsFunc(uri, func(r rune) bool { return r == ';' || r == '&' }), nil
606-
}
607-
608-
type optionHandler struct {
609-
int *int
610-
bool *bool
611-
duration *time.Duration
612-
}
613-
614-
func addOption(options *redis.ClusterOptions, pair string) error {
615-
kv := strings.SplitN(pair, "=", 2)
616-
if len(kv) != 2 || kv[0] == "" {
617-
return fmt.Errorf("invalid option")
618-
}
619-
620-
key, err := url.QueryUnescape(kv[0])
621-
if err != nil {
622-
return errors.Wrap(err, errors.Errorf("invalid option key %q", kv[0]))
623-
}
624-
625-
value, err := url.QueryUnescape(kv[1])
626-
if err != nil {
627-
return errors.Wrap(err, errors.Errorf("invalid option value %q", kv[1]))
628-
}
629-
630-
handlers := map[string]optionHandler{
631-
"max_retries": {int: &options.MaxRetries},
632-
"min_retry_backoff": {duration: &options.MinRetryBackoff},
633-
"max_retry_backoff": {duration: &options.MaxRetryBackoff},
634-
"dial_timeout": {duration: &options.DialTimeout},
635-
"read_timeout": {duration: &options.ReadTimeout},
636-
"write_timeout": {duration: &options.WriteTimeout},
637-
"pool_fifo": {bool: &options.PoolFIFO},
638-
"pool_size": {int: &options.PoolSize},
639-
"pool_timeout": {duration: &options.PoolTimeout},
640-
"min_idle_conns": {int: &options.MinIdleConns},
641-
"max_idle_conns": {int: &options.MaxIdleConns},
642-
"conn_max_idle_time": {duration: &options.ConnMaxIdleTime},
643-
"conn_max_lifetime": {duration: &options.ConnMaxLifetime},
644-
}
645-
646-
lowerKey := strings.ToLower(key)
647-
if handler, ok := handlers[lowerKey]; ok {
648-
if handler.int != nil {
649-
*handler.int, err = strconv.Atoi(value)
650-
if err != nil {
651-
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
652-
}
653-
} else if handler.duration != nil {
654-
*handler.duration, err = time.ParseDuration(value)
655-
if err != nil {
656-
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
657-
}
658-
} else if handler.bool != nil {
659-
*handler.bool, err = strconv.ParseBool(value)
660-
if err != nil {
661-
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
662-
}
663-
} else {
664-
return fmt.Errorf("redis: unexpected option: %s", key)
665-
}
666-
} else {
667-
return fmt.Errorf("redis: unexpected option: %s", key)
668-
}
669-
670-
return nil
671-
}

storage/cache/redis_test.go

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/redis/go-redis/v9"
2526
"github.com/stretchr/testify/assert"
2627
"github.com/stretchr/testify/suite"
2728
"github.com/zhenghaoz/gorse/base/log"
@@ -52,8 +53,17 @@ func (suite *RedisTestSuite) SetupSuite() {
5253
suite.Database, err = Open(redisDSN, "gorse_")
5354
suite.NoError(err)
5455
// flush db
55-
err = suite.Database.(*Redis).client.FlushDB(context.TODO()).Err()
56-
suite.NoError(err)
56+
redisClient, ok := suite.Database.(*Redis)
57+
suite.True(ok)
58+
if clusterClient, ok := redisClient.client.(*redis.ClusterClient); ok {
59+
err = clusterClient.ForEachMaster(context.Background(), func(ctx context.Context, client *redis.Client) error {
60+
return client.FlushDB(ctx).Err()
61+
})
62+
suite.NoError(err)
63+
} else {
64+
err = redisClient.client.FlushDB(context.TODO()).Err()
65+
suite.NoError(err)
66+
}
5767
// create schema
5868
err = suite.Database.Init()
5969
suite.NoError(err)
@@ -114,27 +124,3 @@ func BenchmarkRedis(b *testing.B) {
114124
// benchmark
115125
benchmark(b, database)
116126
}
117-
118-
func TestParseRedisClusterURL(t *testing.T) {
119-
options, err := ParseRedisClusterURL("redis+cluster://username:[email protected]:6379,127.0.0.1:6380,127.0.0.1:6381/?" +
120-
"max_retries=1000&dial_timeout=1h&pool_fifo=true")
121-
if assert.NoError(t, err) {
122-
assert.Equal(t, "username", options.Username)
123-
assert.Equal(t, "password", options.Password)
124-
assert.Equal(t, []string{"127.0.0.1:6379", "127.0.0.1:6380", "127.0.0.1:6381"}, options.Addrs)
125-
assert.Equal(t, 1000, options.MaxRetries)
126-
assert.Equal(t, time.Hour, options.DialTimeout)
127-
assert.True(t, options.PoolFIFO)
128-
}
129-
130-
_, err = ParseRedisClusterURL("redis://")
131-
assert.Error(t, err)
132-
_, err = ParseRedisClusterURL("redis+cluster://username:[email protected]:6379/?max_retries=a")
133-
assert.Error(t, err)
134-
_, err = ParseRedisClusterURL("redis+cluster://username:[email protected]:6379/?dial_timeout=a")
135-
assert.Error(t, err)
136-
_, err = ParseRedisClusterURL("redis+cluster://username:[email protected]:6379/?pool_fifo=a")
137-
assert.Error(t, err)
138-
_, err = ParseRedisClusterURL("redis+cluster://username:[email protected]:6379/?a=1")
139-
assert.Error(t, err)
140-
}

storage/scheme.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,19 @@ import (
2929
)
3030

3131
const (
32-
MySQLPrefix = "mysql://"
33-
MongoPrefix = "mongodb://"
34-
MongoSrvPrefix = "mongodb+srv://"
35-
PostgresPrefix = "postgres://"
36-
PostgreSQLPrefix = "postgresql://"
37-
ClickhousePrefix = "clickhouse://"
38-
CHHTTPPrefix = "chhttp://"
39-
CHHTTPSPrefix = "chhttps://"
40-
SQLitePrefix = "sqlite://"
41-
RedisPrefix = "redis://"
42-
RedissPrefix = "rediss://"
43-
RedisClusterPrefix = "redis+cluster://"
32+
MySQLPrefix = "mysql://"
33+
MongoPrefix = "mongodb://"
34+
MongoSrvPrefix = "mongodb+srv://"
35+
PostgresPrefix = "postgres://"
36+
PostgreSQLPrefix = "postgresql://"
37+
ClickhousePrefix = "clickhouse://"
38+
CHHTTPPrefix = "chhttp://"
39+
CHHTTPSPrefix = "chhttps://"
40+
SQLitePrefix = "sqlite://"
41+
RedisPrefix = "redis://"
42+
RedissPrefix = "rediss://"
43+
RedisClusterPrefix = "redis+cluster://"
44+
RedissClusterPrefix = "rediss+cluster://"
4445
)
4546

4647
func AppendURLParams(rawURL string, params []lo.Tuple2[string, string]) (string, error) {

0 commit comments

Comments
 (0)