diff --git a/ChangeLog b/ChangeLog
index c2d4540..b0fa823 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+2019-04-21 Alibaba Cloud.
+ * version: 1.4.0
+ * IMPROVE: support "rump" type to syncing data when `sync` and `psync`
+ commands are not supported.
2019-04-13 Alibaba Cloud.
* version: 1.2.3
* IMPROVE: polish log print to print more error info.
diff --git a/README.md b/README.md
index 12d11f4..43f7b80 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,8 @@ The type can be one of the followings:
* **decode**: Decode dumped payload to human readable format (hex-encoding).
* **restore**: Restore RDB file to target redis.
* **dump**: Dump RDB file from souce redis.
-* **sync**: Sync data from source redis to target redis.
+* **sync**: Sync data from source redis to target redis by `sync` or `psync` command. Including full synchronization and incremental synchronization.
+* **rump**: Sync data from source redis to target redis by `scan` command. Only support full synchronization.
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf
index d27cbe1..6148365 100644
--- a/conf/redis-shake.conf
+++ b/conf/redis-shake.conf
@@ -1,9 +1,13 @@
# this is the configuration of redis-shake.
+# if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ
# id
id = redis-shake
+
# log file,日志文件,不配置将打印到stdout (e.g. /var/log/redis-shake.log )
-log_file =
+log.file =
+# log level: "none", "error", "warn", "info", "all". default is "info".
+log.level = info
# pid path,进程文件存储地址(e.g. /var/run/),不配置将默认输出到执行下面,
# 注意这个是目录,真正的pid是`{pid_path}/{id}.pid`
pid_path =
@@ -30,7 +34,7 @@ input_rdb = local
output_rdb = local_dump
# source redis configuration.
-# used in `dump` and `sync`.
+# used in `dump`, `sync` and `rump`.
# ip:port
# 源redis地址
source.address = 127.0.0.1:20441
@@ -38,11 +42,9 @@ source.address = 127.0.0.1:20441
source.password_raw = 123456
# auth type, don't modify it
source.auth_type = auth
-# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
-source.version = 9
# target redis configuration. used in `restore` and `sync`.
-# used in `restore` and `sync`.
+# used in `restore`, `sync` and `rump`.
# ip:port
# 目的redis地址
target.address = 127.0.0.1:20551
@@ -50,8 +52,6 @@ target.address = 127.0.0.1:20551
target.password_raw = 123456
# auth type, don't modify it
target.auth_type = auth
-# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
-target.version = 6
# all the data will come into this db. < 0 means disable.
# used in `restore` and `sync`.
target.db = -1
@@ -61,7 +61,7 @@ target.db = -1
fake_time =
# force rewrite when destination restore has the key
-# used in `restore` and `sync`.
+# used in `restore`, `sync` and `rump`.
# 当源目的有重复key,是否进行覆写
rewrite = true
@@ -137,8 +137,25 @@ sender.delay_channel_size = 65535
# TCP keep-alive保活参数,单位秒,0表示不启用。
keep_alive = 0
+# used in `rump`.
+# number of keys captured each time. default is 100.
+# 每次scan的个数,不配置则默认100.
+scan.key_number = 50
+
+# used in `rump`.
+# we support some special redis types that don't use default `scan` command like alibaba cloud and tencent cloud.
+# 有些版本具有特殊的格式,与普通的scan命令有所不同,我们进行了特殊的适配。目前支持腾讯云的集群版"tencent_cluster"
+# 和阿里云的集群版"aliyun_cluster"。
+scan.special_cloud =
+# 如果源端是腾讯云的集群版,那么需要传入不同子节点的id(通过`cluster nodes`命令),以数组形式表示(分号分割)。
+# shake会"串行"进行遍历并抓取。例如:"25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc;da6041781b5d7fe21404811d430cdffea2bf84de"
+# 具体请参考:https://cloud.tencent.com/document/product/239/18336 中的"自定义命令"小节。
+scan.special_cloud.tencent.urls =
+# 如果源端是阿里云的集群版,那么需要传入子节点的个数。例如:16
+scan.special_cloud.aliyun.node_number =
+
# ----------------splitter----------------
-# below variables are useless for current opensource version so don't set.
+# below variables are useless for current open source version so don't set.
# replace hash tag.
# used in `sync`.
diff --git a/src/redis-shake/common/mix.go b/src/redis-shake/common/mix.go
index 1f5c0e0..a17daec 100644
--- a/src/redis-shake/common/mix.go
+++ b/src/redis-shake/common/mix.go
@@ -58,8 +58,8 @@ func Welcome() {
/ / (o)
------------------------------
`
-
- log.Warn("\n", welcome)
+ startMsg := "if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ"
+ log.Warnf("\n%s%s\n\n", welcome, startMsg)
}
func Goodbye() {
diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go
index bff85bb..ac95450 100644
--- a/src/redis-shake/configure/configure.go
+++ b/src/redis-shake/configure/configure.go
@@ -4,43 +4,48 @@ import "time"
type Configuration struct {
// config file variables
- Id string `config:"id"`
- LogFile string `config:"log_file"`
- SystemProfile int `config:"system_profile"`
- HttpProfile int `config:"http_profile"`
- NCpu int `config:"ncpu"`
- Parallel int `config:"parallel"`
- InputRdb string `config:"input_rdb"`
- OutputRdb string `config:"output_rdb"`
- SourceAddress string `config:"source.address"`
- SourcePasswordRaw string `config:"source.password_raw"`
- SourcePasswordEncoding string `config:"source.password_encoding"`
- SourceVersion uint `config:"source.version"`
- SourceAuthType string `config:"source.auth_type"`
- TargetAddress string `config:"target.address"`
- TargetPasswordRaw string `config:"target.password_raw"`
- TargetPasswordEncoding string `config:"target.password_encoding"`
- TargetVersion uint `config:"target.version"`
- TargetDB int `config:"target.db"`
- TargetAuthType string `config:"target.auth_type"`
- FakeTime string `config:"fake_time"`
- Rewrite bool `config:"rewrite"`
- FilterDB string `config:"filter.db"`
- FilterKey []string `config:"filter.key"`
- FilterSlot []string `config:"filter.slot"`
- BigKeyThreshold uint64 `config:"big_key_threshold"`
- Psync bool `config:"psync"`
- Metric bool `config:"metric"`
- MetricPrintLog bool `config:"metric.print_log"`
- HeartbeatUrl string `config:"heartbeat.url"`
- HeartbeatInterval uint `config:"heartbeat.interval"`
- HeartbeatExternal string `config:"heartbeat.external"`
- HeartbeatNetworkInterface string `config:"heartbeat.network_interface"`
- SenderSize uint64 `config:"sender.size"`
- SenderCount uint `config:"sender.count"`
- SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
- KeepAlive uint `config:"keep_alive"`
- PidPath string `config:"pid_path"`
+ Id string `config:"id"`
+ LogFile string `config:"log.file"`
+ LogLevel string `config:"log.level"`
+ SystemProfile int `config:"system_profile"`
+ HttpProfile int `config:"http_profile"`
+ NCpu int `config:"ncpu"`
+ Parallel int `config:"parallel"`
+ InputRdb string `config:"input_rdb"`
+ OutputRdb string `config:"output_rdb"`
+ SourceAddress string `config:"source.address"`
+ SourcePasswordRaw string `config:"source.password_raw"`
+ SourcePasswordEncoding string `config:"source.password_encoding"`
+ SourceVersion uint `config:"source.version"`
+ SourceAuthType string `config:"source.auth_type"`
+ TargetAddress string `config:"target.address"`
+ TargetPasswordRaw string `config:"target.password_raw"`
+ TargetPasswordEncoding string `config:"target.password_encoding"`
+ TargetVersion uint `config:"target.version"`
+ TargetDB int `config:"target.db"`
+ TargetAuthType string `config:"target.auth_type"`
+ FakeTime string `config:"fake_time"`
+ Rewrite bool `config:"rewrite"`
+ FilterDB string `config:"filter.db"`
+ FilterKey []string `config:"filter.key"`
+ FilterSlot []string `config:"filter.slot"`
+ BigKeyThreshold uint64 `config:"big_key_threshold"`
+ Psync bool `config:"psync"`
+ Metric bool `config:"metric"`
+ MetricPrintLog bool `config:"metric.print_log"`
+ HeartbeatUrl string `config:"heartbeat.url"`
+ HeartbeatInterval uint `config:"heartbeat.interval"`
+ HeartbeatExternal string `config:"heartbeat.external"`
+ HeartbeatNetworkInterface string `config:"heartbeat.network_interface"`
+ SenderSize uint64 `config:"sender.size"`
+ SenderCount uint `config:"sender.count"`
+ SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
+ KeepAlive uint `config:"keep_alive"`
+ PidPath string `config:"pid_path"`
+ ScanKeyNumber uint32 `config:"scan.key_number"`
+ ScanSpecialCloud string `config:"scan.special_cloud"`
+ ScanSpecialCloudTencentUrls string `config:"scan.special_cloud.tencent.urls"`
+ ScanSpecialCloudAliyunNodeNumber uint8 `config:"scan.special_cloud.aliyun.node_number"`
// inner variables
ReplaceHashTag bool `config:"replace_hash_tag"`
diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go
index d22dc11..b2c6d4d 100644
--- a/src/redis-shake/main/main.go
+++ b/src/redis-shake/main/main.go
@@ -39,6 +39,7 @@ const (
TypeRestore = "restore"
TypeDump = "dump"
TypeSync = "sync"
+ TypeRump = "rump"
defaultHttpPort = 20881
defaultSystemPort = 20882
@@ -103,6 +104,8 @@ func main() {
runner = new(run.CmdDump)
case TypeSync:
runner = new(run.CmdSync)
+ case TypeRump:
+ runner = new(run.CmdRump)
}
// create metric
@@ -161,7 +164,7 @@ func startHttpServer() {
// sanitize options
func sanitizeOptions(tp string) error {
var err error
- if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync {
+ if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync && tp != TypeRump {
return fmt.Errorf("unknown type[%v]", tp)
}
@@ -195,6 +198,9 @@ func sanitizeOptions(tp string) error {
if (tp == TypeDump || tp == TypeSync) && conf.Options.SourceAddress == "" {
return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}")
}
+ if tp == TypeRump && (conf.Options.SourceAddress == "" || conf.Options.TargetAddress == "") {
+ return fmt.Errorf("source and target address shouldn't be empty when type in {rump}")
+ }
if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" {
return fmt.Errorf("only one of source password_raw or password_encoding should be given")
@@ -221,6 +227,25 @@ func sanitizeOptions(tp string) error {
}
log.StdLog = log.New(utils.LogRotater, "")
}
+ // set log level
+ var logDeepLevel log.LogLevel
+ switch conf.Options.LogLevel {
+ case "none":
+ logDeepLevel = log.LEVEL_NONE
+ case "error":
+ logDeepLevel = log.LEVEL_ERROR
+ case "warn":
+ logDeepLevel = log.LEVEL_WARN
+ case "":
+ fallthrough
+ case "info":
+ logDeepLevel = log.LEVEL_INFO
+ case "all":
+ logDeepLevel = log.LEVEL_DEBUG
+ default:
+ return fmt.Errorf("invalid log level[%v]", conf.Options.LogLevel)
+ }
+ log.SetLevel(logDeepLevel)
// heartbeat, 86400 = 1 day
if conf.Options.HeartbeatInterval > 86400 {
@@ -323,6 +348,26 @@ func sanitizeOptions(tp string) error {
}
}
+ if tp == TypeRump {
+ if conf.Options.ScanKeyNumber == 0 {
+ conf.Options.ScanKeyNumber = 100
+ }
+
+ if conf.Options.ScanSpecialCloud == run.TencentCluster {
+ if len(conf.Options.ScanSpecialCloudTencentUrls) == 0 {
+ return fmt.Errorf("`scan.special_cloud.tencent.urls` shouldn't be empty when " +
+ "`scan.special_cloud` is [%s]", run.TencentCluster)
+ }
+ } else if conf.Options.ScanSpecialCloud == run.AliyunCluster {
+ if conf.Options.ScanSpecialCloudAliyunNodeNumber == 0 {
+ return fmt.Errorf("`scan.special_cloud.aliyun.node_number` shouldn't be 0 when " +
+ "`scan.special_cloud` is [%s]", run.AliyunCluster)
+ }
+ } else if conf.Options.ScanSpecialCloud != "" {
+ return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud)
+ }
+ }
+
return nil
}
diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go
new file mode 100644
index 0000000..a03c882
--- /dev/null
+++ b/src/redis-shake/rump.go
@@ -0,0 +1,173 @@
+package run
+
+import (
+ "pkg/libs/log"
+ "strconv"
+
+ "redis-shake/common"
+ "redis-shake/configure"
+
+ "github.com/garyburd/redigo/redis"
+)
+
+const (
+ TencentCluster = "tencent_cluster"
+ AliyunCluster = "aliyun_cluster"
+)
+
+type CmdRump struct {
+ sourceConn redis.Conn
+ targetConn redis.Conn
+
+ keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2
+ resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3
+}
+
+type KeyNode struct {
+ key string
+ value string
+ pttl int64
+}
+
+func (cr *CmdRump) GetDetailedInfo() []interface{} {
+ return nil
+}
+
+func (cr *CmdRump) Main() {
+ // build connection
+ cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType,
+ conf.Options.SourcePasswordRaw)
+ cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType,
+ conf.Options.TargetPasswordRaw)
+
+ // init two channels
+ cr.keyChan = make(chan *KeyNode, conf.Options.ScanKeyNumber)
+ cr.resultChan = make(chan *KeyNode, conf.Options.ScanKeyNumber)
+
+ /*
+ * we start 3 routines to run:
+ * 1. fetch keys from the source redis
+ * 2. write keys into the target redis
+ * 3. read result from the target redis
+ */
+ // routine1
+ go cr.fetcher()
+ // routine2
+ go cr.writer()
+ // routine3
+ cr.receiver()
+}
+
+func (cr *CmdRump) fetcher() {
+ length := 1
+ if conf.Options.ScanSpecialCloud == TencentCluster {
+ length = len(conf.Options.ScanSpecialCloudTencentUrls)
+ } else if conf.Options.ScanSpecialCloud == AliyunCluster {
+ length = int(conf.Options.ScanSpecialCloudAliyunNodeNumber)
+ }
+
+ // iterate all source nodes
+ for i := 0; i < length; i++ {
+ var (
+ cursor int64
+ keys []string
+ values []interface{}
+ err error
+ )
+
+ // fetch data from on node
+ for {
+ switch conf.Options.ScanSpecialCloud {
+ case "":
+ values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT",
+ conf.Options.ScanKeyNumber))
+ case TencentCluster:
+ values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT",
+ conf.Options.ScanKeyNumber, conf.Options.ScanSpecialCloudTencentUrls[i]))
+ case AliyunCluster:
+ values, err = redis.Values(cr.sourceConn.Do("ISCAN", i, cursor, "COUNT",
+ conf.Options.ScanKeyNumber))
+ }
+ if err != nil && err != redis.ErrNil {
+ log.Panicf("scan with cursor[%v] failed[%v]", cursor, err)
+ }
+
+ values, err = redis.Scan(values, &cursor, &keys)
+ if err != nil && err != redis.ErrNil {
+ log.Panicf("do scan with cursor[%v] failed[%v]", cursor, err)
+ }
+
+ log.Info("scaned keys: ", len(keys))
+
+ // pipeline dump
+ for _, key := range keys {
+ log.Debug("scan key: ", key)
+ cr.sourceConn.Send("DUMP", key)
+ }
+ dumps, err := redis.Strings(cr.sourceConn.Do(""))
+ if err != nil && err != redis.ErrNil {
+ log.Panicf("do dump with cursor[%v] failed[%v]", cursor, err)
+ }
+
+ // pipeline ttl
+ for _, key := range keys {
+ cr.sourceConn.Send("PTTL", key)
+ }
+ pttls, err := redis.Int64s(cr.sourceConn.Do(""))
+ if err != nil && err != redis.ErrNil {
+ log.Panicf("do ttl with cursor[%v] failed[%v]", cursor, err)
+ }
+
+ for i, k := range keys {
+ cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
+ }
+
+ // Last iteration of scan.
+ if cursor == 0 {
+ break
+ }
+ }
+ }
+
+ close(cr.keyChan)
+}
+
+func (cr *CmdRump) writer() {
+ var count uint32
+ for ele := range cr.keyChan {
+ if ele.pttl == -1 { // not set ttl
+ ele.pttl = 0
+ }
+ if ele.pttl == -2 {
+ log.Debugf("skip key %s for expired", ele.key)
+ continue
+ }
+
+ log.Debugf("restore %s", ele.key)
+ if conf.Options.Rewrite {
+ cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
+ } else {
+ cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value)
+ }
+
+ cr.resultChan <- ele
+ count++
+ if count == conf.Options.ScanKeyNumber {
+ // batch
+ log.Debugf("send keys %d\n", count)
+ cr.targetConn.Flush()
+ count = 0
+ }
+ }
+ cr.targetConn.Flush()
+ close(cr.resultChan)
+}
+
+func (cr *CmdRump) receiver() {
+ for ele := range cr.resultChan {
+ if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil {
+ log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10),
+ err)
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go
index 26a597e..a91f0df 100644
--- a/src/redis-shake/sync.go
+++ b/src/redis-shake/sync.go
@@ -463,6 +463,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd
} else {
// cmd.SyncStat.PullCmdCount.Incr()
metric.MetricVar.AddPullCmdCount(1)
+
if scmd != "ping" {
if strings.EqualFold(scmd, "select") {
if len(argv) != 1 {