check version and checksum when meeting error in rump mode

v4
vinllen 5 years ago
parent fedeb66e6c
commit d02ff3bfc4
  1. 3
      ChangeLog
  2. 4
      README.md
  3. 30
      src/redis-shake/common/common.go
  4. 10
      src/redis-shake/rump.go

@ -1,6 +1,9 @@
2019-06-25 Alibaba Cloud.
* VERSION: 1.6.10
* IMPROVE: support print Lua in `decode` mode.
* BUGFIX: merge metric panic PR#111
* IMPROVE: check checksum and version once receiving error from the target in
`rump` mode.
2019-06-21 Alibaba Cloud.
* VERSION: 1.6.9
* IMPROVE: support Lua and transaction when target is open source cluster

@ -1,4 +1,4 @@
RedisShake is mainly used to synchronize data from one redis database to another.<br>
RedisShake is mainly used to synchronize data from one redis to another.<br>
Thanks to the Douyu's WSD team for the support. <br>
* [中文文档](https://yq.aliyun.com/articles/691794)
@ -26,7 +26,7 @@ Please check out the `conf/redis-shake.conf` to see the detailed parameters desc
# Support
---
Redis version from 2.x to 5.0.
Standalone, Cluster, Codis, Aliyun Cluster Proxy, Tencent Cloud Proxy and so on.
Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on.
# Configuration
Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters.

@ -7,11 +7,13 @@ import (
"strings"
"reflect"
"unsafe"
"encoding/binary"
"pkg/libs/bytesize"
"redis-shake/configure"
logRotate "gopkg.in/natefinch/lumberjack.v2"
"github.com/cupcake/rdb/crc64"
)
const (
@ -36,6 +38,7 @@ var (
LogRotater *logRotate.Logger
StartTime string
TargetRoundRobin int
RDBVersion uint = 9 // 9 for 5.0
)
const (
@ -111,4 +114,31 @@ func String2Bytes(s string) []byte {
func Bytes2String(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
func CheckVersionChecksum(d []byte) (uint, uint64, error) {
/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
* ----------------+---------------------+---------------+
* RDB version and CRC are both in little endian.
*/
length := len(d)
if length < 10 {
return 0, 0, fmt.Errorf("rdb: invalid dump length")
}
footer := length - 10
rdbVersion := uint((d[footer + 1] << 8) | d[footer])
if rdbVersion > RDBVersion {
return 0, 0, fmt.Errorf("current version[%v] > RDBVersion[%v]", rdbVersion, RDBVersion)
}
checksum := binary.LittleEndian.Uint64(d[length - 8:])
digest := crc64.Digest(d[: length - 8])
if checksum != digest {
return 0, 0, fmt.Errorf("rdb: invalid CRC checksum[%v] != digest[%v]", checksum, digest)
}
return rdbVersion, checksum, nil
}

@ -344,6 +344,9 @@ func (dre *dbRumperExecutor) writer() {
// handle big key
utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db)
// all the reply has been handled in RestoreBigkey
// dre.resultChan <- ele
continue
}
@ -408,8 +411,11 @@ func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes *
func (dre *dbRumperExecutor) receiver() {
for ele := range dre.resultChan {
if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil {
log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId,
dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err)
rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value))
log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " +
"rdb version[%v], checksum[%v], check error[%v]",
dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value),
rdbVersion, checksum, checkErr)
}
dre.stat.cCommands.Incr()
}

Loading…
Cancel
Save