From 16a96112ef055149392bfe1244c03051449f5875 Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 5 Jun 2019 20:51:08 +0800 Subject: [PATCH 1/3] cherry-pick v1.4.4 --- ChangeLog | 10 ++++++++++ src/redis-shake/common/common.go | 8 ++++++++ src/redis-shake/common/utils.go | 4 ++-- src/redis-shake/configure/configure.go | 3 ++- src/redis-shake/main/main.go | 27 +++++++++++++++++++++----- 5 files changed, 44 insertions(+), 8 deletions(-) diff --git a/ChangeLog b/ChangeLog index 1ecd11b..11b15a2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,16 @@ +2019-06-07 Alibaba Cloud. + * VERSION: 1.6.6 + * cherry-pick merge v1.4.4 + * 2019-06-06 Alibaba Cloud. * VERSION: 1.6.5 * IMPROVE: run rump in parallel to support several db nodes behind proxy. + * BUGFIX: run rump panic when the source is proxy with more than 1 db. +2019-06-05 Alibaba Cloud. + * VERSION: 1.4.4 + * BUGFIX: modify the ttl from millisecond to second in restore when + overpass big key threshold. + * IMPROVE: set some default values in configuration. 2019-05-30 Alibaba Cloud. * VERSION: 1.6.4 * BUGFIX: fix bug of `GetDetailedInfo` panic. diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 2e0dfb7..5b83e05 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -36,6 +36,14 @@ var ( TargetRoundRobin int ) +const ( + KB = 1024 + MB = 1024 * KB + GB = 1024 * MB + TB = 1024 * GB + PB = 1024 * TB +) + // read until hit the end of RESP: "\r\n" func ReadRESPEnd(c net.Conn) (string, error) { var ret string diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index afeea7a..1d5712e 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -723,7 +723,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } restoreQuicklistEntry(c, e) if e.ExpireAt != 0 { - r, err := redigo.Int64(c.Do("expire", e.Key, ttlms)) + r, err := redigo.Int64(c.Do("pexpire", e.Key, ttlms)) if err != nil && r != 1 { log.Panicf("expire ", string(e.Key), err) } @@ -755,7 +755,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } restoreBigRdbEntry(c, e) if e.ExpireAt != 0 { - r, err := redigo.Int64(c.Do("expire", e.Key, ttlms)) + r, err := redigo.Int64(c.Do("pexpire", e.Key, ttlms)) if err != nil && r != 1 { log.Panicf("expire ", string(e.Key), err) } diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index e36da32..a35c898 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -23,7 +23,7 @@ type Configuration struct { TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordEncoding string `config:"target.password_encoding"` TargetVersion uint `config:"target.version"` - TargetDB int `config:"target.db"` + TargetDBString string `config:"target.db"` TargetAuthType string `config:"target.auth_type"` TargetType string `config:"target.type"` TargetTLSEnable bool `config:"target.tls_enable"` @@ -67,6 +67,7 @@ type Configuration struct { ShiftTime time.Duration // shift TargetRedisVersion string // to_redis_version TargetReplace bool // to_replace + TargetDB int // int type } var Options Configuration diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 58346d5..500f718 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -181,8 +181,11 @@ func sanitizeOptions(tp string) error { conf.Options.Parallel = int(math.Max(float64(conf.Options.Parallel), float64(conf.Options.NCpu))) } - if conf.Options.BigKeyThreshold > 524288000 { - return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold) + // 500 M + if conf.Options.BigKeyThreshold > 500 * utils.MB { + return fmt.Errorf("BigKeyThreshold[%v] should <= 500 MB", conf.Options.BigKeyThreshold) + } else if conf.Options.BigKeyThreshold == 0 { + conf.Options.BigKeyThreshold = 50 * utils.MB } // source password @@ -266,7 +269,10 @@ func sanitizeOptions(tp string) error { // heartbeat, 86400 = 1 day if conf.Options.HeartbeatInterval > 86400 { return fmt.Errorf("HeartbeatInterval[%v] should in [0, 86400]", conf.Options.HeartbeatInterval) + } else if conf.Options.HeartbeatInterval == 0 { + conf.Options.HeartbeatInterval = 10 } + if conf.Options.HeartbeatNetworkInterface == "" { conf.Options.HeartbeatIp = "127.0.0.1" } else { @@ -325,8 +331,14 @@ func sanitizeOptions(tp string) error { } } - if conf.Options.TargetDB >= 0 { - // pass, >= 0 means enable + if conf.Options.TargetDBString == "" { + conf.Options.TargetDB = -1 + } else if v, err := strconv.Atoi(conf.Options.TargetDBString); err != nil { + return fmt.Errorf("parse target.db[%v] failed[%v]", conf.Options.TargetDBString, err) + } else if v < 0 { + conf.Options.TargetDB = -1 + } else { + conf.Options.TargetDB = v } if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 { @@ -357,7 +369,12 @@ func sanitizeOptions(tp string) error { conf.Options.SenderCount = defaultSenderCount } - if tp == conf.TypeRestore || tp == conf.TypeSync { + + if conf.Options.SenderDelayChannelSize == 0 { + conf.Options.SenderDelayChannelSize = 32 + } + + if tp == TypeRestore || tp == TypeSync { // get target redis version and set TargetReplace. for _, address := range conf.Options.TargetAddressList { // single connection even if the target is cluster From dd790a47d05c2336661b68d42005b5f950237b67 Mon Sep 17 00:00:00 2001 From: vinllen Date: Sun, 9 Jun 2019 17:16:56 +0800 Subject: [PATCH 2/3] delete single command failed when filter key given --- ChangeLog | 2 +- src/redis-shake/command/redis-command.go | 9 +++++---- src/redis-shake/main/main.go | 2 +- src/redis-shake/sync.go | 17 ++++++++++------- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/ChangeLog b/ChangeLog index 11b15a2..0785933 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,7 +1,7 @@ 2019-06-07 Alibaba Cloud. * VERSION: 1.6.6 * cherry-pick merge v1.4.4 - * + * BUGFIX: delete single command failed when filter key given. 2019-06-06 Alibaba Cloud. * VERSION: 1.6.5 * IMPROVE: run rump in parallel to support several db nodes behind proxy. diff --git a/src/redis-shake/command/redis-command.go b/src/redis-shake/command/redis-command.go index 2913836..3245eef 100644 --- a/src/redis-shake/command/redis-command.go +++ b/src/redis-shake/command/redis-command.go @@ -17,7 +17,7 @@ var RedisCommands = map[string]redisCommand{ "setex": {nil, 1, 1, 1}, "psetex": {nil, 1, 1, 1}, "append": {nil, 1, 1, 1}, - "del": {nil, 1, -1, 1}, + "del": {nil, 1, 0, 1}, "unlink": {nil, 1, -1, 1}, "setbit": {nil, 1, 1, 1}, "bitfield": {nil, 1, 1, 1}, @@ -86,7 +86,7 @@ var RedisCommands = map[string]redisCommand{ "pfmerge": {nil, 1, -1, 1}, } -func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (new_args [][]byte, ret bool) { +func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (new_args [][]byte, pass bool) { lastkey := redis_cmd.lastkey - 1 keystep := redis_cmd.keystep @@ -107,10 +107,10 @@ func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (n } } - ret = false + pass = false new_args = make([][]byte, number*redis_cmd.keystep+len(args)-lastkey-redis_cmd.keystep) if number > 0 { - ret = true + pass = true for i := 0; i < number; i++ { for j := 0; j < redis_cmd.keystep; j++ { new_args[i*redis_cmd.keystep+j] = args[array[i]+j] @@ -124,5 +124,6 @@ func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (n new_args[number*redis_cmd.keystep+j] = args[i] j = j + 1 } + return } diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 500f718..3735ff4 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -374,7 +374,7 @@ func sanitizeOptions(tp string) error { conf.Options.SenderDelayChannelSize = 32 } - if tp == TypeRestore || tp == TypeSync { + if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump { // get target redis version and set TargetReplace. for _, address := range conf.Options.TargetAddressList { // single connection even if the target is cluster diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index bf87590..3a95b06 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -531,7 +531,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type id := recvId.Get() // receive id // print debug log of receive reply - log.Debugf("receive reply[%v]: [%v], error: [%v]", id, reply, err) + log.Debugf("receive reply-id[%v]: [%v], error:[%v]", id, reply, err) if conf.Options.Metric == false { continue @@ -623,25 +623,28 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type ds.nbypass.Incr() // ds.SyncStat.BypassCmdCount.Incr() metric.GetMetric(ds.id).AddBypassCmdCount(1) + log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd) continue } } - is_filter := false + pass := false if len(conf.Options.FilterKey) != 0 { - ds, ok := command.RedisCommands[scmd] + cmdNode, ok := command.RedisCommands[scmd] if ok && len(argv) > 0 { - new_argv, is_filter = command.GetMatchKeys(ds, argv, conf.Options.FilterKey) + log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd) + new_argv, pass = command.GetMatchKeys(cmdNode, argv, conf.Options.FilterKey) } else { - is_filter = true + pass = true new_argv = argv } } else { - is_filter = true + pass = true new_argv = argv } - if bypass || ignorecmd || !is_filter { + if bypass || ignorecmd || !pass { ds.nbypass.Incr() + log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd) continue } } From b518bf0be01107071d0f0e353d0d3fc444037378 Mon Sep 17 00:00:00 2001 From: vinllen Date: Sun, 9 Jun 2019 17:17:37 +0800 Subject: [PATCH 3/3] polish ChangeLog date --- ChangeLog | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ChangeLog b/ChangeLog index 0785933..870184b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,4 @@ -2019-06-07 Alibaba Cloud. +2019-06-09 Alibaba Cloud. * VERSION: 1.6.6 * cherry-pick merge v1.4.4 * BUGFIX: delete single command failed when filter key given.