diff --git a/ChangeLog b/ChangeLog
index 93b7b50..192729b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+2019-06-25 Alibaba Cloud.
+ * VERSION: 1.6.10
+ * IMPROVE: support print Lua in `decode` mode.
+ * BUGFIX: merge metric panic PR#111
+ * IMPROVE: check checksum and version once receiving error from the target in
+ `rump` mode.
2019-06-21 Alibaba Cloud.
* VERSION: 1.6.9
* IMPROVE: support Lua and transaction when target is open source cluster
diff --git a/README.md b/README.md
index 4df0411..7ae280e 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-RedisShake is mainly used to synchronize data from one redis database to another.
+RedisShake is mainly used to synchronize data from one redis to another.
Thanks to the Douyu's WSD team for the support.
* [中文文档](https://yq.aliyun.com/articles/691794)
@@ -23,6 +23,11 @@ The type can be one of the followings:
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
+# Support
+---
+Redis version from 2.x to 5.0.
+Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on.
+
# Configuration
Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters.
diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf
index 8acf1a7..8a7f5ab 100644
--- a/conf/redis-shake.conf
+++ b/conf/redis-shake.conf
@@ -35,12 +35,12 @@ source.type = standalone
# ip:port
# the source address can be the following:
# 1. single db address. for "standalone" type.
-# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
+# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441.
# 4. proxy address(used in "rump" mode only). for "proxy" type.
# 源redis地址。对于sentinel模式,输入格式为"master名字:拉取角色为master或者slave@sentinel的地址"
source.address = 127.0.0.1:20441
-# password.
+# password of db/proxy. even if type is sentinel.
source.password_raw = 123456
# auth type, don't modify it
source.auth_type = auth
@@ -59,11 +59,11 @@ target.type = standalone
# ip:port
# the target address can be the following:
# 1. single db address. for "standalone" type.
-# 2. sentinel_master_name@sentinel single/cluster address, e.g., mymaster@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
+# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type.
# 4. proxy address(used in "rump" mode only). for "proxy" type.
target.address = 127.0.0.1:20551
-# password.
+# password of db/proxy. even if type is sentinel.
target.password_raw =
# auth type, don't modify it
target.auth_type = auth
@@ -112,7 +112,7 @@ filter.db =
# e.g., a;b;c
# default is all.
# used in `restore`, `sync` and `rump`.
-# 支持过滤key,只让指定的key通过,分号分隔
+# 支持按前缀过滤key,只让指定前缀的key通过,分号分隔。比如指定abc,将会过滤abc, abc1, abcxxx
filter.key =
# filter given slot, multiple slots are separated by ';'.
# e.g., 1;2;3
diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go
index 6c4ddd3..4393773 100644
--- a/src/redis-shake/common/common.go
+++ b/src/redis-shake/common/common.go
@@ -7,11 +7,13 @@ import (
"strings"
"reflect"
"unsafe"
+ "encoding/binary"
"pkg/libs/bytesize"
"redis-shake/configure"
logRotate "gopkg.in/natefinch/lumberjack.v2"
+ "github.com/cupcake/rdb/crc64"
)
const (
@@ -36,6 +38,7 @@ var (
LogRotater *logRotate.Logger
StartTime string
TargetRoundRobin int
+ RDBVersion uint = 9 // 9 for 5.0
)
const (
@@ -111,4 +114,31 @@ func String2Bytes(s string) []byte {
func Bytes2String(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
+}
+
+func CheckVersionChecksum(d []byte) (uint, uint64, error) {
+ /* Write the footer, this is how it looks like:
+ * ----------------+---------------------+---------------+
+ * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+ * ----------------+---------------------+---------------+
+ * RDB version and CRC are both in little endian.
+ */
+ length := len(d)
+ if length < 10 {
+ return 0, 0, fmt.Errorf("rdb: invalid dump length")
+ }
+
+ footer := length - 10
+ rdbVersion := uint((d[footer + 1] << 8) | d[footer])
+ if rdbVersion > RDBVersion {
+ return 0, 0, fmt.Errorf("current version[%v] > RDBVersion[%v]", rdbVersion, RDBVersion)
+ }
+
+ checksum := binary.LittleEndian.Uint64(d[length - 8:])
+ digest := crc64.Digest(d[: length - 8])
+ if checksum != digest {
+ return 0, 0, fmt.Errorf("rdb: invalid CRC checksum[%v] != digest[%v]", checksum, digest)
+ }
+
+ return rdbVersion, checksum, nil
}
\ No newline at end of file
diff --git a/src/redis-shake/common/configure.go b/src/redis-shake/common/configure.go
index 8b38f27..453e4d3 100644
--- a/src/redis-shake/common/configure.go
+++ b/src/redis-shake/common/configure.go
@@ -57,8 +57,9 @@ func parseAddress(tp, address, redisType string, isSource bool) error {
case conf.RedisTypeSentinel:
arr := strings.Split(address, AddressSplitter)
if len(arr) != 2 {
- return fmt.Errorf("redis type[%v] address[%v] length[%v] != 2",
- conf.RedisTypeStandalone, address, len(arr))
+ return fmt.Errorf("redis type[%v] address[%v] must begin with or has '%v': e.g., \"master@ip1:port1;ip2:port2\", " +
+ "\"@ip1:port1,ip2:port2\"",
+ conf.RedisTypeSentinel, address, AddressSplitter)
}
var masterName string
diff --git a/src/redis-shake/common/filter.go b/src/redis-shake/common/filter.go
index a4a1adc..720c438 100644
--- a/src/redis-shake/common/filter.go
+++ b/src/redis-shake/common/filter.go
@@ -8,7 +8,8 @@ func FilterCommands(cmd string, luaFilter bool) bool {
return true
}
- if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script")) {
+ if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script") ||
+ strings.EqualFold(cmd, "evalsha")) {
return true
}
diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go
index 30d76d2..bbf53b2 100644
--- a/src/redis-shake/common/split.go
+++ b/src/redis-shake/common/split.go
@@ -18,7 +18,7 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db
Key: String2Bytes(key),
Type: 0, // uselss
Value: String2Bytes(value),
- ExpireAt: 0, // useless here
+ ExpireAt: 0, // useless here
RealMemberCount: 0,
NeedReadLen: 1,
IdleTime: 0,
@@ -31,4 +31,4 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db
if _, err := client.Do("pexpire", key, pttl); err != nil {
log.Panicf("send key[%v] pexpire failed[%v]", key, err)
}
-}
\ No newline at end of file
+}
diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go
index 880a9d5..3aa12fd 100644
--- a/src/redis-shake/decode.go
+++ b/src/redis-shake/decode.go
@@ -136,11 +136,25 @@ func (cmd *CmdDecode) decoderMain(ipipe <-chan *rdb.BinEntry, opipe chan<- strin
return string(b)
}
for e := range ipipe {
+ var b bytes.Buffer
+ if e.Type == rdb.RdbFlagAUX {
+ o := &struct {
+ Type string `json:"type"`
+ Key string `json:"key"`
+ Value64 string `json:"value64"`
+ }{
+ "aux", string(e.Key), string(e.Value),
+ }
+ fmt.Fprintf(&b, "%s\n", toJson(o))
+ cmd.nentry.Incr()
+ opipe <- b.String()
+ continue
+ }
+
o, err := rdb.DecodeDump(e.Value)
if err != nil {
log.PanicError(err, "decode failed")
}
- var b bytes.Buffer
switch obj := o.(type) {
default:
log.Panicf("unknown object %v", o)
diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go
index dcfb0fb..92d33ec 100644
--- a/src/redis-shake/main/main.go
+++ b/src/redis-shake/main/main.go
@@ -51,7 +51,12 @@ func main() {
version := flag.Bool("version", false, "show version")
flag.Parse()
- if *configuration == "" || *tp == "" || *version {
+ if *version {
+ fmt.Println(utils.Version)
+ return
+ }
+
+ if *configuration == "" || *tp == "" {
if !*version {
fmt.Println("Please show me the '-conf' and '-type'")
}
diff --git a/src/redis-shake/metric/metric.go b/src/redis-shake/metric/metric.go
index a5fbd35..fbb6dc5 100644
--- a/src/redis-shake/metric/metric.go
+++ b/src/redis-shake/metric/metric.go
@@ -3,6 +3,7 @@ package metric
import (
"encoding/json"
"fmt"
+ "math"
"strconv"
"sync"
"sync/atomic"
@@ -42,7 +43,7 @@ func (p *Percent) Get(returnString bool) interface{} {
if returnString {
return "null"
} else {
- return int64(^uint64(0) >> 1) // int64_max
+ return math.MaxFloat64
}
} else {
dividend := atomic.LoadUint64(&p.Dividend)
@@ -225,7 +226,10 @@ func (m *Metric) GetAvgDelay() interface{} {
}
func (m *Metric) GetAvgDelayFloat64() float64 {
- return float64(m.AvgDelay.Get(false).(int64))
+ if avgDelay, ok := m.AvgDelay.Get(false).(float64); ok {
+ return avgDelay
+ }
+ return math.MaxFloat64
}
func (m *Metric) AddNetworkFlow(dbSyncerID int, val uint64) {
diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go
index e22b0a4..46fa3da 100644
--- a/src/redis-shake/rump.go
+++ b/src/redis-shake/rump.go
@@ -344,6 +344,9 @@ func (dre *dbRumperExecutor) writer() {
// handle big key
utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db)
+
+ // all the reply has been handled in RestoreBigkey
+ // dre.resultChan <- ele
continue
}
@@ -369,7 +372,7 @@ func (dre *dbRumperExecutor) writer() {
// dre.resultChan <- ele
count++
- if count == conf.Options.ScanKeyNumber {
+ if count >= conf.Options.ScanKeyNumber {
// batch
log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count)
@@ -408,8 +411,11 @@ func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes *
func (dre *dbRumperExecutor) receiver() {
for ele := range dre.resultChan {
if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil {
- log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId,
- dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err)
+ rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value))
+ log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " +
+ "rdb version[%v], checksum[%v], check error[%v]",
+ dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value),
+ rdbVersion, checksum, checkErr)
}
dre.stat.cCommands.Incr()
}
diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go
index 1da67a1..894c517 100644
--- a/src/redis-shake/sync.go
+++ b/src/redis-shake/sync.go
@@ -629,7 +629,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
if len(conf.Options.FilterKey) != 0 {
cmdNode, ok := command.RedisCommands[scmd]
if ok && len(argv) > 0 {
- log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
+ // log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
new_argv, pass = command.GetMatchKeys(cmdNode, argv, conf.Options.FilterKey)
} else {
pass = true
diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json
index 6303482..e518a12 100644
--- a/src/vendor/vendor.json
+++ b/src/vendor/vendor.json
@@ -159,10 +159,10 @@
"revisionTime": "2019-03-04T09:57:49Z"
},
{
- "checksumSHA1": "ZT2d9cNq14zxFxnzA2kaqj8tfJY=",
+ "checksumSHA1": "UXns0bW61NZgqtFBLj6jPgAwF+U=",
"path": "github.com/vinllen/redis-go-cluster",
- "revision": "bdcf6ff491eca6a29ba905300253a65d88eb1ad6",
- "revisionTime": "2019-06-24T08:07:27Z"
+ "revision": "6ae0947e93ad83020f121a9b179d95e117fc0a5c",
+ "revisionTime": "2019-07-03T02:46:24Z"
},
{
"checksumSHA1": "TM3Neoy1xRAKyZYMGzKc41sDFW4=",