release v1.4: support rump type to using dump+restore command to syncing data

v4
vinllen 6 years ago
parent 8e589e666f
commit a98dcf1101
  1. 4
      ChangeLog
  2. 3
      README.md
  3. BIN
      bin/redis-shake.darwin64
  4. 35
      conf/redis-shake.conf
  5. 4
      src/redis-shake/common/mix.go
  6. 7
      src/redis-shake/configure/configure.go
  7. 47
      src/redis-shake/main/main.go
  8. 173
      src/redis-shake/rump.go
  9. 1
      src/redis-shake/sync.go

@ -1,3 +1,7 @@
2019-04-21 Alibaba Cloud.
* version: 1.4.0
* IMPROVE: support "rump" type to syncing data when `sync` and `psync`
commands are not supported.
2019-04-13 Alibaba Cloud. 2019-04-13 Alibaba Cloud.
* version: 1.2.3 * version: 1.2.3
* IMPROVE: polish log print to print more error info. * IMPROVE: polish log print to print more error info.

@ -15,7 +15,8 @@ The type can be one of the followings:<br>
* **decode**: Decode dumped payload to human readable format (hex-encoding). * **decode**: Decode dumped payload to human readable format (hex-encoding).
* **restore**: Restore RDB file to target redis. * **restore**: Restore RDB file to target redis.
* **dump**: Dump RDB file from souce redis. * **dump**: Dump RDB file from souce redis.
* **sync**: Sync data from source redis to target redis. * **sync**: Sync data from source redis to target redis by `sync` or `psync` command. Including full synchronization and incremental synchronization.
* **rump**: Sync data from source redis to target redis by `scan` command. Only support full synchronization.
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.<br> Please check out the `conf/redis-shake.conf` to see the detailed parameters description.<br>

Binary file not shown.

@ -1,9 +1,13 @@
# this is the configuration of redis-shake. # this is the configuration of redis-shake.
# if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ
# id # id
id = redis-shake id = redis-shake
# log file,日志文件,不配置将打印到stdout (e.g. /var/log/redis-shake.log ) # log file,日志文件,不配置将打印到stdout (e.g. /var/log/redis-shake.log )
log_file = log.file =
# log level: "none", "error", "warn", "info", "all". default is "info".
log.level = info
# pid path,进程文件存储地址(e.g. /var/run/),不配置将默认输出到执行下面, # pid path,进程文件存储地址(e.g. /var/run/),不配置将默认输出到执行下面,
# 注意这个是目录,真正的pid是`{pid_path}/{id}.pid` # 注意这个是目录,真正的pid是`{pid_path}/{id}.pid`
pid_path = pid_path =
@ -30,7 +34,7 @@ input_rdb = local
output_rdb = local_dump output_rdb = local_dump
# source redis configuration. # source redis configuration.
# used in `dump` and `sync`. # used in `dump`, `sync` and `rump`.
# ip:port # ip:port
# 源redis地址 # 源redis地址
source.address = 127.0.0.1:20441 source.address = 127.0.0.1:20441
@ -38,11 +42,9 @@ source.address = 127.0.0.1:20441
source.password_raw = 123456 source.password_raw = 123456
# auth type, don't modify it # auth type, don't modify it
source.auth_type = auth source.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
source.version = 9
# target redis configuration. used in `restore` and `sync`. # target redis configuration. used in `restore` and `sync`.
# used in `restore` and `sync`. # used in `restore`, `sync` and `rump`.
# ip:port # ip:port
# 目的redis地址 # 目的redis地址
target.address = 127.0.0.1:20551 target.address = 127.0.0.1:20551
@ -50,8 +52,6 @@ target.address = 127.0.0.1:20551
target.password_raw = 123456 target.password_raw = 123456
# auth type, don't modify it # auth type, don't modify it
target.auth_type = auth target.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
target.version = 6
# all the data will come into this db. < 0 means disable. # all the data will come into this db. < 0 means disable.
# used in `restore` and `sync`. # used in `restore` and `sync`.
target.db = -1 target.db = -1
@ -61,7 +61,7 @@ target.db = -1
fake_time = fake_time =
# force rewrite when destination restore has the key # force rewrite when destination restore has the key
# used in `restore` and `sync`. # used in `restore`, `sync` and `rump`.
# 当源目的有重复key,是否进行覆写 # 当源目的有重复key,是否进行覆写
rewrite = true rewrite = true
@ -137,8 +137,25 @@ sender.delay_channel_size = 65535
# TCP keep-alive保活参数,单位秒,0表示不启用。 # TCP keep-alive保活参数,单位秒,0表示不启用。
keep_alive = 0 keep_alive = 0
# used in `rump`.
# number of keys captured each time. default is 100.
# 每次scan的个数,不配置则默认100.
scan.key_number = 50
# used in `rump`.
# we support some special redis types that don't use default `scan` command like alibaba cloud and tencent cloud.
# 有些版本具有特殊的格式,与普通的scan命令有所不同,我们进行了特殊的适配。目前支持腾讯云的集群版"tencent_cluster"
# 和阿里云的集群版"aliyun_cluster"。
scan.special_cloud =
# 如果源端是腾讯云的集群版,那么需要传入不同子节点的id(通过`cluster nodes`命令),以数组形式表示(分号分割)。
# shake会"串行"进行遍历并抓取。例如:"25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc;da6041781b5d7fe21404811d430cdffea2bf84de"
# 具体请参考:https://cloud.tencent.com/document/product/239/18336 中的"自定义命令"小节。
scan.special_cloud.tencent.urls =
# 如果源端是阿里云的集群版,那么需要传入子节点的个数。例如:16
scan.special_cloud.aliyun.node_number =
# ----------------splitter---------------- # ----------------splitter----------------
# below variables are useless for current opensource version so don't set. # below variables are useless for current open source version so don't set.
# replace hash tag. # replace hash tag.
# used in `sync`. # used in `sync`.

