diff --git a/ChangeLog b/ChangeLog index c1ceb02..192729b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,9 @@ 2019-06-25 Alibaba Cloud. * VERSION: 1.6.10 * IMPROVE: support print Lua in `decode` mode. + * BUGFIX: merge metric panic PR#111 + * IMPROVE: check checksum and version once receiving error from the target in + `rump` mode. 2019-06-21 Alibaba Cloud. * VERSION: 1.6.9 * IMPROVE: support Lua and transaction when target is open source cluster diff --git a/README.md b/README.md index f9b5e30..7ae280e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -RedisShake is mainly used to synchronize data from one redis database to another.
+RedisShake is mainly used to synchronize data from one redis to another.
Thanks to the Douyu's WSD team for the support.
* [中文文档](https://yq.aliyun.com/articles/691794) @@ -26,7 +26,7 @@ Please check out the `conf/redis-shake.conf` to see the detailed parameters desc # Support --- Redis version from 2.x to 5.0. -Standalone, Cluster, Codis, Aliyun Cluster Proxy, Tencent Cloud Proxy and so on. +Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on. # Configuration Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 6c4ddd3..4393773 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -7,11 +7,13 @@ import ( "strings" "reflect" "unsafe" + "encoding/binary" "pkg/libs/bytesize" "redis-shake/configure" logRotate "gopkg.in/natefinch/lumberjack.v2" + "github.com/cupcake/rdb/crc64" ) const ( @@ -36,6 +38,7 @@ var ( LogRotater *logRotate.Logger StartTime string TargetRoundRobin int + RDBVersion uint = 9 // 9 for 5.0 ) const ( @@ -111,4 +114,31 @@ func String2Bytes(s string) []byte { func Bytes2String(b []byte) string { return *(*string)(unsafe.Pointer(&b)) +} + +func CheckVersionChecksum(d []byte) (uint, uint64, error) { + /* Write the footer, this is how it looks like: + * ----------------+---------------------+---------------+ + * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | + * ----------------+---------------------+---------------+ + * RDB version and CRC are both in little endian. + */ + length := len(d) + if length < 10 { + return 0, 0, fmt.Errorf("rdb: invalid dump length") + } + + footer := length - 10 + rdbVersion := uint((d[footer + 1] << 8) | d[footer]) + if rdbVersion > RDBVersion { + return 0, 0, fmt.Errorf("current version[%v] > RDBVersion[%v]", rdbVersion, RDBVersion) + } + + checksum := binary.LittleEndian.Uint64(d[length - 8:]) + digest := crc64.Digest(d[: length - 8]) + if checksum != digest { + return 0, 0, fmt.Errorf("rdb: invalid CRC checksum[%v] != digest[%v]", checksum, digest) + } + + return rdbVersion, checksum, nil } \ No newline at end of file diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 1183c8f..46fa3da 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -344,6 +344,9 @@ func (dre *dbRumperExecutor) writer() { // handle big key utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) + + // all the reply has been handled in RestoreBigkey + // dre.resultChan <- ele continue } @@ -408,8 +411,11 @@ func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes * func (dre *dbRumperExecutor) receiver() { for ele := range dre.resultChan { if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { - log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId, - dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err) + rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value)) + log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " + + "rdb version[%v], checksum[%v], check error[%v]", + dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value), + rdbVersion, checksum, checkErr) } dre.stat.cCommands.Incr() }