diff --git a/ChangeLog b/ChangeLog index 90a3f6b..82e13b6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +2019-08-01 Alibaba Cloud. + * VERSION: 1.6.14 + * BUGFIX: the `rdb.parallel` parameter limits concurrency without effect. + see #133 + * BUGFIX: call `select` when target redis type is cluster in `rump` mode. + * IMPROVE: add `http_profile = -1` to exit once finish rdb syncing in + `restore` mode. + * IMPROVE: 'info xxx' command isn't supported in codis, used 'info' and + parse 'xxx'. + * IMPROVE: rename `rdb.xx` to `source.rdb.xx` or `target.rdb.xx`. 2019-07-24 Alibaba Cloud. * VERSION: 1.6.13 * IMPROVE: support `filter.db.whitelist` and `filter.db.blacklist` to let diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index b562038..bdaa0ac 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -50,6 +50,24 @@ source.auth_type = auth # tls enable, true or false. Currently, only support standalone. # open source redis does NOT support tls so far, but some cloud versions do. source.tls_enable = false +# input RDB file. +# used in `decode` and `restore`. +# if the input is list split by semicolon(;), redis-shake will restore the list one by one. +# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 +# redis-shake将会挨个进行恢复。 +source.rdb.input = local +# the concurrence of RDB syncing, default is len(source.address) or len(source.rdb.input). +# used in `dump`, `sync` and `restore`. 0 means default. +# This is useless when source.type isn't cluster or only input is only one RDB. +# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(source.rdb.input)。 +# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会 +# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕并进入增量,才会拉取第4个db节点的rdb, +# 以此类推,最后会有len(source.address)或者len(rdb.input)个增量线程同时存在。 +source.rdb.parallel = 0 +# for special cloud vendor: ucloud +# used in `decode` and `restore`. +# ucloud集群版的rdb文件添加了slot前缀,进行特判剥离: ucloud_cluster。 +source.rdb.special_cloud = # target redis configuration. used in `restore`, `sync` and `rump`. # the type of target redis can be "standalone", "proxy" or "cluster". @@ -75,28 +93,11 @@ target.db = -1 # tls enable, true or false. Currently, only support standalone. # open source redis does NOT support tls so far, but some cloud versions do. target.tls_enable = false - -# input RDB file. -# used in `decode` and `restore`. -# if the input is list split by semicolon(;), redis-shake will restore the list one by one. -# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 -# redis-shake将会挨个进行恢复。 -rdb.input = local # output RDB file prefix. # used in `decode` and `dump`. # 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: # ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 -rdb.output = local_dump -# the concurrence of fetching data, default is len(source.address) or len(rdb.input). -# used in `dump`, `sync` and `restore`. 0 means default. -# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。 -# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会 -# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕并进入增量,才会拉取第4个db节点的rdb, -# 以此类推,最后会有len(source.address)或者len(rdb.input)个增量线程同时存在。 -rdb.parallel = 0 -# for special cloud vendor: ucloud -# ucloud集群版的rdb文件添加了slot前缀,进行特判剥离: ucloud_cluster。 -rdb.special_cloud = +target.rdb.output = local_dump # use for expire key, set the time gap when source and target timestamp are not the same. # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 8c1e7c8..2543f12 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -92,11 +92,12 @@ func ParseInfo(content []byte) map[string]string { } func GetTotalLink() int { - if len(conf.Options.SourceAddressList) != 0 { + if conf.Options.Type == conf.TypeSync || conf.Options.Type == conf.TypeRump || conf.Options.Type == conf.TypeDump { return len(conf.Options.SourceAddressList) - } else { - return len(conf.Options.RdbInput) + } else if conf.Options.Type == conf.TypeDecode || conf.Options.Type == conf.TypeRestore { + return len(conf.Options.SourceRdbInput) } + return 0 } func PickTargetRoundRobin(n int) int { diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 9913998..779a205 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -688,7 +688,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { * for ucloud, special judge. * 046110.key -> key */ - if conf.Options.RdbSpecialCloud == UCloudCluster { + if conf.Options.SourceRdbSpecialCloud == UCloudCluster { e.Key = e.Key[7:] } diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 0ab42bb..2c9e15a 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -17,8 +17,10 @@ type Configuration struct { SourcePasswordEncoding string `config:"source.password_encoding"` SourceVersion uint `config:"source.version"` SourceAuthType string `config:"source.auth_type"` - SourceParallel uint `config:"source.parallel"` SourceTLSEnable bool `config:"source.tls_enable"` + SourceRdbInput []string `config:"source.rdb.input"` + SourceRdbParallel int `config:"source.rdb.parallel"` + SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"` TargetAddress string `config:"target.address"` TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordEncoding string `config:"target.password_encoding"` @@ -27,10 +29,7 @@ type Configuration struct { TargetAuthType string `config:"target.auth_type"` TargetType string `config:"target.type"` TargetTLSEnable bool `config:"target.tls_enable"` - RdbInput []string `config:"rdb.input"` - RdbOutput string `config:"rdb.output"` - RdbParallel int `config:"rdb.parallel"` - RdbSpecialCloud string `config:"rdb.special_cloud"` + TargetRdbOutput string `config:"target.rdb.output"` FakeTime string `config:"fake_time"` Rewrite bool `config:"rewrite"` FilterDBWhitelist []string `config:"filter.db.whitelist"` @@ -57,6 +56,7 @@ type Configuration struct { ScanKeyFile string `config:"scan.key_file"` Qps int `config:"qps"` + /*---------------------------------------------------------*/ // inner variables ReplaceHashTag bool `config:"replace_hash_tag"` ExtraInfo bool `config:"extra"` @@ -75,6 +75,7 @@ type Configuration struct { TargetReplace bool // to_replace TargetDB int // int type Version string // version + Type string // input mode -type=xxx } var Options Configuration diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index 3aa12fd..68c7e29 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -40,11 +40,11 @@ func (cmd *CmdDecode) GetDetailedInfo() interface{} { } func (cmd *CmdDecode) Main() { - log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput) + log.Infof("decode from '%s' to '%s'\n", conf.Options.SourceRdbInput, conf.Options.TargetRdbOutput) - for i, input := range conf.Options.RdbInput { + for i, input := range conf.Options.SourceRdbInput { // decode one by one. By now, we don't support decoding concurrence. - output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i) + output := fmt.Sprintf("%s.%d", conf.Options.TargetRdbOutput, i) cmd.decode(input, output) } diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index 5d4e146..394a45b 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -37,7 +37,7 @@ func (cmd *CmdDump) Main() { nd := node{ id: i, source: source, - output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i), + output: fmt.Sprintf("%s.%d", conf.Options.TargetRdbOutput, i), } cmd.dumpChan <- nd } @@ -49,7 +49,7 @@ func (cmd *CmdDump) Main() { wg sync.WaitGroup ) wg.Add(len(conf.Options.SourceAddressList)) - for i := 0; i < int(conf.Options.SourceParallel); i++ { + for i := 0; i < int(conf.Options.SourceRdbParallel); i++ { go func(idx int) { log.Infof("start routine[%v]", idx) for { diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 676d3ad..89fcb72 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -66,6 +66,7 @@ func main() { } conf.Options.Version = utils.Version + conf.Options.Type = *tp var file *os.File if file, err = os.Open(*configuration); err != nil { @@ -219,31 +220,33 @@ func sanitizeOptions(tp string) error { return fmt.Errorf("mode[%v] parse address failed[%v]", tp, err) } - if (tp == conf.TypeRestore || tp == conf.TypeDecode) && len(conf.Options.RdbInput) == 0 { - return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") - } - if tp == conf.TypeDump && conf.Options.RdbOutput == "" { - conf.Options.RdbOutput = "output-rdb-dump" - } - - if conf.Options.RdbParallel == 0 { - if tp == conf.TypeDump || tp == conf.TypeSync { - conf.Options.RdbParallel = len(conf.Options.SourceAddressList) - } else if tp == conf.TypeRestore { - conf.Options.RdbParallel = len(conf.Options.RdbInput) + if tp == conf.TypeRestore || tp == conf.TypeDecode { + if len(conf.Options.SourceRdbInput) == 0 { + return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") + } + // check file exist + for _, rdb := range conf.Options.SourceRdbInput { + if _, err := os.Stat(rdb); os.IsNotExist(err) { + return fmt.Errorf("input rdb file[%v] not exists", rdb) + } } } - - if tp == conf.TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) { - conf.Options.RdbParallel = len(conf.Options.RdbInput) + if tp == conf.TypeDump && conf.Options.TargetRdbOutput == "" { + conf.Options.TargetRdbOutput = "output-rdb-dump" } - if conf.Options.RdbSpecialCloud != "" && conf.Options.RdbSpecialCloud != utils.UCloudCluster { - return fmt.Errorf("rdb special cloud type[%s] is not supported", conf.Options.RdbSpecialCloud) + if tp == conf.TypeDump || tp == conf.TypeSync { + if conf.Options.SourceRdbParallel <= 0 || conf.Options.SourceRdbParallel > len(conf.Options.SourceAddressList) { + conf.Options.SourceRdbParallel = len(conf.Options.SourceAddressList) + } + } else if tp == conf.TypeRestore || tp == conf.TypeDecode { + if conf.Options.SourceRdbParallel <= 0 || conf.Options.SourceRdbParallel > len(conf.Options.SourceRdbInput) { + conf.Options.SourceRdbParallel = len(conf.Options.SourceRdbInput) + } } - if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddressList)) { - conf.Options.SourceParallel = uint(len(conf.Options.SourceAddressList)) + if conf.Options.SourceRdbSpecialCloud != "" && conf.Options.SourceRdbSpecialCloud != utils.UCloudCluster { + return fmt.Errorf("rdb special cloud type[%s] is not supported", conf.Options.SourceRdbSpecialCloud) } if conf.Options.LogFile != "" { diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 7102953..4c000f8 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -36,7 +36,7 @@ func (cmd *CmdRestore) GetDetailedInfo() interface{} { } func (cmd *CmdRestore) Main() { - log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddressList) + log.Infof("restore from '%s' to '%s'\n", conf.Options.SourceRdbInput, conf.Options.TargetAddressList) type restoreNode struct { id int @@ -46,13 +46,13 @@ func (cmd *CmdRestore) Main() { total := utils.GetTotalLink() restoreChan := make(chan restoreNode, total) - for i, rdb := range conf.Options.RdbInput { + for i, rdb := range conf.Options.SourceRdbInput { restoreChan <- restoreNode{id: i, input: rdb} } var wg sync.WaitGroup - wg.Add(len(conf.Options.RdbInput)) - for i := 0; i < conf.Options.RdbParallel; i++ { + wg.Add(len(conf.Options.SourceRdbInput)) + for i := 0; i < conf.Options.SourceRdbParallel; i++ { go func() { for { node, ok := <-restoreChan @@ -87,7 +87,7 @@ func (cmd *CmdRestore) Main() { wg.Wait() close(restoreChan) - log.Infof("restore from '%s' to '%s' done", conf.Options.RdbInput, conf.Options.TargetAddressList) + log.Infof("restore from '%s' to '%s' done", conf.Options.SourceRdbInput, conf.Options.TargetAddressList) if conf.Options.HttpProfile != -1 { //fake status if set http_port. and wait forever base.Status = "incr" diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index a5a12ef..4e6cdd6 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -105,7 +105,7 @@ func (cmd *CmdSync) Main() { var wg sync.WaitGroup wg.Add(len(conf.Options.SourceAddressList)) - for i := 0; i < int(conf.Options.SourceParallel); i++ { + for i := 0; i < int(conf.Options.SourceRdbParallel); i++ { go func() { for { nd, ok := <-syncChan