@ -58,8 +58,8 @@ func Welcome() {
/ / (o) / / (o)
------------------------------ ------------------------------
` `
startMsg := "if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ"
log.Warn("\n", welcome) log.Warnf("\n%s%s\n\n", welcome, startMsg)
} }
func Goodbye() { func Goodbye() {

@ -5,7 +5,8 @@ import "time"
type Configuration struct { type Configuration struct {
// config file variables // config file variables
Id string `config:"id"` Id string `config:"id"`
LogFile string `config:"log_file"` LogFile string `config:"log.file"`
LogLevel string `config:"log.level"`
SystemProfile int `config:"system_profile"` SystemProfile int `config:"system_profile"`
HttpProfile int `config:"http_profile"` HttpProfile int `config:"http_profile"`
NCpu int `config:"ncpu"` NCpu int `config:"ncpu"`
@ -41,6 +42,10 @@ type Configuration struct {
SenderDelayChannelSize uint `config:"sender.delay_channel_size"` SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"` KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"` PidPath string `config:"pid_path"`
ScanKeyNumber uint32 `config:"scan.key_number"`
ScanSpecialCloud string `config:"scan.special_cloud"`
ScanSpecialCloudTencentUrls string `config:"scan.special_cloud.tencent.urls"`
ScanSpecialCloudAliyunNodeNumber uint8 `config:"scan.special_cloud.aliyun.node_number"`
// inner variables // inner variables
ReplaceHashTag bool `config:"replace_hash_tag"` ReplaceHashTag bool `config:"replace_hash_tag"`

@ -39,6 +39,7 @@ const (
TypeRestore = "restore" TypeRestore = "restore"
TypeDump = "dump" TypeDump = "dump"
TypeSync = "sync" TypeSync = "sync"
TypeRump = "rump"
defaultHttpPort = 20881 defaultHttpPort = 20881
defaultSystemPort = 20882 defaultSystemPort = 20882
@ -103,6 +104,8 @@ func main() {
runner = new(run.CmdDump) runner = new(run.CmdDump)
case TypeSync: case TypeSync:
runner = new(run.CmdSync) runner = new(run.CmdSync)
case TypeRump:
runner = new(run.CmdRump)
} }
// create metric // create metric
@ -161,7 +164,7 @@ func startHttpServer() {
// sanitize options // sanitize options
func sanitizeOptions(tp string) error { func sanitizeOptions(tp string) error {
var err error var err error
if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync { if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync && tp != TypeRump {
return fmt.Errorf("unknown type[%v]", tp) return fmt.Errorf("unknown type[%v]", tp)
} }
@ -195,6 +198,9 @@ func sanitizeOptions(tp string) error {
if (tp == TypeDump || tp == TypeSync) && conf.Options.SourceAddress == "" { if (tp == TypeDump || tp == TypeSync) && conf.Options.SourceAddress == "" {
return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}") return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}")
} }
if tp == TypeRump && (conf.Options.SourceAddress == "" || conf.Options.TargetAddress == "") {
return fmt.Errorf("source and target address shouldn't be empty when type in {rump}")
}
if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" { if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" {
return fmt.Errorf("only one of source password_raw or password_encoding should be given") return fmt.Errorf("only one of source password_raw or password_encoding should be given")
@ -221,6 +227,25 @@ func sanitizeOptions(tp string) error {
} }
log.StdLog = log.New(utils.LogRotater, "") log.StdLog = log.New(utils.LogRotater, "")
} }
// set log level
var logDeepLevel log.LogLevel
switch conf.Options.LogLevel {
case "none":
logDeepLevel = log.LEVEL_NONE
case "error":
logDeepLevel = log.LEVEL_ERROR
case "warn":
logDeepLevel = log.LEVEL_WARN
case "":
fallthrough
case "info":
logDeepLevel = log.LEVEL_INFO
case "all":
logDeepLevel = log.LEVEL_DEBUG
default:
return fmt.Errorf("invalid log level[%v]", conf.Options.LogLevel)
}
log.SetLevel(logDeepLevel)
// heartbeat, 86400 = 1 day // heartbeat, 86400 = 1 day
if conf.Options.HeartbeatInterval > 86400 { if conf.Options.HeartbeatInterval > 86400 {
@ -323,6 +348,26 @@ func sanitizeOptions(tp string) error {
} }
} }
if tp == TypeRump {
if conf.Options.ScanKeyNumber == 0 {
conf.Options.ScanKeyNumber = 100
}
if conf.Options.ScanSpecialCloud == run.TencentCluster {
if len(conf.Options.ScanSpecialCloudTencentUrls) == 0 {
return fmt.Errorf("`scan.special_cloud.tencent.urls` shouldn't be empty when " +
"`scan.special_cloud` is [%s]", run.TencentCluster)
}
} else if conf.Options.ScanSpecialCloud == run.AliyunCluster {
if conf.Options.ScanSpecialCloudAliyunNodeNumber == 0 {
return fmt.Errorf("`scan.special_cloud.aliyun.node_number` shouldn't be 0 when " +
"`scan.special_cloud` is [%s]", run.AliyunCluster)
}
} else if conf.Options.ScanSpecialCloud != "" {
return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud)
}
}
return nil return nil
} }

@ -0,0 +1,173 @@
package run
import (
"pkg/libs/log"
"strconv"
"redis-shake/common"
"redis-shake/configure"
"github.com/garyburd/redigo/redis"
)
const (
TencentCluster = "tencent_cluster"
AliyunCluster = "aliyun_cluster"
)
type CmdRump struct {
sourceConn redis.Conn
targetConn redis.Conn
keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2
resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3
}
type KeyNode struct {
key string
value string
pttl int64
}
func (cr *CmdRump) GetDetailedInfo() []interface{} {
return nil
}
func (cr *CmdRump) Main() {
// build connection
cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType,
conf.Options.SourcePasswordRaw)
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw)
// init two channels
cr.keyChan = make(chan *KeyNode, conf.Options.ScanKeyNumber)
cr.resultChan = make(chan *KeyNode, conf.Options.ScanKeyNumber)
/*
* we start 3 routines to run:
* 1. fetch keys from the source redis
* 2. write keys into the target redis
* 3. read result from the target redis
*/
// routine1
go cr.fetcher()
// routine2
go cr.writer()
// routine3
cr.receiver()
}
func (cr *CmdRump) fetcher() {
length := 1
if conf.Options.ScanSpecialCloud == TencentCluster {
length = len(conf.Options.ScanSpecialCloudTencentUrls)
} else if conf.Options.ScanSpecialCloud == AliyunCluster {
length = int(conf.Options.ScanSpecialCloudAliyunNodeNumber)
}
// iterate all source nodes
for i := 0; i < length; i++ {
var (
cursor int64
keys []string
values []interface{}
err error
)
// fetch data from on node
for {
switch conf.Options.ScanSpecialCloud {
case "":
values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT",
conf.Options.ScanKeyNumber))
case TencentCluster:
values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT",
conf.Options.ScanKeyNumber, conf.Options.ScanSpecialCloudTencentUrls[i]))
case AliyunCluster:
values, err = redis.Values(cr.sourceConn.Do("ISCAN", i, cursor, "COUNT",
conf.Options.ScanKeyNumber))
}
if err != nil && err != redis.ErrNil {
log.Panicf("scan with cursor[%v] failed[%v]", cursor, err)
}
values, err = redis.Scan(values, &cursor, &keys)
if err != nil && err != redis.ErrNil {
log.Panicf("do scan with cursor[%v] failed[%v]", cursor, err)
}
log.Info("scaned keys: ", len(keys))
// pipeline dump
for _, key := range keys {
log.Debug("scan key: ", key)
cr.sourceConn.Send("DUMP", key)
}
dumps, err := redis.Strings(cr.sourceConn.Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do dump with cursor[%v] failed[%v]", cursor, err)
}
// pipeline ttl
for _, key := range keys {
cr.sourceConn.Send("PTTL", key)
}
pttls, err := redis.Int64s(cr.sourceConn.Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do ttl with cursor[%v] failed[%v]", cursor, err)
}
for i, k := range keys {
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
}
// Last iteration of scan.
if cursor == 0 {
break
}
}
}
close(cr.keyChan)
}
func (cr *CmdRump) writer() {
var count uint32
for ele := range cr.keyChan {
if ele.pttl == -1 { // not set ttl
ele.pttl = 0
}
if ele.pttl == -2 {
log.Debugf("skip key %s for expired", ele.key)
continue
}
log.Debugf("restore %s", ele.key)
if conf.Options.Rewrite {
cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
} else {
cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value)
}
cr.resultChan <- ele
count++
if count == conf.Options.ScanKeyNumber {
// batch
log.Debugf("send keys %d\n", count)
cr.targetConn.Flush()
count = 0
}
}
cr.targetConn.Flush()
close(cr.resultChan)
}
func (cr *CmdRump) receiver() {
for ele := range cr.resultChan {
if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil {
log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10),
err)
}
}
}

@ -463,6 +463,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd
} else { } else {
// cmd.SyncStat.PullCmdCount.Incr() // cmd.SyncStat.PullCmdCount.Incr()
metric.MetricVar.AddPullCmdCount(1) metric.MetricVar.AddPullCmdCount(1)
if scmd != "ping" { if scmd != "ping" {
if strings.EqualFold(scmd, "select") { if strings.EqualFold(scmd, "select") {
if len(argv) != 1 { if len(argv) != 1 {

Loading…
Cancel
Save