Merge pull request #113 from alibaba/feature-1.6

Feature 1.6
v4
Vinllen Chen 5 years ago committed by GitHub
commit ac32ec8c33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      ChangeLog
  2. 7
      README.md
  3. 10
      conf/redis-shake.conf
  4. 30
      src/redis-shake/common/common.go
  5. 5
      src/redis-shake/common/configure.go
  6. 3
      src/redis-shake/common/filter.go
  7. 4
      src/redis-shake/common/split.go
  8. 16
      src/redis-shake/decode.go
  9. 7
      src/redis-shake/main/main.go
  10. 12
      src/redis-shake/rump.go
  11. 2
      src/redis-shake/sync.go
  12. 6
      src/vendor/vendor.json

@ -1,3 +1,9 @@
2019-06-25 Alibaba Cloud.
* VERSION: 1.6.10
* IMPROVE: support print Lua in `decode` mode.
* BUGFIX: merge metric panic PR#111
* IMPROVE: check checksum and version once receiving error from the target in
`rump` mode.
2019-06-21 Alibaba Cloud. 2019-06-21 Alibaba Cloud.
* VERSION: 1.6.9 * VERSION: 1.6.9
* IMPROVE: support Lua and transaction when target is open source cluster * IMPROVE: support Lua and transaction when target is open source cluster

