From d06127710801cba6b239d9871abc2d4f254e4b09 Mon Sep 17 00:00:00 2001 From: suxb201 Date: Thu, 28 Sep 2023 11:31:36 +0800 Subject: [PATCH] bugfix: Corrected cluster nodes parsing error #681 --- internal/utils/cluster_nodes.go | 6 +- internal/writer/redis_cluster_writer.go | 79 ++++--------------------- 2 files changed, 17 insertions(+), 68 deletions(-) diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index 1c0eaea..d98ad64 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -19,7 +19,7 @@ func GetRedisClusterNodes(address string, username string, password string, Tls if !strings.Contains(words[2], "master") { continue } - if len(words) < 9 { + if len(words) < 8 { log.Panicf("invalid cluster nodes line: %s", line) } log.Infof("redisClusterWriter load cluster nodes. line=%v", line) @@ -35,6 +35,10 @@ func GetRedisClusterNodes(address string, username string, password string, Tls ipv6Addr := strings.Join(tok[:len(tok)-1], ":") address = fmt.Sprintf("[%s]:%s", ipv6Addr, port) } + if len(words) < 9 { + log.Warnf("the current master node does not hold any slots. address=[%v]", address) + continue + } addresses = append(addresses, address) // parse slots diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index 62acb4a..7685390 100644 --- a/internal/writer/redis_cluster_writer.go +++ b/internal/writer/redis_cluster_writer.go @@ -1,12 +1,9 @@ package writer import ( - "RedisShake/internal/client" "RedisShake/internal/entry" "RedisShake/internal/log" - "fmt" - "strconv" - "strings" + "RedisShake/internal/utils" ) const KeySlots = 16384 @@ -33,73 +30,21 @@ func (r *RedisClusterWriter) Close() { } func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) { - c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) - reply := c.DoWithStringReply("cluster", "nodes") - reply = strings.TrimSpace(reply) - for _, line := range strings.Split(reply, "\n") { - line = strings.TrimSpace(line) - words := strings.Split(line, " ") - if !strings.Contains(words[2], "master") { - continue - } - if len(words) < 9 { - log.Panicf("invalid cluster nodes line: %s", line) - } - log.Infof("redisClusterWriter load cluster nodes. line=%v", line) - // address - address := strings.Split(words[1], "@")[0] - - // handle ipv6 address - tok := strings.Split(address, ":") - if len(tok) > 2 { - // ipv6 address - port := tok[len(tok)-1] - - ipv6Addr := strings.Join(tok[:len(tok)-1], ":") - address = fmt.Sprintf("[%s]:%s", ipv6Addr, port) - } - - r.addresses = append(r.addresses, address) - - // writers - opts := &RedisWriterOptions{ - Address: address, - Username: opts.Username, - Password: opts.Password, - Tls: opts.Tls, - } - redisWriter := NewRedisStandaloneWriter(opts) + addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + r.addresses = addresses + for i, address := range addresses { + theOpts := *opts + theOpts.Address = address + redisWriter := NewRedisStandaloneWriter(&theOpts) r.writers = append(r.writers, redisWriter) - // parse slots - for i := 8; i < len(words); i++ { - words[i] = strings.TrimSpace(words[i]) - var start, end int - var err error - if strings.Contains(words[i], "-") { - seg := strings.Split(words[i], "-") - start, err = strconv.Atoi(seg[0]) - if err != nil { - log.Panicf(err.Error()) - } - end, err = strconv.Atoi(seg[1]) - if err != nil { - log.Panicf(err.Error()) - } - } else { - start, err = strconv.Atoi(words[i]) - if err != nil { - log.Panicf(err.Error()) - } - end = start - } - for j := start; j <= end; j++ { - if r.router[j] != nil { - log.Panicf("redisClusterWriter: slot %d already occupied", j) - } - r.router[j] = redisWriter + for _, s := range slots[i] { + if r.router[s] != nil { + log.Panicf("redisClusterWriter: slot %d already occupied", s) } + r.router[s] = redisWriter } } + for i := 0; i < KeySlots; i++ { if r.router[i] == nil { log.Panicf("redisClusterWriter: slot %d not occupied", i)