Merge pull request #139 from alibaba/feature-1.6

Feature 1.6
v4
Vinllen Chen 5 years ago committed by GitHub
commit f428eafab6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      ChangeLog
  2. 3
      README.md
  3. 3
      conf/redis-shake.conf
  4. 6
      src/redis-shake/common/configure.go
  5. 9
      src/redis-shake/common/split.go
  6. 20
      src/redis-shake/configure/configure.go
  7. 33
      src/redis-shake/main/main.go
  8. 4
      src/redis-shake/rump.go

@ -1,3 +1,11 @@
2019-08-09 Alibaba Cloud.
* VERSION: 1.6.15
* IMPROVE: add `target.version` to support some proxy like twemproxy.
* BUGFIX: filter `select` command in `rump` when only db0 supports in some
redis version.
* IMPROVE: remove the cluster limit when target type is rump.
* IMPROVE: add `scan.key_number` limit judge when target type is cluster
in type `rump`. see #136.
2019-08-01 Alibaba Cloud. 2019-08-01 Alibaba Cloud.
* VERSION: 1.6.14 * VERSION: 1.6.14
* BUGFIX: the `rdb.parallel` parameter limits concurrency without effect. * BUGFIX: the `rdb.parallel` parameter limits concurrency without effect.

@ -26,7 +26,8 @@ Please check out the `conf/redis-shake.conf` to see the detailed parameters desc
# Support # Support
--- ---
Redis version from 2.x to 5.0. Redis version from 2.x to 5.0.
Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on. Supports `Standalone`, `Cluster`, and some proxies type like `Codis`, `twemproxy`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on.<br>
For `codis` and `twemproxy`, there maybe some constraints, please checkout this [question](https://github.com/alibaba/RedisShake/wiki/FAQ#q-does-redisshake-supports-codis-and-twemproxy).
# Configuration # Configuration
Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters.

@ -98,6 +98,9 @@ target.tls_enable = false
# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: # 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是:
# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 # ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2
target.rdb.output = local_dump target.rdb.output = local_dump
# some redis proxy like twemproxy doesn't support to fetch version, so please set it here.
# e.g., target.version = 4.0
target.version =
# use for expire key, set the time gap when source and target timestamp are not the same. # use for expire key, set the time gap when source and target timestamp are not the same.
# 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值

@ -96,9 +96,9 @@ func parseAddress(tp, address, redisType string, isSource bool) error {
} }
} }
case conf.RedisTypeCluster: case conf.RedisTypeCluster:
if isSource == false && tp == conf.TypeRump { //if isSource == false && tp == conf.TypeRump {
return fmt.Errorf("target type[%v] can't be cluster when type is 'rump' currently", redisType) // return fmt.Errorf("target type[%v] can't be cluster when type is 'rump' currently", redisType)
} //}
if strings.Contains(address, AddressSplitter) { if strings.Contains(address, AddressSplitter) {
arr := strings.Split(address, AddressSplitter) arr := strings.Split(address, AddressSplitter)
if len(arr) != 2 { if len(arr) != 2 {

@ -8,9 +8,12 @@ import (
redigo "github.com/garyburd/redigo/redis" redigo "github.com/garyburd/redigo/redis"
) )
func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int) { func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int, preDb *int) {
if _, err := client.Do("select", db); err != nil { if db != *preDb {
log.Panicf("send select db[%v] failed[%v]", db, err) if _, err := client.Do("select", db); err != nil {
log.Panicf("send select db[%v] failed[%v]", db, err)
}
*preDb = db
} }
entry := rdb.BinEntry{ entry := rdb.BinEntry{

@ -15,7 +15,6 @@ type Configuration struct {
SourceAddress string `config:"source.address"` SourceAddress string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"` SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"` SourcePasswordEncoding string `config:"source.password_encoding"`
SourceVersion uint `config:"source.version"`
SourceAuthType string `config:"source.auth_type"` SourceAuthType string `config:"source.auth_type"`
SourceTLSEnable bool `config:"source.tls_enable"` SourceTLSEnable bool `config:"source.tls_enable"`
SourceRdbInput []string `config:"source.rdb.input"` SourceRdbInput []string `config:"source.rdb.input"`
@ -24,12 +23,12 @@ type Configuration struct {
TargetAddress string `config:"target.address"` TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"` TargetPasswordEncoding string `config:"target.password_encoding"`
TargetVersion uint `config:"target.version"`
TargetDBString string `config:"target.db"` TargetDBString string `config:"target.db"`
TargetAuthType string `config:"target.auth_type"` TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"` TargetType string `config:"target.type"`
TargetTLSEnable bool `config:"target.tls_enable"` TargetTLSEnable bool `config:"target.tls_enable"`
TargetRdbOutput string `config:"target.rdb.output"` TargetRdbOutput string `config:"target.rdb.output"`
TargetVersion string `config:"target.version"`
FakeTime string `config:"fake_time"` FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"` Rewrite bool `config:"rewrite"`
FilterDBWhitelist []string `config:"filter.db.whitelist"` FilterDBWhitelist []string `config:"filter.db.whitelist"`
@ -67,15 +66,14 @@ type Configuration struct {
/*---------------------------------------------------------*/ /*---------------------------------------------------------*/
// generated variables // generated variables
SourceAddressList []string // source address list SourceAddressList []string // source address list
TargetAddressList []string // target address list TargetAddressList []string // target address list
HeartbeatIp string // heartbeat ip HeartbeatIp string // heartbeat ip
ShiftTime time.Duration // shift ShiftTime time.Duration // shift
TargetRedisVersion string // to_redis_version TargetReplace bool // to_replace
TargetReplace bool // to_replace TargetDB int // int type
TargetDB int // int type Version string // version
Version string // version Type string // input mode -type=xxx
Type string // input mode -type=xxx
} }
var Options Configuration var Options Configuration

@ -409,21 +409,24 @@ func sanitizeOptions(tp string) error {
} }
if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump { if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump {
// get target redis version and set TargetReplace. if conf.Options.TargetVersion == "" {
for _, address := range conf.Options.TargetAddressList { // get target redis version and set TargetReplace.
// single connection even if the target is cluster for _, address := range conf.Options.TargetAddressList {
if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType, // single connection even if the target is cluster
conf.Options.TargetPasswordRaw, conf.Options.TargetTLSEnable); err != nil { if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType,
return fmt.Errorf("get target redis version failed[%v]", err) conf.Options.TargetPasswordRaw, conf.Options.TargetTLSEnable); err != nil {
} else if conf.Options.TargetRedisVersion != "" && conf.Options.TargetRedisVersion != v { return fmt.Errorf("get target redis version failed[%v]", err)
return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetRedisVersion, v) } else if conf.Options.TargetVersion != "" && conf.Options.TargetVersion != v {
} else { return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetVersion, v)
conf.Options.TargetRedisVersion = v } else {
conf.Options.TargetVersion = v
}
} }
} }
if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") ||
strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") || if strings.HasPrefix(conf.Options.TargetVersion, "4.") ||
strings.HasPrefix(conf.Options.TargetRedisVersion, "5.") { strings.HasPrefix(conf.Options.TargetVersion, "3.") ||
strings.HasPrefix(conf.Options.TargetVersion, "5.") {
conf.Options.TargetReplace = true conf.Options.TargetReplace = true
} else { } else {
conf.Options.TargetReplace = false conf.Options.TargetReplace = false
@ -445,6 +448,10 @@ func sanitizeOptions(tp string) error {
conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
} }
if conf.Options.ScanKeyNumber > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster {
return fmt.Errorf("scan.key_number should less than [%v] when target type is cluster", utils.RecvChanSize)
}
//if len(conf.Options.SourceAddressList) == 1 { //if len(conf.Options.SourceAddressList) == 1 {
// return fmt.Errorf("source address length should == 1 when type is 'rump'") // return fmt.Errorf("source address length should == 1 when type is 'rump'")
//} //}

@ -327,6 +327,7 @@ func (dre *dbRumperExecutor) writer() {
// used in QoS // used in QoS
bucket := utils.StartQoS(conf.Options.Qps) bucket := utils.StartQoS(conf.Options.Qps)
preDb := 0 preDb := 0
preBigKeyDb := 0
for ele := range dre.keyChan { for ele := range dre.keyChan {
if filter.FilterKey(ele.key) { if filter.FilterKey(ele.key) {
continue continue
@ -354,8 +355,7 @@ func (dre *dbRumperExecutor) writer() {
batch = dre.writeSend(batch, &count, &wBytes) batch = dre.writeSend(batch, &count, &wBytes)
// handle big key // handle big key
utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db, &preBigKeyDb)
// all the reply has been handled in RestoreBigkey // all the reply has been handled in RestoreBigkey
// dre.resultChan <- ele // dre.resultChan <- ele
continue continue

Loading…
Cancel
Save