From 877633dc5fccabaea5a0ef65a80232e41e22cfb8 Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 31 Jul 2019 16:26:05 +0800 Subject: [PATCH 1/5] polish conf --- conf/redis-shake.conf | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 81c5a06..b562038 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -23,6 +23,7 @@ http_profile = 9320 ncpu = 0 # parallel routines number used in RDB file syncing. default is 64. +# 启动多少个并发线程同步一个RDB文件。 parallel = 32 # source redis configuration. @@ -90,7 +91,8 @@ rdb.output = local_dump # used in `dump`, `sync` and `restore`. 0 means default. # 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。 # 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会 -# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。 +# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕并进入增量,才会拉取第4个db节点的rdb, +# 以此类推,最后会有len(source.address)或者len(rdb.input)个增量线程同时存在。 rdb.parallel = 0 # for special cloud vendor: ucloud # ucloud集群版的rdb文件添加了slot前缀,进行特判剥离: ucloud_cluster。 From 8795662d5def3743b2d684cd3638a87a76d6ee31 Mon Sep 17 00:00:00 2001 From: vinllen Date: Thu, 1 Aug 2019 10:58:52 +0800 Subject: [PATCH 2/5] bugfix: source.parallel doesn't work --- conf/redis-shake.conf | 37 ++++++++++----------- src/redis-shake/common/common.go | 7 ++-- src/redis-shake/common/utils.go | 2 +- src/redis-shake/configure/configure.go | 11 ++++--- src/redis-shake/decode.go | 6 ++-- src/redis-shake/dump.go | 4 +-- src/redis-shake/main/main.go | 45 +++++++++++++++----------- src/redis-shake/restore.go | 12 +++---- src/redis-shake/sync.go | 2 +- 9 files changed, 68 insertions(+), 58 deletions(-) diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index b562038..bdaa0ac 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -50,6 +50,24 @@ source.auth_type = auth # tls enable, true or false. Currently, only support standalone. # open source redis does NOT support tls so far, but some cloud versions do. source.tls_enable = false +# input RDB file. +# used in `decode` and `restore`. +# if the input is list split by semicolon(;), redis-shake will restore the list one by one. +# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 +# redis-shake将会挨个进行恢复。 +source.rdb.input = local +# the concurrence of RDB syncing, default is len(source.address) or len(source.rdb.input). +# used in `dump`, `sync` and `restore`. 0 means default. +# This is useless when source.type isn't cluster or only input is only one RDB. +# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(source.rdb.input)。 +# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会 +# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕并进入增量,才会拉取第4个db节点的rdb, +# 以此类推,最后会有len(source.address)或者len(rdb.input)个增量线程同时存在。 +source.rdb.parallel = 0 +# for special cloud vendor: ucloud +# used in `decode` and `restore`. +# ucloud集群版的rdb文件添加了slot前缀,进行特判剥离: ucloud_cluster。 +source.rdb.special_cloud = # target redis configuration. used in `restore`, `sync` and `rump`. # the type of target redis can be "standalone", "proxy" or "cluster". @@ -75,28 +93,11 @@ target.db = -1 # tls enable, true or false. Currently, only support standalone. # open source redis does NOT support tls so far, but some cloud versions do. target.tls_enable = false - -# input RDB file. -# used in `decode` and `restore`. -# if the input is list split by semicolon(;), redis-shake will restore the list one by one. -# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 -# redis-shake将会挨个进行恢复。 -rdb.input = local # output RDB file prefix. # used in `decode` and `dump`. # 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: # ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 -rdb.output = local_dump -# the concurrence of fetching data, default is len(source.address) or len(rdb.input). -# used in `dump`, `sync` and `restore`. 0 means default. -# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。 -# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会 -# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕并进入增量,才会拉取第4个db节点的rdb, -# 以此类推,最后会有len(source.address)或者len(rdb.input)个增量线程同时存在。 -rdb.parallel = 0 -# for special cloud vendor: ucloud -# ucloud集群版的rdb文件添加了slot前缀,进行特判剥离: ucloud_cluster。 -rdb.special_cloud = +target.rdb.output = local_dump # use for expire key, set the time gap when source and target timestamp are not the same. # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 8c1e7c8..2543f12 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -92,11 +92,12 @@ func ParseInfo(content []byte) map[string]string { } func GetTotalLink() int { - if len(conf.Options.SourceAddressList) != 0 { + if conf.Options.Type == conf.TypeSync || conf.Options.Type == conf.TypeRump || conf.Options.Type == conf.TypeDump { return len(conf.Options.SourceAddressList) - } else { - return len(conf.Options.RdbInput) + } else if conf.Options.Type == conf.TypeDecode || conf.Options.Type == conf.TypeRestore { + return len(conf.Options.SourceRdbInput) } + return 0 } func PickTargetRoundRobin(n int) int { diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 9913998..779a205 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -688,7 +688,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { * for ucloud, special judge. * 046110.key -> key */ - if conf.Options.RdbSpecialCloud == UCloudCluster { + if conf.Options.SourceRdbSpecialCloud == UCloudCluster { e.Key = e.Key[7:] } diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 0ab42bb..2c9e15a 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -17,8 +17,10 @@ type Configuration struct { SourcePasswordEncoding string `config:"source.password_encoding"` SourceVersion uint `config:"source.version"` SourceAuthType string `config:"source.auth_type"` - SourceParallel uint `config:"source.parallel"` SourceTLSEnable bool `config:"source.tls_enable"` + SourceRdbInput []string `config:"source.rdb.input"` + SourceRdbParallel int `config:"source.rdb.parallel"` + SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"` TargetAddress string `config:"target.address"` TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordEncoding string `config:"target.password_encoding"` @@ -27,10 +29,7 @@ type Configuration struct { TargetAuthType string `config:"target.auth_type"` TargetType string `config:"target.type"` TargetTLSEnable bool `config:"target.tls_enable"` - RdbInput []string `config:"rdb.input"` - RdbOutput string `config:"rdb.output"` - RdbParallel int `config:"rdb.parallel"` - RdbSpecialCloud string `config:"rdb.special_cloud"` + TargetRdbOutput string `config:"target.rdb.output"` FakeTime string `config:"fake_time"` Rewrite bool `config:"rewrite"` FilterDBWhitelist []string `config:"filter.db.whitelist"` @@ -57,6 +56,7 @@ type Configuration struct { ScanKeyFile string `config:"scan.key_file"` Qps int `config:"qps"` + /*---------------------------------------------------------*/ // inner variables ReplaceHashTag bool `config:"replace_hash_tag"` ExtraInfo bool `config:"extra"` @@ -75,6 +75,7 @@ type Configuration struct { TargetReplace bool // to_replace TargetDB int // int type Version string // version + Type string // input mode -type=xxx } var Options Configuration diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index 3aa12fd..68c7e29 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -40,11 +40,11 @@ func (cmd *CmdDecode) GetDetailedInfo() interface{} { } func (cmd *CmdDecode) Main() { - log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput) + log.Infof("decode from '%s' to '%s'\n", conf.Options.SourceRdbInput, conf.Options.TargetRdbOutput) - for i, input := range conf.Options.RdbInput { + for i, input := range conf.Options.SourceRdbInput { // decode one by one. By now, we don't support decoding concurrence. - output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i) + output := fmt.Sprintf("%s.%d", conf.Options.TargetRdbOutput, i) cmd.decode(input, output) } diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index 5d4e146..394a45b 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -37,7 +37,7 @@ func (cmd *CmdDump) Main() { nd := node{ id: i, source: source, - output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i), + output: fmt.Sprintf("%s.%d", conf.Options.TargetRdbOutput, i), } cmd.dumpChan <- nd } @@ -49,7 +49,7 @@ func (cmd *CmdDump) Main() { wg sync.WaitGroup ) wg.Add(len(conf.Options.SourceAddressList)) - for i := 0; i < int(conf.Options.SourceParallel); i++ { + for i := 0; i < int(conf.Options.SourceRdbParallel); i++ { go func(idx int) { log.Infof("start routine[%v]", idx) for { diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 676d3ad..d1565e4 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -66,6 +66,7 @@ func main() { } conf.Options.Version = utils.Version + conf.Options.Type = *tp var file *os.File if file, err = os.Open(*configuration); err != nil { @@ -219,31 +220,37 @@ func sanitizeOptions(tp string) error { return fmt.Errorf("mode[%v] parse address failed[%v]", tp, err) } - if (tp == conf.TypeRestore || tp == conf.TypeDecode) && len(conf.Options.RdbInput) == 0 { - return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") - } - if tp == conf.TypeDump && conf.Options.RdbOutput == "" { - conf.Options.RdbOutput = "output-rdb-dump" - } - - if conf.Options.RdbParallel == 0 { - if tp == conf.TypeDump || tp == conf.TypeSync { - conf.Options.RdbParallel = len(conf.Options.SourceAddressList) - } else if tp == conf.TypeRestore { - conf.Options.RdbParallel = len(conf.Options.RdbInput) + if tp == conf.TypeRestore || tp == conf.TypeDecode { + if len(conf.Options.SourceRdbInput) == 0 { + return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") + } + // check file exist + for _, rdb := range conf.Options.SourceRdbInput { + if _, err := os.Stat(rdb); os.IsNotExist(err) { + return fmt.Errorf("input rdb file[%v] not exists", rdb) + } } } - - if tp == conf.TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) { - conf.Options.RdbParallel = len(conf.Options.RdbInput) + if tp == conf.TypeDump && conf.Options.TargetRdbOutput == "" { + conf.Options.TargetRdbOutput = "output-rdb-dump" } - if conf.Options.RdbSpecialCloud != "" && conf.Options.RdbSpecialCloud != utils.UCloudCluster { - return fmt.Errorf("rdb special cloud type[%s] is not supported", conf.Options.RdbSpecialCloud) + if tp == conf.TypeDump || tp == conf.TypeSync { + if conf.Options.SourceRdbParallel <= 0 { + conf.Options.SourceRdbParallel = len(conf.Options.SourceAddressList) + } else if conf.Options.SourceRdbParallel > len(conf.Options.SourceAddressList) { + conf.Options.SourceRdbParallel = len(conf.Options.SourceAddressList) + } + } else if tp == conf.TypeRestore || tp == conf.TypeDecode { + if conf.Options.SourceRdbParallel <= 0 { + conf.Options.SourceRdbParallel = len(conf.Options.SourceRdbInput) + } else if conf.Options.SourceRdbParallel > len(conf.Options.SourceRdbInput) { + conf.Options.SourceRdbParallel = len(conf.Options.SourceRdbInput) + } } - if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddressList)) { - conf.Options.SourceParallel = uint(len(conf.Options.SourceAddressList)) + if conf.Options.SourceRdbSpecialCloud != "" && conf.Options.SourceRdbSpecialCloud != utils.UCloudCluster { + return fmt.Errorf("rdb special cloud type[%s] is not supported", conf.Options.SourceRdbSpecialCloud) } if conf.Options.LogFile != "" { diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 7102953..708e2af 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -36,7 +36,7 @@ func (cmd *CmdRestore) GetDetailedInfo() interface{} { } func (cmd *CmdRestore) Main() { - log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddressList) + log.Infof("restore from '%s' to '%s'\n", conf.Options.SourceRdbInput, conf.Options.TargetAddressList) type restoreNode struct { id int @@ -45,14 +45,14 @@ func (cmd *CmdRestore) Main() { base.Status = "waitRestore" total := utils.GetTotalLink() restoreChan := make(chan restoreNode, total) - - for i, rdb := range conf.Options.RdbInput { + + for i, rdb := range conf.Options.SourceRdbInput { restoreChan <- restoreNode{id: i, input: rdb} } var wg sync.WaitGroup - wg.Add(len(conf.Options.RdbInput)) - for i := 0; i < conf.Options.RdbParallel; i++ { + wg.Add(len(conf.Options.SourceRdbInput)) + for i := 0; i < conf.Options.SourceRdbParallel; i++ { go func() { for { node, ok := <-restoreChan @@ -87,7 +87,7 @@ func (cmd *CmdRestore) Main() { wg.Wait() close(restoreChan) - log.Infof("restore from '%s' to '%s' done", conf.Options.RdbInput, conf.Options.TargetAddressList) + log.Infof("restore from '%s' to '%s' done", conf.Options.SourceRdbInput, conf.Options.TargetAddressList) if conf.Options.HttpProfile != -1 { //fake status if set http_port. and wait forever base.Status = "incr" diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index a5a12ef..4e6cdd6 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -105,7 +105,7 @@ func (cmd *CmdSync) Main() { var wg sync.WaitGroup wg.Add(len(conf.Options.SourceAddressList)) - for i := 0; i < int(conf.Options.SourceParallel); i++ { + for i := 0; i < int(conf.Options.SourceRdbParallel); i++ { go func() { for { nd, ok := <-syncChan From d2416735e02fc71dd6486d37e517fea08f6873ac Mon Sep 17 00:00:00 2001 From: vinllen Date: Thu, 1 Aug 2019 11:02:56 +0800 Subject: [PATCH 3/5] release v1.6.14 --- ChangeLog | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ChangeLog b/ChangeLog index 90a3f6b..2092749 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2019-08-01 Alibaba Cloud. + * VERSION: 1.6.14 + * BUGFIX: the `rdb.parallel` parameter limits concurrency without effect. + see #133 + * BUGFIX: call `select` when target redis type is cluster in `rump` mode. + * IMPROVE: add `http_profile = -1` to exit once finish rdb syncing in + `restore` mode. + * IMPROVE: 'info xxx' command isn't supported in codis, used 'info' and + parse 'xxx'. 2019-07-24 Alibaba Cloud. * VERSION: 1.6.13 * IMPROVE: support `filter.db.whitelist` and `filter.db.blacklist` to let From 29fb1fb2eff660f1a5ee4b6495b66f4c9daf8d49 Mon Sep 17 00:00:00 2001 From: vinllen Date: Thu, 1 Aug 2019 11:04:42 +0800 Subject: [PATCH 4/5] release v1.6.14 2 --- ChangeLog | 1 + 1 file changed, 1 insertion(+) diff --git a/ChangeLog b/ChangeLog index 2092749..82e13b6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -7,6 +7,7 @@ `restore` mode. * IMPROVE: 'info xxx' command isn't supported in codis, used 'info' and parse 'xxx'. + * IMPROVE: rename `rdb.xx` to `source.rdb.xx` or `target.rdb.xx`. 2019-07-24 Alibaba Cloud. * VERSION: 1.6.13 * IMPROVE: support `filter.db.whitelist` and `filter.db.blacklist` to let From f7d91fb3572d3f7eafd53a2826e96dfb67a99143 Mon Sep 17 00:00:00 2001 From: vinllen Date: Thu, 1 Aug 2019 11:06:56 +0800 Subject: [PATCH 5/5] release v1.6.14 3 --- src/redis-shake/main/main.go | 8 ++------ src/redis-shake/restore.go | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index d1565e4..89fcb72 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -236,15 +236,11 @@ func sanitizeOptions(tp string) error { } if tp == conf.TypeDump || tp == conf.TypeSync { - if conf.Options.SourceRdbParallel <= 0 { - conf.Options.SourceRdbParallel = len(conf.Options.SourceAddressList) - } else if conf.Options.SourceRdbParallel > len(conf.Options.SourceAddressList) { + if conf.Options.SourceRdbParallel <= 0 || conf.Options.SourceRdbParallel > len(conf.Options.SourceAddressList) { conf.Options.SourceRdbParallel = len(conf.Options.SourceAddressList) } } else if tp == conf.TypeRestore || tp == conf.TypeDecode { - if conf.Options.SourceRdbParallel <= 0 { - conf.Options.SourceRdbParallel = len(conf.Options.SourceRdbInput) - } else if conf.Options.SourceRdbParallel > len(conf.Options.SourceRdbInput) { + if conf.Options.SourceRdbParallel <= 0 || conf.Options.SourceRdbParallel > len(conf.Options.SourceRdbInput) { conf.Options.SourceRdbParallel = len(conf.Options.SourceRdbInput) } } diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 708e2af..4c000f8 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -45,7 +45,7 @@ func (cmd *CmdRestore) Main() { base.Status = "waitRestore" total := utils.GetTotalLink() restoreChan := make(chan restoreNode, total) - + for i, rdb := range conf.Options.SourceRdbInput { restoreChan <- restoreNode{id: i, input: rdb} }