@ -1,4 +1,4 @@
RedisShake is mainly used to synchronize data from one redis database to another.<br> RedisShake is mainly used to synchronize data from one redis to another.<br>
Thanks to the Douyu's WSD team for the support. <br> Thanks to the Douyu's WSD team for the support. <br>
* [中文文档](https://yq.aliyun.com/articles/691794) * [中文文档](https://yq.aliyun.com/articles/691794)
@ -23,6 +23,11 @@ The type can be one of the followings:<br>
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.<br> Please check out the `conf/redis-shake.conf` to see the detailed parameters description.<br>
# Support
---
Redis version from 2.x to 5.0.
Supports `Standalone`, `Cluster`, `Codis`, `Aliyun Cluster Proxy`, `Tencent Cloud Proxy` and so on.
# Configuration # Configuration
Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters.

@ -35,12 +35,12 @@ source.type = standalone
# ip:port # ip:port
# the source address can be the following: # the source address can be the following:
# 1. single db address. for "standalone" type. # 1. single db address. for "standalone" type.
# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. # 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441. # 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441.
# 4. proxy address(used in "rump" mode only). for "proxy" type. # 4. proxy address(used in "rump" mode only). for "proxy" type.
# 源redis地址。对于sentinel模式,输入格式为"master名字:拉取角色为master或者slave@sentinel的地址" # 源redis地址。对于sentinel模式,输入格式为"master名字:拉取角色为master或者slave@sentinel的地址"
source.address = 127.0.0.1:20441 source.address = 127.0.0.1:20441
# password. # password of db/proxy. even if type is sentinel.
source.password_raw = 123456 source.password_raw = 123456
# auth type, don't modify it # auth type, don't modify it
source.auth_type = auth source.auth_type = auth
@ -59,11 +59,11 @@ target.type = standalone
# ip:port # ip:port
# the target address can be the following: # the target address can be the following:
# 1. single db address. for "standalone" type. # 1. single db address. for "standalone" type.
# 2. sentinel_master_name@sentinel single/cluster address, e.g., mymaster@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. # 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. # 3. cluster that has several db nodes split by semicolon(;). for "cluster" type.
# 4. proxy address(used in "rump" mode only). for "proxy" type. # 4. proxy address(used in "rump" mode only). for "proxy" type.
target.address = 127.0.0.1:20551 target.address = 127.0.0.1:20551
# password. # password of db/proxy. even if type is sentinel.
target.password_raw = target.password_raw =
# auth type, don't modify it # auth type, don't modify it
target.auth_type = auth target.auth_type = auth
@ -112,7 +112,7 @@ filter.db =
# e.g., a;b;c # e.g., a;b;c
# default is all. # default is all.
# used in `restore`, `sync` and `rump`. # used in `restore`, `sync` and `rump`.
# 支持过滤key,只让指定的key通过,分号分隔 # 支持按前缀过滤key,只让指定前缀的key通过,分号分隔。比如指定abc,将会过滤abc, abc1, abcxxx
filter.key = filter.key =
# filter given slot, multiple slots are separated by ';'. # filter given slot, multiple slots are separated by ';'.
# e.g., 1;2;3 # e.g., 1;2;3

@ -7,11 +7,13 @@ import (
"strings" "strings"
"reflect" "reflect"
"unsafe" "unsafe"
"encoding/binary"
"pkg/libs/bytesize" "pkg/libs/bytesize"
"redis-shake/configure" "redis-shake/configure"
logRotate "gopkg.in/natefinch/lumberjack.v2" logRotate "gopkg.in/natefinch/lumberjack.v2"
"github.com/cupcake/rdb/crc64"
) )
const ( const (
@ -36,6 +38,7 @@ var (
LogRotater *logRotate.Logger LogRotater *logRotate.Logger
StartTime string StartTime string
TargetRoundRobin int TargetRoundRobin int
RDBVersion uint = 9 // 9 for 5.0
) )
const ( const (
@ -111,4 +114,31 @@ func String2Bytes(s string) []byte {
func Bytes2String(b []byte) string { func Bytes2String(b []byte) string {
return *(*string)(unsafe.Pointer(&b)) return *(*string)(unsafe.Pointer(&b))
}
func CheckVersionChecksum(d []byte) (uint, uint64, error) {
/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
* ----------------+---------------------+---------------+
* RDB version and CRC are both in little endian.
*/
length := len(d)
if length < 10 {
return 0, 0, fmt.Errorf("rdb: invalid dump length")
}
footer := length - 10
rdbVersion := uint((d[footer + 1] << 8) | d[footer])
if rdbVersion > RDBVersion {
return 0, 0, fmt.Errorf("current version[%v] > RDBVersion[%v]", rdbVersion, RDBVersion)
}
checksum := binary.LittleEndian.Uint64(d[length - 8:])
digest := crc64.Digest(d[: length - 8])
if checksum != digest {
return 0, 0, fmt.Errorf("rdb: invalid CRC checksum[%v] != digest[%v]", checksum, digest)
}
return rdbVersion, checksum, nil
} }

@ -57,8 +57,9 @@ func parseAddress(tp, address, redisType string, isSource bool) error {
case conf.RedisTypeSentinel: case conf.RedisTypeSentinel:
arr := strings.Split(address, AddressSplitter) arr := strings.Split(address, AddressSplitter)
if len(arr) != 2 { if len(arr) != 2 {
return fmt.Errorf("redis type[%v] address[%v] length[%v] != 2", return fmt.Errorf("redis type[%v] address[%v] must begin with or has '%v': e.g., \"master@ip1:port1;ip2:port2\", " +
conf.RedisTypeStandalone, address, len(arr)) "\"@ip1:port1,ip2:port2\"",
conf.RedisTypeSentinel, address, AddressSplitter)
} }
var masterName string var masterName string

@ -8,7 +8,8 @@ func FilterCommands(cmd string, luaFilter bool) bool {
return true return true
} }
if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script")) { if luaFilter && (strings.EqualFold(cmd, "eval") || strings.EqualFold(cmd, "script") ||
strings.EqualFold(cmd, "evalsha")) {
return true return true
} }

@ -18,7 +18,7 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db
Key: String2Bytes(key), Key: String2Bytes(key),
Type: 0, // uselss Type: 0, // uselss
Value: String2Bytes(value), Value: String2Bytes(value),
ExpireAt: 0, // useless here ExpireAt: 0, // useless here
RealMemberCount: 0, RealMemberCount: 0,
NeedReadLen: 1, NeedReadLen: 1,
IdleTime: 0, IdleTime: 0,
@ -31,4 +31,4 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db
if _, err := client.Do("pexpire", key, pttl); err != nil { if _, err := client.Do("pexpire", key, pttl); err != nil {
log.Panicf("send key[%v] pexpire failed[%v]", key, err) log.Panicf("send key[%v] pexpire failed[%v]", key, err)
} }
} }

