release v1.4: support `rump` type, which uses the dump+restore commands to sync data

v4
vinllen 6 years ago
parent 491178d3ce
commit 6a33d25a4e
  1. ChangeLog (4 lines changed)
  2. README.md (3 lines changed)
  3. conf/redis-shake.conf (35 lines changed)
  4. src/redis-shake/common/mix.go (4 lines changed)
  5. src/redis-shake/configure/configure.go (79 lines changed)
  6. src/redis-shake/main/main.go (47 lines changed)
  7. src/redis-shake/rump.go (173 lines changed)
  8. src/redis-shake/sync.go (1 line changed)

@ -1,3 +1,7 @@
2019-04-21 Alibaba Cloud.
* version: 1.4.0
* IMPROVE: support "rump" type to sync data when the `sync` and `psync`
commands are not supported.
2019-04-13 Alibaba Cloud.
* version: 1.2.3
* IMPROVE: polish log output to include more error info.

@ -15,7 +15,8 @@ The type can be one of the following:<br>
* **decode**: Decode dumped payload to human readable format (hex-encoding).
* **restore**: Restore RDB file to target redis.
* **dump**: Dump RDB file from source redis.
* **sync**: Sync data from source redis to target redis.
* **sync**: Sync data from source redis to target redis using the `sync` or `psync` command, including both full and incremental synchronization.
* **rump**: Sync data from source redis to target redis using the `scan` command. Only full synchronization is supported (a minimal sketch of the underlying dump+restore round trip follows below).
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.<br>
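
As an illustration of what `rump` does for each key, here is a minimal, hypothetical sketch of the dump+restore round trip using the same redigo client the sources below depend on. The addresses, the key name, and the `copyKey` helper are placeholders; the actual implementation in `src/redis-shake/rump.go` pipelines these commands in batches:

```go
package main

import (
	"log"

	"github.com/garyburd/redigo/redis"
)

// copyKey dumps one key from src and restores it into dst; this is the
// primitive that rump mode applies to every key returned by SCAN.
func copyKey(src, dst redis.Conn, key string) error {
	payload, err := redis.String(src.Do("DUMP", key)) // RDB-style serialized value
	if err != nil {
		return err
	}
	pttl, err := redis.Int64(src.Do("PTTL", key)) // remaining ttl in milliseconds
	if err != nil {
		return err
	}
	if pttl < 0 {
		pttl = 0 // -1 (no expiry) and -2 (missing key) both become "persistent" in this sketch
	}
	// REPLACE overwrites an existing key on the target, mirroring `rewrite = true`.
	_, err = dst.Do("RESTORE", key, pttl, payload, "REPLACE")
	return err
}

func main() {
	src, err := redis.Dial("tcp", "127.0.0.1:20441") // placeholder source address
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	dst, err := redis.Dial("tcp", "127.0.0.1:20551") // placeholder target address
	if err != nil {
		log.Fatal(err)
	}
	defer dst.Close()

	if err := copyKey(src, dst, "foo"); err != nil {
		log.Fatal(err)
	}
}
```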

@ -1,9 +1,13 @@
# this is the configuration of redis-shake.
# if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ
# id
id = redis-shake
# log file. if not configured, output is printed to stdout (e.g. /var/log/redis-shake.log)
log_file =
log.file =
# log level: "none", "error", "warn", "info", "all". default is "info".
log.level = info
# pid path, the directory where the pid file is stored (e.g. /var/run/). if not configured, it defaults to the working directory.
# note this is a directory; the actual pid file is `{pid_path}/{id}.pid`
pid_path =
@ -30,7 +34,7 @@ input_rdb = local
output_rdb = local_dump
# source redis configuration.
# used in `dump` and `sync`.
# used in `dump`, `sync` and `rump`.
# ip:port
# source redis address
source.address = 127.0.0.1:20441
@ -38,11 +42,9 @@ source.address = 127.0.0.1:20441
source.password_raw = 123456
# auth type, don't modify it
source.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
source.version = 9
# target redis configuration. used in `restore` and `sync`.
# used in `restore` and `sync`.
# used in `restore`, `sync` and `rump`.
# ip:port
# target redis address
target.address = 127.0.0.1:20551
@ -50,8 +52,6 @@ target.address = 127.0.0.1:20551
target.password_raw = 123456
# auth type, don't modify it
target.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
target.version = 6
# all the data will come into this db. < 0 means disable.
# used in `restore` and `sync`.
target.db = -1
@ -61,7 +61,7 @@ target.db = -1
fake_time =
# force rewrite when destination restore has the key
# used in `restore` and `sync`.
# used in `restore`, `sync` and `rump`.
# whether to overwrite when the source and target have the same key
rewrite = true
@ -137,8 +137,25 @@ sender.delay_channel_size = 65535
# TCP keep-alive interval in seconds. 0 means disabled.
keep_alive = 0
# used in `rump`.
# number of keys captured on each scan. default is 100 if not configured.
scan.key_number = 50
# used in `rump`.
# some managed redis offerings don't support the default `scan` command, such as the Alibaba Cloud and Tencent Cloud cluster editions.
# these versions use a special scan format, so we adapt to them. currently the Tencent Cloud cluster edition "tencent_cluster"
# and the Alibaba Cloud cluster edition "aliyun_cluster" are supported.
scan.special_cloud =
# if the source is the Tencent Cloud cluster edition, pass in the ids of the child nodes (obtained via the `cluster nodes` command)
# as a semicolon-separated list. shake iterates over the nodes serially and fetches from each one, e.g. "25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc;da6041781b5d7fe21404811d430cdffea2bf84de"
# see the "custom commands" section of https://cloud.tencent.com/document/product/239/18336 for details, and the sketch after this config excerpt.
scan.special_cloud.tencent.urls =
# if the source is the Alibaba Cloud cluster edition, pass in the number of child nodes, e.g. 16
scan.special_cloud.aliyun.node_number =
# ----------------splitter----------------
# below variables are useless for current opensource version so don't set.
# below variables are useless for current open source version so don't set.
# replace hash tag.
# used in `sync`.
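
For reference, the semicolon-separated `scan.special_cloud.tencent.urls` value above is expected to yield one node id per shard. A minimal, hypothetical sketch of splitting it with `strings.Split` (the parsing done by the shipped code is not shown in this diff):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative ids only; real ones come from `cluster nodes` on the source.
	urls := "25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc;da6041781b5d7fe21404811d430cdffea2bf84de"
	for i, nodeID := range strings.Split(urls, ";") {
		// each nodeID would be appended to the SCAN command sent to node i
		fmt.Printf("node %d -> %s\n", i, nodeID)
	}
}
```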

@ -58,8 +58,8 @@ func Welcome() {
/ / (o)
------------------------------
`
log.Warn("\n", welcome)
startMsg := "if you have any problem, please visit https://github.com/alibaba/RedisShake/wiki/FAQ"
log.Warnf("\n%s%s\n\n", welcome, startMsg)
}
func Goodbye() {

@ -4,43 +4,48 @@ import "time"
type Configuration struct {
// config file variables
Id string `config:"id"`
LogFile string `config:"log_file"`
SystemProfile int `config:"system_profile"`
HttpProfile int `config:"http_profile"`
NCpu int `config:"ncpu"`
Parallel int `config:"parallel"`
InputRdb string `config:"input_rdb"`
OutputRdb string `config:"output_rdb"`
SourceAddress string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceVersion uint `config:"source.version"`
SourceAuthType string `config:"source.auth_type"`
TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
TargetVersion uint `config:"target.version"`
TargetDB int `config:"target.db"`
TargetAuthType string `config:"target.auth_type"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDB string `config:"filter.db"`
FilterKey []string `config:"filter.key"`
FilterSlot []string `config:"filter.slot"`
BigKeyThreshold uint64 `config:"big_key_threshold"`
Psync bool `config:"psync"`
Metric bool `config:"metric"`
MetricPrintLog bool `config:"metric.print_log"`
HeartbeatUrl string `config:"heartbeat.url"`
HeartbeatInterval uint `config:"heartbeat.interval"`
HeartbeatExternal string `config:"heartbeat.external"`
HeartbeatNetworkInterface string `config:"heartbeat.network_interface"`
SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"`
Id string `config:"id"`
LogFile string `config:"log.file"`
LogLevel string `config:"log.level"`
SystemProfile int `config:"system_profile"`
HttpProfile int `config:"http_profile"`
NCpu int `config:"ncpu"`
Parallel int `config:"parallel"`
InputRdb string `config:"input_rdb"`
OutputRdb string `config:"output_rdb"`
SourceAddress string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceVersion uint `config:"source.version"`
SourceAuthType string `config:"source.auth_type"`
TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
TargetVersion uint `config:"target.version"`
TargetDB int `config:"target.db"`
TargetAuthType string `config:"target.auth_type"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDB string `config:"filter.db"`
FilterKey []string `config:"filter.key"`
FilterSlot []string `config:"filter.slot"`
BigKeyThreshold uint64 `config:"big_key_threshold"`
Psync bool `config:"psync"`
Metric bool `config:"metric"`
MetricPrintLog bool `config:"metric.print_log"`
HeartbeatUrl string `config:"heartbeat.url"`
HeartbeatInterval uint `config:"heartbeat.interval"`
HeartbeatExternal string `config:"heartbeat.external"`
HeartbeatNetworkInterface string `config:"heartbeat.network_interface"`
SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"`
ScanKeyNumber uint32 `config:"scan.key_number"`
ScanSpecialCloud string `config:"scan.special_cloud"`
ScanSpecialCloudTencentUrls string `config:"scan.special_cloud.tencent.urls"`
ScanSpecialCloudAliyunNodeNumber uint8 `config:"scan.special_cloud.aliyun.node_number"`
// inner variables
ReplaceHashTag bool `config:"replace_hash_tag"`

@ -39,6 +39,7 @@ const (
TypeRestore = "restore"
TypeDump = "dump"
TypeSync = "sync"
TypeRump = "rump"
defaultHttpPort = 20881
defaultSystemPort = 20882
@ -103,6 +104,8 @@ func main() {
runner = new(run.CmdDump)
case TypeSync:
runner = new(run.CmdSync)
case TypeRump:
runner = new(run.CmdRump)
}
// create metric
@ -161,7 +164,7 @@ func startHttpServer() {
// sanitize options
func sanitizeOptions(tp string) error {
var err error
if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync {
if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync && tp != TypeRump {
return fmt.Errorf("unknown type[%v]", tp)
}
@ -195,6 +198,9 @@ func sanitizeOptions(tp string) error {
if (tp == TypeDump || tp == TypeSync) && conf.Options.SourceAddress == "" {
return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}")
}
if tp == TypeRump && (conf.Options.SourceAddress == "" || conf.Options.TargetAddress == "") {
return fmt.Errorf("source and target address shouldn't be empty when type in {rump}")
}
if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" {
return fmt.Errorf("only one of source password_raw or password_encoding should be given")
@ -221,6 +227,25 @@ func sanitizeOptions(tp string) error {
}
log.StdLog = log.New(utils.LogRotater, "")
}
// set log level
var logDeepLevel log.LogLevel
switch conf.Options.LogLevel {
case "none":
logDeepLevel = log.LEVEL_NONE
case "error":
logDeepLevel = log.LEVEL_ERROR
case "warn":
logDeepLevel = log.LEVEL_WARN
case "":
fallthrough
case "info":
logDeepLevel = log.LEVEL_INFO
case "all":
logDeepLevel = log.LEVEL_DEBUG
default:
return fmt.Errorf("invalid log level[%v]", conf.Options.LogLevel)
}
log.SetLevel(logDeepLevel)
// heartbeat, 86400 = 1 day
if conf.Options.HeartbeatInterval > 86400 {
@ -323,6 +348,26 @@ func sanitizeOptions(tp string) error {
}
}
if tp == TypeRump {
if conf.Options.ScanKeyNumber == 0 {
conf.Options.ScanKeyNumber = 100
}
if conf.Options.ScanSpecialCloud == run.TencentCluster {
if len(conf.Options.ScanSpecialCloudTencentUrls) == 0 {
return fmt.Errorf("`scan.special_cloud.tencent.urls` shouldn't be empty when " +
"`scan.special_cloud` is [%s]", run.TencentCluster)
}
} else if conf.Options.ScanSpecialCloud == run.AliyunCluster {
if conf.Options.ScanSpecialCloudAliyunNodeNumber == 0 {
return fmt.Errorf("`scan.special_cloud.aliyun.node_number` shouldn't be 0 when " +
"`scan.special_cloud` is [%s]", run.AliyunCluster)
}
} else if conf.Options.ScanSpecialCloud != "" {
return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud)
}
}
return nil
}

@ -0,0 +1,173 @@
package run
import (
"pkg/libs/log"
"strconv"
"redis-shake/common"
"redis-shake/configure"
"github.com/garyburd/redigo/redis"
)
const (
TencentCluster = "tencent_cluster"
AliyunCluster = "aliyun_cluster"
)
type CmdRump struct {
sourceConn redis.Conn
targetConn redis.Conn
keyChan chan *KeyNode // keyChan is used to communicate between routine1 and routine2
resultChan chan *KeyNode // resultChan is used to communicate between routine2 and routine3
}
type KeyNode struct {
key string
value string
pttl int64
}
func (cr *CmdRump) GetDetailedInfo() []interface{} {
return nil
}
func (cr *CmdRump) Main() {
// build connection
cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType,
conf.Options.SourcePasswordRaw)
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw)
// init two channels
cr.keyChan = make(chan *KeyNode, conf.Options.ScanKeyNumber)
cr.resultChan = make(chan *KeyNode, conf.Options.ScanKeyNumber)
/*
* we start 3 routines to run:
* 1. fetch keys from the source redis
* 2. write keys into the target redis
* 3. read result from the target redis
*/
// routine1
go cr.fetcher()
// routine2
go cr.writer()
// routine3
cr.receiver()
}
func (cr *CmdRump) fetcher() {
length := 1
if conf.Options.ScanSpecialCloud == TencentCluster {
length = len(conf.Options.ScanSpecialCloudTencentUrls)
} else if conf.Options.ScanSpecialCloud == AliyunCluster {
length = int(conf.Options.ScanSpecialCloudAliyunNodeNumber)
}
// iterate all source nodes
for i := 0; i < length; i++ {
var (
cursor int64
keys []string
values []interface{}
err error
)
// fetch data from one node
for {
switch conf.Options.ScanSpecialCloud {
case "":
values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT",
conf.Options.ScanKeyNumber))
case TencentCluster:
values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT",
conf.Options.ScanKeyNumber, conf.Options.ScanSpecialCloudTencentUrls[i]))
case AliyunCluster:
values, err = redis.Values(cr.sourceConn.Do("ISCAN", i, cursor, "COUNT",
conf.Options.ScanKeyNumber))
}
if err != nil && err != redis.ErrNil {
log.Panicf("scan with cursor[%v] failed[%v]", cursor, err)
}
values, err = redis.Scan(values, &cursor, &keys)
if err != nil && err != redis.ErrNil {
log.Panicf("do scan with cursor[%v] failed[%v]", cursor, err)
}
log.Info("scaned keys: ", len(keys))
// pipeline dump
for _, key := range keys {
log.Debug("scan key: ", key)
cr.sourceConn.Send("DUMP", key)
}
dumps, err := redis.Strings(cr.sourceConn.Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do dump with cursor[%v] failed[%v]", cursor, err)
}
// pipeline ttl
for _, key := range keys {
cr.sourceConn.Send("PTTL", key)
}
pttls, err := redis.Int64s(cr.sourceConn.Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do ttl with cursor[%v] failed[%v]", cursor, err)
}
for i, k := range keys {
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
}
// Last iteration of scan.
if cursor == 0 {
break
}
}
}
close(cr.keyChan)
}
func (cr *CmdRump) writer() {
var count uint32
for ele := range cr.keyChan {
if ele.pttl == -1 { // not set ttl
ele.pttl = 0
}
if ele.pttl == -2 {
log.Debugf("skip key %s for expired", ele.key)
continue
}
log.Debugf("restore %s", ele.key)
if conf.Options.Rewrite {
cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
} else {
cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value)
}
cr.resultChan <- ele
count++
if count == conf.Options.ScanKeyNumber {
// batch
log.Debugf("send keys %d\n", count)
cr.targetConn.Flush()
count = 0
}
}
cr.targetConn.Flush()
close(cr.resultChan)
}
func (cr *CmdRump) receiver() {
for ele := range cr.resultChan {
if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil {
log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10),
err)
}
}
}

@ -463,6 +463,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd
} else {
// cmd.SyncStat.PullCmdCount.Incr()
metric.MetricVar.AddPullCmdCount(1)
if scmd != "ping" {
if strings.EqualFold(scmd, "select") {
if len(argv) != 1 {
