diff --git a/ChangeLog b/ChangeLog index 82e13b6..ca1fd63 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +2019-08-09 Alibaba Cloud. + * VERSION: 1.6.15 + * IMPROVE: add `target.version` to support some proxy like twemproxy. + * BUGFIX: filter `select` command in `rump` when only db0 supports in some + redis version. + * IMPROVE: remove the cluster limit when target type is rump. + * IMPROVE: add `scan.key_number` limit judge when target type is cluster + in type `rump`. see #136. 2019-08-01 Alibaba Cloud. * VERSION: 1.6.14 * BUGFIX: the `rdb.parallel` parameter limits concurrency without effect. diff --git a/README.md b/README.md index 15dfb22..4292327 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,8 @@ Please check out the `conf/redis-shake.conf` to see the detailed parameters desc # Support --- Redis version from 2.x to 5.0. -Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on. +Supports `Standalone`, `Cluster`, and some proxies type like `Codis`, `twemproxy`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on.
+For `codis` and `twemproxy`, there maybe some constraints, please checkout this [question](https://github.com/alibaba/RedisShake/wiki/FAQ#q-does-redisshake-supports-codis-and-twemproxy). # Configuration Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index bdaa0ac..08b30dc 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -98,6 +98,9 @@ target.tls_enable = false # 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: # ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 target.rdb.output = local_dump +# some redis proxy like twemproxy doesn't support to fetch version, so please set it here. +# e.g., target.version = 4.0 +target.version = # use for expire key, set the time gap when source and target timestamp are not the same. # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 diff --git a/src/redis-shake/common/configure.go b/src/redis-shake/common/configure.go index 5a0b597..bbf6cfe 100644 --- a/src/redis-shake/common/configure.go +++ b/src/redis-shake/common/configure.go @@ -96,9 +96,9 @@ func parseAddress(tp, address, redisType string, isSource bool) error { } } case conf.RedisTypeCluster: - if isSource == false && tp == conf.TypeRump { - return fmt.Errorf("target type[%v] can't be cluster when type is 'rump' currently", redisType) - } + //if isSource == false && tp == conf.TypeRump { + // return fmt.Errorf("target type[%v] can't be cluster when type is 'rump' currently", redisType) + //} if strings.Contains(address, AddressSplitter) { arr := strings.Split(address, AddressSplitter) if len(arr) != 2 { diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go index bbf53b2..10ffdd6 100644 --- a/src/redis-shake/common/split.go +++ b/src/redis-shake/common/split.go @@ -8,9 +8,12 @@ import ( redigo "github.com/garyburd/redigo/redis" ) -func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int) { - if _, err := client.Do("select", db); err != nil { - log.Panicf("send select db[%v] failed[%v]", db, err) +func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int, preDb *int) { + if db != *preDb { + if _, err := client.Do("select", db); err != nil { + log.Panicf("send select db[%v] failed[%v]", db, err) + } + *preDb = db } entry := rdb.BinEntry{ diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 2c9e15a..5a8cc85 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -15,7 +15,6 @@ type Configuration struct { SourceAddress string `config:"source.address"` SourcePasswordRaw string `config:"source.password_raw"` SourcePasswordEncoding string `config:"source.password_encoding"` - SourceVersion uint `config:"source.version"` SourceAuthType string `config:"source.auth_type"` SourceTLSEnable bool `config:"source.tls_enable"` SourceRdbInput []string `config:"source.rdb.input"` @@ -24,12 +23,12 @@ type Configuration struct { TargetAddress string `config:"target.address"` TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordEncoding string `config:"target.password_encoding"` - TargetVersion uint `config:"target.version"` TargetDBString string `config:"target.db"` TargetAuthType string `config:"target.auth_type"` TargetType string `config:"target.type"` TargetTLSEnable bool `config:"target.tls_enable"` TargetRdbOutput string `config:"target.rdb.output"` + TargetVersion string `config:"target.version"` FakeTime string `config:"fake_time"` Rewrite bool `config:"rewrite"` FilterDBWhitelist []string `config:"filter.db.whitelist"` @@ -67,15 +66,14 @@ type Configuration struct { /*---------------------------------------------------------*/ // generated variables - SourceAddressList []string // source address list - TargetAddressList []string // target address list - HeartbeatIp string // heartbeat ip - ShiftTime time.Duration // shift - TargetRedisVersion string // to_redis_version - TargetReplace bool // to_replace - TargetDB int // int type - Version string // version - Type string // input mode -type=xxx + SourceAddressList []string // source address list + TargetAddressList []string // target address list + HeartbeatIp string // heartbeat ip + ShiftTime time.Duration // shift + TargetReplace bool // to_replace + TargetDB int // int type + Version string // version + Type string // input mode -type=xxx } var Options Configuration diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 89fcb72..3ee087c 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -409,21 +409,24 @@ func sanitizeOptions(tp string) error { } if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump { - // get target redis version and set TargetReplace. - for _, address := range conf.Options.TargetAddressList { - // single connection even if the target is cluster - if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType, - conf.Options.TargetPasswordRaw, conf.Options.TargetTLSEnable); err != nil { - return fmt.Errorf("get target redis version failed[%v]", err) - } else if conf.Options.TargetRedisVersion != "" && conf.Options.TargetRedisVersion != v { - return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetRedisVersion, v) - } else { - conf.Options.TargetRedisVersion = v + if conf.Options.TargetVersion == "" { + // get target redis version and set TargetReplace. + for _, address := range conf.Options.TargetAddressList { + // single connection even if the target is cluster + if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType, + conf.Options.TargetPasswordRaw, conf.Options.TargetTLSEnable); err != nil { + return fmt.Errorf("get target redis version failed[%v]", err) + } else if conf.Options.TargetVersion != "" && conf.Options.TargetVersion != v { + return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetVersion, v) + } else { + conf.Options.TargetVersion = v + } } } - if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") || - strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") || - strings.HasPrefix(conf.Options.TargetRedisVersion, "5.") { + + if strings.HasPrefix(conf.Options.TargetVersion, "4.") || + strings.HasPrefix(conf.Options.TargetVersion, "3.") || + strings.HasPrefix(conf.Options.TargetVersion, "5.") { conf.Options.TargetReplace = true } else { conf.Options.TargetReplace = false @@ -445,6 +448,10 @@ func sanitizeOptions(tp string) error { conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) } + if conf.Options.ScanKeyNumber > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster { + return fmt.Errorf("scan.key_number should less than [%v] when target type is cluster", utils.RecvChanSize) + } + //if len(conf.Options.SourceAddressList) == 1 { // return fmt.Errorf("source address length should == 1 when type is 'rump'") //} diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 87feac8..aaf1b25 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -327,6 +327,7 @@ func (dre *dbRumperExecutor) writer() { // used in QoS bucket := utils.StartQoS(conf.Options.Qps) preDb := 0 + preBigKeyDb := 0 for ele := range dre.keyChan { if filter.FilterKey(ele.key) { continue @@ -354,8 +355,7 @@ func (dre *dbRumperExecutor) writer() { batch = dre.writeSend(batch, &count, &wBytes) // handle big key - utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) - + utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db, &preBigKeyDb) // all the reply has been handled in RestoreBigkey // dre.resultChan <- ele continue