diff --git a/ChangeLog b/ChangeLog index 93b7b50..192729b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,9 @@ +2019-06-25 Alibaba Cloud. + * VERSION: 1.6.10 + * IMPROVE: support print Lua in `decode` mode. + * BUGFIX: merge metric panic PR#111 + * IMPROVE: check checksum and version once receiving error from the target in + `rump` mode. 2019-06-21 Alibaba Cloud. * VERSION: 1.6.9 * IMPROVE: support Lua and transaction when target is open source cluster diff --git a/README.md b/README.md index 4df0411..7ae280e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -RedisShake is mainly used to synchronize data from one redis database to another.
+RedisShake is mainly used to synchronize data from one redis to another.
Thanks to the Douyu's WSD team for the support.
* [中文文档](https://yq.aliyun.com/articles/691794) @@ -23,6 +23,11 @@ The type can be one of the followings:
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
+# Support +--- +Redis version from 2.x to 5.0. +Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on. + # Configuration Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 8acf1a7..8a7f5ab 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -35,12 +35,12 @@ source.type = standalone # ip:port # the source address can be the following: # 1. single db address. for "standalone" type. -# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. +# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. # 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441. # 4. proxy address(used in "rump" mode only). for "proxy" type. # 源redis地址。对于sentinel模式,输入格式为"master名字:拉取角色为master或者slave@sentinel的地址" source.address = 127.0.0.1:20441 -# password. +# password of db/proxy. even if type is sentinel. source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth @@ -59,11 +59,11 @@ target.type = standalone # ip:port # the target address can be the following: # 1. single db address. for "standalone" type. -# 2. sentinel_master_name@sentinel single/cluster address, e.g., mymaster@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. +# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. # 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. # 4. proxy address(used in "rump" mode only). for "proxy" type. target.address = 127.0.0.1:20551 -# password. +# password of db/proxy. even if type is sentinel. target.password_raw = # auth type, don't modify it target.auth_type = auth @@ -112,7 +112,7 @@ filter.db = # e.g., a;b;c # default is all. # used in `restore`, `sync` and `rump`. -# 支持过滤key,只让指定的key通过,分号分隔 +# 支持按前缀过滤key,只让指定前缀的key通过,分号分隔。比如指定abc,将会过滤abc, abc1, abcxxx filter.key = # filter given slot, multiple slots are separated by ';'. # e.g., 1;2;3 diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 6c4ddd3..4393773 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -7,11 +7,13 @@ import ( "strings" "reflect" "unsafe" + "encoding/binary" "pkg/libs/bytesize" "redis-shake/configure" logRotate "gopkg.in/natefinch/lumberjack.v2" + "github.com/cupcake/rdb/crc64" ) const ( @@ -36,6 +38,7 @@ var ( LogRotater *logRotate.Logger StartTime string TargetRoundRobin int + RDBVersion uint = 9 // 9 for 5.0 ) const ( @@ -111,4 +114,31 @@ func String2Bytes(s string) []byte { func Bytes2String(b []byte) string { return *(*string)(unsafe.Pointer(&b)) +} + +func CheckVersionChecksum(d []byte) (uint, uint64, error) { + /* Write the footer, this is how it looks like: + * ----------------+---------------------+---------------+ + * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | + * ----------------+---------------------+---------------+ + * RDB version and CRC are both in little endian. + */ + length := len(d) + if length < 10 { + return 0, 0, fmt.Errorf("rdb: invalid dump length") + } + + footer := length - 10 + rdbVersion := uint((d[footer + 1] << 8) | d[footer]) + if rdbVersion > RDBVersion { + return 0, 0, fmt.Errorf("current version[%v] > RDBVersion[%v]", rdbVersion, RDBVersion) + } + + checksum := binary.LittleEndian.Uint64(d[length - 8:]) + digest := crc64.Digest(d[: length - 8]) + if checksum != digest { + return 0, 0, fmt.Errorf("rdb: invalid CRC checksum[%v] != digest[%v]", checksum, digest) + } + + return rdbVersion, checksum, nil } \ No newline at end of file diff --git a/src/redis-shake/common/configure.go b/src/redis-shake/common/configure.go index 8b38f27..453e4d3 100644 --- a/src/redis-shake/common/configure.go +++ b/src/redis-shake/common/configure.go @@ -57,8 +57,9 @@ func parseAddress(tp, address, redisType string, isSource bool) error { case conf.RedisTypeSentinel: arr := strings.Split(address, AddressSplitter) if len(arr) != 2 { - return fmt.Errorf("redis type[%v] address[%v] length[%v] != 2", - conf.RedisTypeStandalone, address, len(arr)) + return fmt.Errorf("redis type[%v] address[%v] must begin with or has '%v': e.g., \"master@ip1:port1;ip2:port2\", " + + "\"@ip1:port1,ip2:port2\"", + conf.RedisTypeSentinel, address, AddressSplitter) } var masterName string diff --git a/src/redis-shake/common/filter.go b/src/redis-shake/common/filter.go index a4a1adc..720c438 100644 --- a/src/redis-shake/common/filter.go +++ b/src/redis-shake/common/filter.go @@ -8,7 +8,8 @@ func FilterCommands(cmd string, luaFilter bool) bool { return true } - if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script")) { + if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script") || + strings.EqualFold(cmd, "evalsha")) { return true } diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go index 30d76d2..bbf53b2 100644 --- a/src/redis-shake/common/split.go +++ b/src/redis-shake/common/split.go @@ -18,7 +18,7 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db Key: String2Bytes(key), Type: 0, // uselss Value: String2Bytes(value), - ExpireAt: 0, // useless here + ExpireAt: 0, // useless here RealMemberCount: 0, NeedReadLen: 1, IdleTime: 0, @@ -31,4 +31,4 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db if _, err := client.Do("pexpire", key, pttl); err != nil { log.Panicf("send key[%v] pexpire failed[%v]", key, err) } -} \ No newline at end of file +} diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index 880a9d5..3aa12fd 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -136,11 +136,25 @@ func (cmd *CmdDecode) decoderMain(ipipe <-chan *rdb.BinEntry, opipe chan<- strin return string(b) } for e := range ipipe { + var b bytes.Buffer + if e.Type == rdb.RdbFlagAUX { + o := &struct { + Type string `json:"type"` + Key string `json:"key"` + Value64 string `json:"value64"` + }{ + "aux", string(e.Key), string(e.Value), + } + fmt.Fprintf(&b, "%s\n", toJson(o)) + cmd.nentry.Incr() + opipe <- b.String() + continue + } + o, err := rdb.DecodeDump(e.Value) if err != nil { log.PanicError(err, "decode failed") } - var b bytes.Buffer switch obj := o.(type) { default: log.Panicf("unknown object %v", o) diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index dcfb0fb..92d33ec 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -51,7 +51,12 @@ func main() { version := flag.Bool("version", false, "show version") flag.Parse() - if *configuration == "" || *tp == "" || *version { + if *version { + fmt.Println(utils.Version) + return + } + + if *configuration == "" || *tp == "" { if !*version { fmt.Println("Please show me the '-conf' and '-type'") } diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index e22b0a4..46fa3da 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -344,6 +344,9 @@ func (dre *dbRumperExecutor) writer() { // handle big key utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) + + // all the reply has been handled in RestoreBigkey + // dre.resultChan <- ele continue } @@ -369,7 +372,7 @@ func (dre *dbRumperExecutor) writer() { // dre.resultChan <- ele count++ - if count == conf.Options.ScanKeyNumber { + if count >= conf.Options.ScanKeyNumber { // batch log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count) @@ -408,8 +411,11 @@ func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes * func (dre *dbRumperExecutor) receiver() { for ele := range dre.resultChan { if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { - log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId, - dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err) + rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value)) + log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " + + "rdb version[%v], checksum[%v], check error[%v]", + dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value), + rdbVersion, checksum, checkErr) } dre.stat.cCommands.Incr() } diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 1da67a1..894c517 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -629,7 +629,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type if len(conf.Options.FilterKey) != 0 { cmdNode, ok := command.RedisCommands[scmd] if ok && len(argv) > 0 { - log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd) + // log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd) new_argv, pass = command.GetMatchKeys(cmdNode, argv, conf.Options.FilterKey) } else { pass = true diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json index 6303482..e518a12 100644 --- a/src/vendor/vendor.json +++ b/src/vendor/vendor.json @@ -159,10 +159,10 @@ "revisionTime": "2019-03-04T09:57:49Z" }, { - "checksumSHA1": "ZT2d9cNq14zxFxnzA2kaqj8tfJY=", + "checksumSHA1": "UXns0bW61NZgqtFBLj6jPgAwF+U=", "path": "github.com/vinllen/redis-go-cluster", - "revision": "bdcf6ff491eca6a29ba905300253a65d88eb1ad6", - "revisionTime": "2019-06-24T08:07:27Z" + "revision": "6ae0947e93ad83020f121a9b179d95e117fc0a5c", + "revisionTime": "2019-07-03T02:46:24Z" }, { "checksumSHA1": "TM3Neoy1xRAKyZYMGzKc41sDFW4=",