From 25272477f0baec12921c5b87dd617ad76b67c593 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 25 Jun 2019 14:57:48 +0800 Subject: [PATCH 01/11] polish conf comments --- conf/redis-shake.conf | 8 ++++---- src/redis-shake/common/configure.go | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 8acf1a7..345af66 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -35,12 +35,12 @@ source.type = standalone # ip:port # the source address can be the following: # 1. single db address. for "standalone" type. -# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. +# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. # 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441. # 4. proxy address(used in "rump" mode only). for "proxy" type. # 源redis地址。对于sentinel模式,输入格式为"master名字:拉取角色为master或者slave@sentinel的地址" source.address = 127.0.0.1:20441 -# password. +# password of db/proxy. even if type is sentinel. source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth @@ -59,11 +59,11 @@ target.type = standalone # ip:port # the target address can be the following: # 1. single db address. for "standalone" type. -# 2. sentinel_master_name@sentinel single/cluster address, e.g., mymaster@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. +# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. # 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. # 4. proxy address(used in "rump" mode only). for "proxy" type. target.address = 127.0.0.1:20551 -# password. +# password of db/proxy. even if type is sentinel. target.password_raw = # auth type, don't modify it target.auth_type = auth diff --git a/src/redis-shake/common/configure.go b/src/redis-shake/common/configure.go index 8b38f27..453e4d3 100644 --- a/src/redis-shake/common/configure.go +++ b/src/redis-shake/common/configure.go @@ -57,8 +57,9 @@ func parseAddress(tp, address, redisType string, isSource bool) error { case conf.RedisTypeSentinel: arr := strings.Split(address, AddressSplitter) if len(arr) != 2 { - return fmt.Errorf("redis type[%v] address[%v] length[%v] != 2", - conf.RedisTypeStandalone, address, len(arr)) + return fmt.Errorf("redis type[%v] address[%v] must begin with or has '%v': e.g., \"master@ip1:port1;ip2:port2\", " + + "\"@ip1:port1,ip2:port2\"", + conf.RedisTypeSentinel, address, AddressSplitter) } var masterName string From 0493252ec890d8ad7f0f55cf76155ba0474980fa Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 25 Jun 2019 15:00:49 +0800 Subject: [PATCH 02/11] polish conf again --- conf/redis-shake.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 345af66..e950d3d 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -112,7 +112,7 @@ filter.db = # e.g., a;b;c # default is all. # used in `restore`, `sync` and `rump`. -# 支持过滤key,只让指定的key通过,分号分隔 +# 支持按前缀过滤key,只让指定前缀的key通过,分号分隔 filter.key = # filter given slot, multiple slots are separated by ';'. # e.g., 1;2;3 From 560a0943a6eb3960d6264ff0217619f930f5ebe2 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 25 Jun 2019 17:45:07 +0800 Subject: [PATCH 03/11] polish code and cond --- conf/redis-shake.conf | 2 +- src/redis-shake/main/main.go | 7 ++++++- src/redis-shake/sync.go | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index e950d3d..8a7f5ab 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -112,7 +112,7 @@ filter.db = # e.g., a;b;c # default is all. # used in `restore`, `sync` and `rump`. -# 支持按前缀过滤key,只让指定前缀的key通过,分号分隔 +# 支持按前缀过滤key,只让指定前缀的key通过,分号分隔。比如指定abc,将会过滤abc, abc1, abcxxx filter.key = # filter given slot, multiple slots are separated by ';'. # e.g., 1;2;3 diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index dcfb0fb..303f140 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -51,7 +51,12 @@ func main() { version := flag.Bool("version", false, "show version") flag.Parse() - if *configuration == "" || *tp == "" || *version { + if *version { + fmt.Println(utils.Version) + return + } + + if *configuration == "" || *tp == "" { if !*version { fmt.Println("Please show me the '-conf' and '-type'") } diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 1da67a1..894c517 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -629,7 +629,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type if len(conf.Options.FilterKey) != 0 { cmdNode, ok := command.RedisCommands[scmd] if ok && len(argv) > 0 { - log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd) + // log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd) new_argv, pass = command.GetMatchKeys(cmdNode, argv, conf.Options.FilterKey) } else { pass = true From b2a9f50011fb9d7810b87d86f83f4c31968a1a86 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 25 Jun 2019 17:53:29 +0800 Subject: [PATCH 04/11] polish code and cond2 --- src/redis-shake/main/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 303f140..92d33ec 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -55,7 +55,7 @@ func main() { fmt.Println(utils.Version) return } - + if *configuration == "" || *tp == "" { if !*version { fmt.Println("Please show me the '-conf' and '-type'") From 7b8d3e2c6a07ced1dd67ce6c20cfa18dcd3280b2 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 25 Jun 2019 22:05:57 +0800 Subject: [PATCH 05/11] polish --- src/redis-shake/common/split.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go index 30d76d2..bbf53b2 100644 --- a/src/redis-shake/common/split.go +++ b/src/redis-shake/common/split.go @@ -18,7 +18,7 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db Key: String2Bytes(key), Type: 0, // uselss Value: String2Bytes(value), - ExpireAt: 0, // useless here + ExpireAt: 0, // useless here RealMemberCount: 0, NeedReadLen: 1, IdleTime: 0, @@ -31,4 +31,4 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db if _, err := client.Do("pexpire", key, pttl); err != nil { log.Panicf("send key[%v] pexpire failed[%v]", key, err) } -} \ No newline at end of file +} From d7e515adb2ec5a70dba715463c8407bfd1cd8966 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 25 Jun 2019 22:28:27 +0800 Subject: [PATCH 06/11] support lua in decode mode --- src/redis-shake/decode.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index 880a9d5..986f731 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -136,11 +136,25 @@ func (cmd *CmdDecode) decoderMain(ipipe <-chan *rdb.BinEntry, opipe chan<- strin return string(b) } for e := range ipipe { + var b bytes.Buffer + if e.Type == rdb.RdbFlagAUX && string(e.Key) == "lua" { + o := &struct { + Type string `json:"type"` + Key string `json:"key"` + Value64 string `json:"value64"` + }{ + "aux", "lua", string(e.Value), + } + fmt.Fprintf(&b, "%s\n", toJson(o)) + cmd.nentry.Incr() + opipe <- b.String() + continue + } + o, err := rdb.DecodeDump(e.Value) if err != nil { log.PanicError(err, "decode failed") } - var b bytes.Buffer switch obj := o.(type) { default: log.Panicf("unknown object %v", o) From 229d1f1882b8a8383ba96c7effb239ebb3375475 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 25 Jun 2019 22:42:45 +0800 Subject: [PATCH 07/11] support lua in decode mode2 --- src/redis-shake/decode.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index 986f731..3aa12fd 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -137,13 +137,13 @@ func (cmd *CmdDecode) decoderMain(ipipe <-chan *rdb.BinEntry, opipe chan<- strin } for e := range ipipe { var b bytes.Buffer - if e.Type == rdb.RdbFlagAUX && string(e.Key) == "lua" { + if e.Type == rdb.RdbFlagAUX { o := &struct { Type string `json:"type"` Key string `json:"key"` Value64 string `json:"value64"` }{ - "aux", "lua", string(e.Value), + "aux", string(e.Key), string(e.Value), } fmt.Fprintf(&b, "%s\n", toJson(o)) cmd.nentry.Incr() From dcb0c19d97dd34f680d256a35f819bce45d4d57b Mon Sep 17 00:00:00 2001 From: vinllen Date: Fri, 28 Jun 2019 20:55:59 +0800 Subject: [PATCH 08/11] fix bug of filter lua --- ChangeLog | 3 +++ src/redis-shake/common/filter.go | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/ChangeLog b/ChangeLog index 93b7b50..c1ceb02 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ +2019-06-25 Alibaba Cloud. + * VERSION: 1.6.10 + * IMPROVE: support print Lua in `decode` mode. 2019-06-21 Alibaba Cloud. * VERSION: 1.6.9 * IMPROVE: support Lua and transaction when target is open source cluster diff --git a/src/redis-shake/common/filter.go b/src/redis-shake/common/filter.go index a4a1adc..720c438 100644 --- a/src/redis-shake/common/filter.go +++ b/src/redis-shake/common/filter.go @@ -8,7 +8,8 @@ func FilterCommands(cmd string, luaFilter bool) bool { return true } - if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script")) { + if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script") || + strings.EqualFold(cmd, "evalsha")) { return true } From cd8e28a392a725998296c112f7c894d6d360c1ef Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 3 Jul 2019 10:58:55 +0800 Subject: [PATCH 09/11] update driver to add more error info --- src/vendor/vendor.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json index 6303482..e518a12 100644 --- a/src/vendor/vendor.json +++ b/src/vendor/vendor.json @@ -159,10 +159,10 @@ "revisionTime": "2019-03-04T09:57:49Z" }, { - "checksumSHA1": "ZT2d9cNq14zxFxnzA2kaqj8tfJY=", + "checksumSHA1": "UXns0bW61NZgqtFBLj6jPgAwF+U=", "path": "github.com/vinllen/redis-go-cluster", - "revision": "bdcf6ff491eca6a29ba905300253a65d88eb1ad6", - "revisionTime": "2019-06-24T08:07:27Z" + "revision": "6ae0947e93ad83020f121a9b179d95e117fc0a5c", + "revisionTime": "2019-07-03T02:46:24Z" }, { "checksumSHA1": "TM3Neoy1xRAKyZYMGzKc41sDFW4=", From 4a85f9bd07667ddf5f296f9f0eced8b4ce9d45de Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 3 Jul 2019 11:37:48 +0800 Subject: [PATCH 10/11] polish README --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 4df0411..f9b5e30 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,11 @@ The type can be one of the followings:
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
+# Support +--- +Redis version from 2.x to 5.0. +Standalone, Cluster, Codis, Aliyun Cluster Proxy, Tencent Cloud Proxy and so on. + # Configuration Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. From d02ff3bfc418efe11e1b08be52098152b57c7ca9 Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 3 Jul 2019 20:06:21 +0800 Subject: [PATCH 11/11] check version and checksum when meeting error in rump mode --- ChangeLog | 3 +++ README.md | 4 ++-- src/redis-shake/common/common.go | 30 ++++++++++++++++++++++++++++++ src/redis-shake/rump.go | 10 ++++++++-- 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/ChangeLog b/ChangeLog index c1ceb02..192729b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,9 @@ 2019-06-25 Alibaba Cloud. * VERSION: 1.6.10 * IMPROVE: support print Lua in `decode` mode. + * BUGFIX: merge metric panic PR#111 + * IMPROVE: check checksum and version once receiving error from the target in + `rump` mode. 2019-06-21 Alibaba Cloud. * VERSION: 1.6.9 * IMPROVE: support Lua and transaction when target is open source cluster diff --git a/README.md b/README.md index f9b5e30..7ae280e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -RedisShake is mainly used to synchronize data from one redis database to another.
+RedisShake is mainly used to synchronize data from one redis to another.
Thanks to the Douyu's WSD team for the support.
* [中文文档](https://yq.aliyun.com/articles/691794) @@ -26,7 +26,7 @@ Please check out the `conf/redis-shake.conf` to see the detailed parameters desc # Support --- Redis version from 2.x to 5.0. -Standalone, Cluster, Codis, Aliyun Cluster Proxy, Tencent Cloud Proxy and so on. +Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on. # Configuration Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 6c4ddd3..4393773 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -7,11 +7,13 @@ import ( "strings" "reflect" "unsafe" + "encoding/binary" "pkg/libs/bytesize" "redis-shake/configure" logRotate "gopkg.in/natefinch/lumberjack.v2" + "github.com/cupcake/rdb/crc64" ) const ( @@ -36,6 +38,7 @@ var ( LogRotater *logRotate.Logger StartTime string TargetRoundRobin int + RDBVersion uint = 9 // 9 for 5.0 ) const ( @@ -111,4 +114,31 @@ func String2Bytes(s string) []byte { func Bytes2String(b []byte) string { return *(*string)(unsafe.Pointer(&b)) +} + +func CheckVersionChecksum(d []byte) (uint, uint64, error) { + /* Write the footer, this is how it looks like: + * ----------------+---------------------+---------------+ + * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | + * ----------------+---------------------+---------------+ + * RDB version and CRC are both in little endian. + */ + length := len(d) + if length < 10 { + return 0, 0, fmt.Errorf("rdb: invalid dump length") + } + + footer := length - 10 + rdbVersion := uint((d[footer + 1] << 8) | d[footer]) + if rdbVersion > RDBVersion { + return 0, 0, fmt.Errorf("current version[%v] > RDBVersion[%v]", rdbVersion, RDBVersion) + } + + checksum := binary.LittleEndian.Uint64(d[length - 8:]) + digest := crc64.Digest(d[: length - 8]) + if checksum != digest { + return 0, 0, fmt.Errorf("rdb: invalid CRC checksum[%v] != digest[%v]", checksum, digest) + } + + return rdbVersion, checksum, nil } \ No newline at end of file diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 1183c8f..46fa3da 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -344,6 +344,9 @@ func (dre *dbRumperExecutor) writer() { // handle big key utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) + + // all the reply has been handled in RestoreBigkey + // dre.resultChan <- ele continue } @@ -408,8 +411,11 @@ func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes * func (dre *dbRumperExecutor) receiver() { for ele := range dre.resultChan { if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { - log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId, - dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err) + rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value)) + log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " + + "rdb version[%v], checksum[%v], check error[%v]", + dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value), + rdbVersion, checksum, checkErr) } dre.stat.cCommands.Incr() }