@ -136,11 +136,25 @@ func (cmd *CmdDecode) decoderMain(ipipe <-chan *rdb.BinEntry, opipe chan<- strin
return string(b) return string(b)
} }
for e := range ipipe { for e := range ipipe {
var b bytes.Buffer
if e.Type == rdb.RdbFlagAUX {
o := &struct {
Type string `json:"type"`
Key string `json:"key"`
Value64 string `json:"value64"`
}{
"aux", string(e.Key), string(e.Value),
}
fmt.Fprintf(&b, "%s\n", toJson(o))
cmd.nentry.Incr()
opipe <- b.String()
continue
}
o, err := rdb.DecodeDump(e.Value) o, err := rdb.DecodeDump(e.Value)
if err != nil { if err != nil {
log.PanicError(err, "decode failed") log.PanicError(err, "decode failed")
} }
var b bytes.Buffer
switch obj := o.(type) { switch obj := o.(type) {
default: default:
log.Panicf("unknown object %v", o) log.Panicf("unknown object %v", o)

@ -51,7 +51,12 @@ func main() {
version := flag.Bool("version", false, "show version") version := flag.Bool("version", false, "show version")
flag.Parse() flag.Parse()
if *configuration == "" || *tp == "" || *version { if *version {
fmt.Println(utils.Version)
return
}
if *configuration == "" || *tp == "" {
if !*version { if !*version {
fmt.Println("Please show me the '-conf' and '-type'") fmt.Println("Please show me the '-conf' and '-type'")
} }

@ -344,6 +344,9 @@ func (dre *dbRumperExecutor) writer() {
// handle big key // handle big key
utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db)
// all the reply has been handled in RestoreBigkey
// dre.resultChan <- ele
continue continue
} }
@ -369,7 +372,7 @@ func (dre *dbRumperExecutor) writer() {
// dre.resultChan <- ele // dre.resultChan <- ele
count++ count++
if count == conf.Options.ScanKeyNumber { if count >= conf.Options.ScanKeyNumber {
// batch // batch
log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count) log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count)
@ -408,8 +411,11 @@ func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes *
func (dre *dbRumperExecutor) receiver() { func (dre *dbRumperExecutor) receiver() {
for ele := range dre.resultChan { for ele := range dre.resultChan {
if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil {
log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId, rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value))
dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err) log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " +
"rdb version[%v], checksum[%v], check error[%v]",
dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value),
rdbVersion, checksum, checkErr)
} }
dre.stat.cCommands.Incr() dre.stat.cCommands.Incr()
} }

@ -629,7 +629,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
if len(conf.Options.FilterKey) != 0 { if len(conf.Options.FilterKey) != 0 {
cmdNode, ok := command.RedisCommands[scmd] cmdNode, ok := command.RedisCommands[scmd]
if ok && len(argv) > 0 { if ok && len(argv) > 0 {
log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd) // log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
new_argv, pass = command.GetMatchKeys(cmdNode, argv, conf.Options.FilterKey) new_argv, pass = command.GetMatchKeys(cmdNode, argv, conf.Options.FilterKey)
} else { } else {
pass = true pass = true

@ -159,10 +159,10 @@
"revisionTime": "2019-03-04T09:57:49Z" "revisionTime": "2019-03-04T09:57:49Z"
}, },
{ {
"checksumSHA1": "ZT2d9cNq14zxFxnzA2kaqj8tfJY=", "checksumSHA1": "UXns0bW61NZgqtFBLj6jPgAwF+U=",
"path": "github.com/vinllen/redis-go-cluster", "path": "github.com/vinllen/redis-go-cluster",
"revision": "bdcf6ff491eca6a29ba905300253a65d88eb1ad6", "revision": "6ae0947e93ad83020f121a9b179d95e117fc0a5c",
"revisionTime": "2019-06-24T08:07:27Z" "revisionTime": "2019-07-03T02:46:24Z"
}, },
{ {
"checksumSHA1": "TM3Neoy1xRAKyZYMGzKc41sDFW4=", "checksumSHA1": "TM3Neoy1xRAKyZYMGzKc41sDFW4=",

Loading…
Cancel
Save