From 6a33d25a4e309a925b6ea9d3eb4aa43873c48338 Mon Sep 17 00:00:00 2001 From: vinllen Date: Sun, 21 Apr 2019 22:16:55 +0800 Subject: [PATCH] release v1.4: support rump type using dump+restore commands to sync data --- ChangeLog | 4 + README.md | 3 +- conf/redis-shake.conf | 35 +++-- src/redis-shake/common/mix.go | 4 +- src/redis-shake/configure/configure.go | 79 +++++------ src/redis-shake/main/main.go | 47 ++++++- src/redis-shake/rump.go | 173 +++++++++++++++++++++++++ src/redis-shake/sync.go | 1 + 8 files changed, 296 insertions(+), 50 deletions(-) create mode 100644 src/redis-shake/rump.go diff --git a/ChangeLog b/ChangeLog index c2d4540..b0fa823 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +2019-04-21 Alibaba Cloud. + * version: 1.4.0 + * IMPROVE: support "rump" type to sync data when `sync` and `psync` + commands are not supported. 2019-04-13 Alibaba Cloud. * version: 1.2.3 * IMPROVE: polish log print to print more error info. diff --git a/README.md b/README.md index 12d11f4..43f7b80 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,8 @@ The type can be one of the followings:
* **decode**: Decode dumped payload to human readable format (hex-encoding). * **restore**: Restore RDB file to target redis. * **dump**: Dump RDB file from souce redis. -* **sync**: Sync data from source redis to target redis. +* **sync**: Sync data from source redis to target redis by `sync` or `psync` command. Includes both full and incremental synchronization. +* **rump**: Sync data from source redis to target redis by `scan` command. Only supports full synchronization. Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index d27cbe1..6148365 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -1,9 +1,13 @@ # this is the configuration of redis-shake. +# if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ # id id = redis-shake + # log file,日志文件,不配置将打印到stdout (e.g. /var/log/redis-shake.log ) -log_file = +log.file = +# log level: "none", "error", "warn", "info", "all". default is "info". +log.level = info # pid path,进程文件存储地址(e.g. /var/run/),不配置将默认输出到执行下面, # 注意这个是目录,真正的pid是`{pid_path}/{id}.pid` pid_path = @@ -30,7 +34,7 @@ input_rdb = local output_rdb = local_dump # source redis configuration. -# used in `dump` and `sync`. +# used in `dump`, `sync` and `rump`. # ip:port # 源redis地址 source.address = 127.0.0.1:20441 @@ -38,11 +42,9 @@ source.address = 127.0.0.1:20441 source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth -# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0) -source.version = 9 # target redis configuration. used in `restore` and `sync`. -# used in `restore` and `sync`. +# used in `restore`, `sync` and `rump`. # ip:port # 目的redis地址 target.address = 127.0.0.1:20551 @@ -50,8 +52,6 @@ target.address = 127.0.0.1:20551 target.password_raw = 123456 # auth type, don't modify it target.auth_type = auth -# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0) -target.version = 6 # all the data will come into this db. < 0 means disable. # used in `restore` and `sync`. target.db = -1 @@ -61,7 +61,7 @@ target.db = -1 fake_time = # force rewrite when destination restore has the key -# used in `restore` and `sync`. +# used in `restore`, `sync` and `rump`. # 当源目的有重复key,是否进行覆写 rewrite = true @@ -137,8 +137,25 @@ sender.delay_channel_size = 65535 # TCP keep-alive保活参数,单位秒,0表示不启用。 keep_alive = 0 +# used in `rump`. +# number of keys captured each time. default is 100. 
+# 每次scan的个数,不配置则默认100. +scan.key_number = 50 + +# used in `rump`. +# we support some special redis types that don't use default `scan` command like alibaba cloud and tencent cloud. +# 有些版本具有特殊的格式,与普通的scan命令有所不同,我们进行了特殊的适配。目前支持腾讯云的集群版"tencent_cluster" +# 和阿里云的集群版"aliyun_cluster"。 +scan.special_cloud = +# 如果源端是腾讯云的集群版,那么需要传入不同子节点的id(通过`cluster nodes`命令),以数组形式表示(分号分割)。 +# shake会"串行"进行遍历并抓取。例如:"25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc;da6041781b5d7fe21404811d430cdffea2bf84de" +# 具体请参考:https://cloud.tencent.com/document/product/239/18336 中的"自定义命令"小节。 +scan.special_cloud.tencent.urls = +# 如果源端是阿里云的集群版,那么需要传入子节点的个数。例如:16 +scan.special_cloud.aliyun.node_number = + # ----------------splitter---------------- -# below variables are useless for current opensource version so don't set. +# below variables are useless for current open source version so don't set. # replace hash tag. # used in `sync`. diff --git a/src/redis-shake/common/mix.go b/src/redis-shake/common/mix.go index 1f5c0e0..a17daec 100644 --- a/src/redis-shake/common/mix.go +++ b/src/redis-shake/common/mix.go @@ -58,8 +58,8 @@ func Welcome() { / / (o) ------------------------------ ` - - log.Warn("\n", welcome) + startMsg := "if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ" + log.Warnf("\n%s%s\n\n", welcome, startMsg) } func Goodbye() { diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index bff85bb..ac95450 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -4,43 +4,48 @@ import "time" type Configuration struct { // config file variables - Id string `config:"id"` - LogFile string `config:"log_file"` - SystemProfile int `config:"system_profile"` - HttpProfile int `config:"http_profile"` - NCpu int `config:"ncpu"` - Parallel int `config:"parallel"` - InputRdb string `config:"input_rdb"` - OutputRdb string `config:"output_rdb"` - SourceAddress string `config:"source.address"` - 
SourcePasswordRaw string `config:"source.password_raw"` - SourcePasswordEncoding string `config:"source.password_encoding"` - SourceVersion uint `config:"source.version"` - SourceAuthType string `config:"source.auth_type"` - TargetAddress string `config:"target.address"` - TargetPasswordRaw string `config:"target.password_raw"` - TargetPasswordEncoding string `config:"target.password_encoding"` - TargetVersion uint `config:"target.version"` - TargetDB int `config:"target.db"` - TargetAuthType string `config:"target.auth_type"` - FakeTime string `config:"fake_time"` - Rewrite bool `config:"rewrite"` - FilterDB string `config:"filter.db"` - FilterKey []string `config:"filter.key"` - FilterSlot []string `config:"filter.slot"` - BigKeyThreshold uint64 `config:"big_key_threshold"` - Psync bool `config:"psync"` - Metric bool `config:"metric"` - MetricPrintLog bool `config:"metric.print_log"` - HeartbeatUrl string `config:"heartbeat.url"` - HeartbeatInterval uint `config:"heartbeat.interval"` - HeartbeatExternal string `config:"heartbeat.external"` - HeartbeatNetworkInterface string `config:"heartbeat.network_interface"` - SenderSize uint64 `config:"sender.size"` - SenderCount uint `config:"sender.count"` - SenderDelayChannelSize uint `config:"sender.delay_channel_size"` - KeepAlive uint `config:"keep_alive"` - PidPath string `config:"pid_path"` + Id string `config:"id"` + LogFile string `config:"log.file"` + LogLevel string `config:"log.level"` + SystemProfile int `config:"system_profile"` + HttpProfile int `config:"http_profile"` + NCpu int `config:"ncpu"` + Parallel int `config:"parallel"` + InputRdb string `config:"input_rdb"` + OutputRdb string `config:"output_rdb"` + SourceAddress string `config:"source.address"` + SourcePasswordRaw string `config:"source.password_raw"` + SourcePasswordEncoding string `config:"source.password_encoding"` + SourceVersion uint `config:"source.version"` + SourceAuthType string `config:"source.auth_type"` + TargetAddress string 
`config:"target.address"` + TargetPasswordRaw string `config:"target.password_raw"` + TargetPasswordEncoding string `config:"target.password_encoding"` + TargetVersion uint `config:"target.version"` + TargetDB int `config:"target.db"` + TargetAuthType string `config:"target.auth_type"` + FakeTime string `config:"fake_time"` + Rewrite bool `config:"rewrite"` + FilterDB string `config:"filter.db"` + FilterKey []string `config:"filter.key"` + FilterSlot []string `config:"filter.slot"` + BigKeyThreshold uint64 `config:"big_key_threshold"` + Psync bool `config:"psync"` + Metric bool `config:"metric"` + MetricPrintLog bool `config:"metric.print_log"` + HeartbeatUrl string `config:"heartbeat.url"` + HeartbeatInterval uint `config:"heartbeat.interval"` + HeartbeatExternal string `config:"heartbeat.external"` + HeartbeatNetworkInterface string `config:"heartbeat.network_interface"` + SenderSize uint64 `config:"sender.size"` + SenderCount uint `config:"sender.count"` + SenderDelayChannelSize uint `config:"sender.delay_channel_size"` + KeepAlive uint `config:"keep_alive"` + PidPath string `config:"pid_path"` + ScanKeyNumber uint32 `config:"scan.key_number"` + ScanSpecialCloud string `config:"scan.special_cloud"` + ScanSpecialCloudTencentUrls string `config:"scan.special_cloud.tencent.urls"` + ScanSpecialCloudAliyunNodeNumber uint8 `config:"scan.special_cloud.aliyun.node_number"` // inner variables ReplaceHashTag bool `config:"replace_hash_tag"` diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index d22dc11..b2c6d4d 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -39,6 +39,7 @@ const ( TypeRestore = "restore" TypeDump = "dump" TypeSync = "sync" + TypeRump = "rump" defaultHttpPort = 20881 defaultSystemPort = 20882 @@ -103,6 +104,8 @@ func main() { runner = new(run.CmdDump) case TypeSync: runner = new(run.CmdSync) + case TypeRump: + runner = new(run.CmdRump) } // create metric @@ -161,7 +164,7 @@ func startHttpServer() { // 
sanitize options func sanitizeOptions(tp string) error { var err error - if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync { + if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync && tp != TypeRump { return fmt.Errorf("unknown type[%v]", tp) } @@ -195,6 +198,9 @@ func sanitizeOptions(tp string) error { if (tp == TypeDump || tp == TypeSync) && conf.Options.SourceAddress == "" { return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}") } + if tp == TypeRump && (conf.Options.SourceAddress == "" || conf.Options.TargetAddress == "") { + return fmt.Errorf("source and target address shouldn't be empty when type in {rump}") + } if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" { return fmt.Errorf("only one of source password_raw or password_encoding should be given") @@ -221,6 +227,25 @@ func sanitizeOptions(tp string) error { } log.StdLog = log.New(utils.LogRotater, "") } + // set log level + var logDeepLevel log.LogLevel + switch conf.Options.LogLevel { + case "none": + logDeepLevel = log.LEVEL_NONE + case "error": + logDeepLevel = log.LEVEL_ERROR + case "warn": + logDeepLevel = log.LEVEL_WARN + case "": + fallthrough + case "info": + logDeepLevel = log.LEVEL_INFO + case "all": + logDeepLevel = log.LEVEL_DEBUG + default: + return fmt.Errorf("invalid log level[%v]", conf.Options.LogLevel) + } + log.SetLevel(logDeepLevel) // heartbeat, 86400 = 1 day if conf.Options.HeartbeatInterval > 86400 { @@ -323,6 +348,26 @@ func sanitizeOptions(tp string) error { } } + if tp == TypeRump { + if conf.Options.ScanKeyNumber == 0 { + conf.Options.ScanKeyNumber = 100 + } + + if conf.Options.ScanSpecialCloud == run.TencentCluster { + if len(conf.Options.ScanSpecialCloudTencentUrls) == 0 { + return fmt.Errorf("`scan.special_cloud.tencent.urls` shouldn't be empty when " + + "`scan.special_cloud` is [%s]", run.TencentCluster) + } + } else if conf.Options.ScanSpecialCloud == 
run.AliyunCluster { + if conf.Options.ScanSpecialCloudAliyunNodeNumber == 0 { + return fmt.Errorf("`scan.special_cloud.aliyun.node_number` shouldn't be 0 when " + + "`scan.special_cloud` is [%s]", run.AliyunCluster) + } + } else if conf.Options.ScanSpecialCloud != "" { + return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud) + } + } + return nil } diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go new file mode 100644 index 0000000..a03c882 --- /dev/null +++ b/src/redis-shake/rump.go @@ -0,0 +1,173 @@ +package run + +import ( + "pkg/libs/log" + "strconv" + + "redis-shake/common" + "redis-shake/configure" + + "github.com/garyburd/redigo/redis" +) + +const ( + TencentCluster = "tencent_cluster" + AliyunCluster = "aliyun_cluster" +) + +type CmdRump struct { + sourceConn redis.Conn + targetConn redis.Conn + + keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2 + resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3 +} + +type KeyNode struct { + key string + value string + pttl int64 +} + +func (cr *CmdRump) GetDetailedInfo() []interface{} { + return nil +} + +func (cr *CmdRump) Main() { + // build connection + cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType, + conf.Options.SourcePasswordRaw) + cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, + conf.Options.TargetPasswordRaw) + + // init two channels + cr.keyChan = make(chan *KeyNode, conf.Options.ScanKeyNumber) + cr.resultChan = make(chan *KeyNode, conf.Options.ScanKeyNumber) + + /* + * we start 3 routines to run: + * 1. fetch keys from the source redis + * 2. write keys into the target redis + * 3. 
read result from the target redis + */ + // routine1 + go cr.fetcher() + // routine2 + go cr.writer() + // routine3 + cr.receiver() +} + +func (cr *CmdRump) fetcher() { + length := 1 + if conf.Options.ScanSpecialCloud == TencentCluster { + length = len(conf.Options.ScanSpecialCloudTencentUrls) + } else if conf.Options.ScanSpecialCloud == AliyunCluster { + length = int(conf.Options.ScanSpecialCloudAliyunNodeNumber) + } + + // iterate all source nodes + for i := 0; i < length; i++ { + var ( + cursor int64 + keys []string + values []interface{} + err error + ) + + // fetch data from on node + for { + switch conf.Options.ScanSpecialCloud { + case "": + values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT", + conf.Options.ScanKeyNumber)) + case TencentCluster: + values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT", + conf.Options.ScanKeyNumber, conf.Options.ScanSpecialCloudTencentUrls[i])) + case AliyunCluster: + values, err = redis.Values(cr.sourceConn.Do("ISCAN", i, cursor, "COUNT", + conf.Options.ScanKeyNumber)) + } + if err != nil && err != redis.ErrNil { + log.Panicf("scan with cursor[%v] failed[%v]", cursor, err) + } + + values, err = redis.Scan(values, &cursor, &keys) + if err != nil && err != redis.ErrNil { + log.Panicf("do scan with cursor[%v] failed[%v]", cursor, err) + } + + log.Info("scaned keys: ", len(keys)) + + // pipeline dump + for _, key := range keys { + log.Debug("scan key: ", key) + cr.sourceConn.Send("DUMP", key) + } + dumps, err := redis.Strings(cr.sourceConn.Do("")) + if err != nil && err != redis.ErrNil { + log.Panicf("do dump with cursor[%v] failed[%v]", cursor, err) + } + + // pipeline ttl + for _, key := range keys { + cr.sourceConn.Send("PTTL", key) + } + pttls, err := redis.Int64s(cr.sourceConn.Do("")) + if err != nil && err != redis.ErrNil { + log.Panicf("do ttl with cursor[%v] failed[%v]", cursor, err) + } + + for i, k := range keys { + cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]} + } + + // Last iteration of 
scan. + if cursor == 0 { + break + } + } + } + + close(cr.keyChan) +} + +func (cr *CmdRump) writer() { + var count uint32 + for ele := range cr.keyChan { + if ele.pttl == -1 { // not set ttl + ele.pttl = 0 + } + if ele.pttl == -2 { + log.Debugf("skip key %s for expired", ele.key) + continue + } + + log.Debugf("restore %s", ele.key) + if conf.Options.Rewrite { + cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE") + } else { + cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value) + } + + cr.resultChan <- ele + count++ + if count == conf.Options.ScanKeyNumber { + // batch + log.Debugf("send keys %d\n", count) + cr.targetConn.Flush() + count = 0 + } + } + cr.targetConn.Flush() + close(cr.resultChan) +} + +func (cr *CmdRump) receiver() { + for ele := range cr.resultChan { + if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil { + log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10), + err) + } + } +} \ No newline at end of file diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 26a597e..a91f0df 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -463,6 +463,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd } else { // cmd.SyncStat.PullCmdCount.Incr() metric.MetricVar.AddPullCmdCount(1) + if scmd != "ping" { if strings.EqualFold(scmd, "select") { if len(argv) != 1 {