Merge pull request #217 from alibaba/feature-1.6

Feature 1.6
Vinllen Chen committed commit 121aa0f8ac
Files changed (8):
  1. ChangeLog (13)
  2. README.md (1)
  3. conf/redis-shake.conf (7)
  4. src/redis-shake/common/split.go (6)
  5. src/redis-shake/common/utils.go (72)
  6. src/redis-shake/main/main.go (9)
  7. src/redis-shake/sync.go (8)
  8. src/vendor/vendor.json (6)

ChangeLog
@@ -1,3 +1,16 @@
2019-12-20 Alibaba Cloud.
* VERSION: 1.6.24
* BUGFIX: adjust the cluster receive channel size from 4096 to `sender.count`.
* BUGFIX: update redis-go-cluster to solve the send/receive concurrency
conflict.
* BUGFIX: fix some bugs in redis-go-cluster, including an io timeout problem,
see #192, #210.
* IMPROVE: set 'psync' to true by default in the configuration; if the source
redis version is below v2.8, it automatically switches back to false.
* IMPROVE: when the target version is lower than the source, run restore
directly, then catch the "Bad data format" error and retry by splitting the
value. See #211.
* IMPROVE: catch more errors in the `restoreBigRdbEntry` function.
2019-11-28 Alibaba Cloud.
* VERSION: 1.6.23
* BUGFIX: update redis-go-cluster driver to solve MOVED error in lua
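The restore-then-split flow described in the 1.6.24 notes reduces to the pattern below. A minimal sketch of the control flow, reusing only names visible elsewhere in this diff (redigo, restoreBigRdbEntry, params, e); it is an illustration, not the literal committed code:

// Try a plain RESTORE first; if the older target replies "Bad data format",
// fall back to replaying the value entry by entry via restoreBigRdbEntry.
if _, err := redigo.String(c.Do("restore", params...)); err != nil {
	if strings.Contains(err.Error(), "Bad data format") {
		if err := restoreBigRdbEntry(c, e); err != nil {
			log.Panic(err)
		}
	} else {
		log.PanicError(err, "restore command error key:", string(e.Key))
	}
}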

README.md
@@ -104,3 +104,4 @@ Plus, we have a DingDing(钉钉) group so that users can join and discuss, pleas
| muicoder | muicoder@gmail.com |
| zhklcf | huikangzhu@126.com |
| shuff1e | sfxu@foxmail.com |
| xuhualin | xuhualing8439523@163.com |

conf/redis-shake.conf
@@ -148,12 +148,13 @@ filter.lua = false
# the reason.
# If a normal key is not big, it is written to the target directly with restore. If the
# bytes of a key's value exceed the given threshold, it is written in batches, entry by
# entry. If the target is Codis, this must be set to 1; see the FAQ for the reason.
# If the target's major version is lower than the source's, setting this to 1 is also recommended.
big_key_threshold = 524288000
# use psync command.
# used in `sync`.
# By default the sync command is used; enabling this switches to the psync command.
psync = false
# By default psync is used for synchronization; setting this to false uses sync instead. The code
# automatically detects source versions before 2.8 and switches back to sync.
psync = true
# enable metric
# used in `sync`.
@@ -172,7 +173,7 @@ sender.size = 104857600
# used in `sync`. flush sender buffer when bigger than this threshold.
# Number of buffered commands to send; above this threshold the send buffer is forcibly
# flushed. When the target is a cluster, raising this value consumes extra memory.
sender.count = 4096
sender.count = 4095
# delay channel size. once an oplog is sent to the target redis, its id and timestamp are
# also stored in this delay queue. the timestamp is used to calculate the replication delay
# when the ack is received from the target redis.
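Per the 1.6.24 ChangeLog entry, the cluster receive channel is now sized from this sender.count value rather than a hard-coded 4096. A sketch of the presumed wiring; the exact assignment site is not shown in this diff, so the option field name below is an assumption:

// Assumed wiring: RecvChanSize, passed to NewClusterConn later in this diff,
// follows the configured sender.count instead of the old fixed 4096.
if conf.Options.SenderCount > 0 { // hypothetical field name for sender.count
	utils.RecvChanSize = int(conf.Options.SenderCount)
}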

src/redis-shake/common/split.go
@@ -29,12 +29,14 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db
Freq: 0,
}
restoreBigRdbEntry(client, &entry)
if err := restoreBigRdbEntry(client, &entry); err != nil {
log.Panicf("restore big rdb key[%s] failed[%v]", key, err)
}
if pttl > 0 {
// pttl
if _, err := client.Do("pexpire", key, pttl); err != nil {
log.Panicf("send key[%v] pexpire failed[%v]", key, err)
log.Panicf("send key[%s] pexpire failed[%v]", key, err)
}
}
}

src/redis-shake/common/utils.go
@@ -35,24 +35,24 @@ func OpenRedisConn(target []string, auth_type, passwd string, isCluster bool, tl
}
func OpenRedisConnWithTimeout(target []string, auth_type, passwd string, readTimeout, writeTimeout time.Duration,
isCluster bool, tlsEnable bool) redigo.Conn {
// return redigo.NewConn(OpenNetConn(target, auth_type, passwd), readTimeout, writeTimeout)
isCluster bool, tlsEnable bool) redigo.Conn {
if isCluster {
// the alive time isn't the tcp keep_alive parameter
cluster, err := redigoCluster.NewCluster(
&redigoCluster.Options{
StartNodes: target,
ConnTimeout: 30 * time.Second,
ConnTimeout: 5 * time.Second,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
KeepAlive: 16,
AliveTime: time.Duration(conf.Options.KeepAlive) * time.Second,
KeepAlive: 32, // number of available connections
AliveTime: 10 * time.Second, // hard code to set alive time in single connection, not the tcp keep alive
Password: passwd,
})
if err != nil {
log.Panicf("create cluster connection error[%v]", err)
return nil
}
return NewClusterConn(cluster, 4096)
return NewClusterConn(cluster, RecvChanSize)
} else {
// tls only support single connection currently
return redigo.NewConn(OpenNetConn(target[0], auth_type, passwd, tlsEnable), readTimeout, writeTimeout)
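The cluster branch above is driven by vinllen/redis-go-cluster; per the new comments, KeepAlive is the size of the per-node connection pool and AliveTime is a per-connection lifetime, not TCP keepalive. A hedged usage sketch of the wrapper, with hypothetical node addresses and credentials:

// Open a cluster-aware connection with 30s I/O timeouts; commands are routed
// by redis-go-cluster and replies are buffered in a channel of RecvChanSize.
c := OpenRedisConnWithTimeout(
	[]string{"10.1.1.1:6379", "10.1.1.2:6379"}, // hypothetical cluster nodes
	"auth", "my-password",                      // hypothetical credentials
	30*time.Second, 30*time.Second,             // read/write timeouts
	true,  // isCluster
	false, // tlsEnable: tls currently only supports single connections
)
defer c.Close()
if _, err := c.Do("ping"); err != nil {
	log.Panicf("connect to target failed[%v]", err)
}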
@@ -389,14 +389,16 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) {
}
}
func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) error {
//read type
var err error
r := rdb.NewRdbReader(bytes.NewReader(e.Value))
t, err := r.ReadByte()
if err != nil {
log.PanicError(err, "read rdb ")
}
log.Info("restore big key ", string(e.Key), " Value Length ", len(e.Value), " type ", t)
log.Debug("restore big key ", string(e.Key), " Value Length ", len(e.Value), " type ", t)
count := 0
switch t {
case rdb.RdbTypeHashZiplist:
@@ -422,7 +424,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
log.PanicError(err, "read rdb ")
}
count++
c.Send("HSET", e.Key, field, value)
err = c.Send("HSET", e.Key, field, value)
if (count == 100) || (i == (length - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -455,7 +457,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
log.PanicError(err, "read rdb ")
}
count++
c.Send("ZADD", e.Key, scoreBytes, member)
err = c.Send("ZADD", e.Key, scoreBytes, member)
if (count == 100) || (i == (cardinality - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -500,7 +502,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
intString = strconv.FormatInt(int64(int64(binary.LittleEndian.Uint64(intBytes))), 10)
}
count++
c.Send("SADD", e.Key, []byte(intString))
err = c.Send("SADD", e.Key, []byte(intString))
if (count == 100) || (i == (cardinality - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -525,7 +527,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
}
//rpush(c, e.Key, entry)
count++
c.Send("RPUSH", e.Key, entry)
err = c.Send("RPUSH", e.Key, entry)
if (count == 100) || (i == (length - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -562,7 +564,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
log.PanicError(err, "read rdb ")
}
count++
c.Send("HSET", e.Key, field, value)
err = c.Send("HSET", e.Key, field, value)
if (count == 100) || (i == (int(length) - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -587,7 +589,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
}
//rpush(c, e.Key, field)
count++
c.Send("RPUSH", e.Key, field)
err = c.Send("RPUSH", e.Key, field)
if (count == 100) || (i == (int(n) - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -605,7 +607,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
log.PanicError(err, "read rdb ")
}
count++
c.Send("SADD", e.Key, member)
err = c.Send("SADD", e.Key, member)
if (count == 100) || (i == (int(n) - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -633,8 +635,9 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
log.PanicError(err, "read rdb ")
}
count++
log.Info("restore big zset key ", string(e.Key), " score ", (Float64ToByte(score)), " member ", string(member))
c.Send("ZADD", e.Key, Float64ToByte(score), member)
log.Info("restore big zset key ", string(e.Key), " score ", Float64ToByte(score),
" member ", string(member))
err = c.Send("ZADD", e.Key, Float64ToByte(score), member)
if (count == 100) || (i == (int(n) - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -671,7 +674,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
}
//hset(c, e.Key, field, value)
count++
c.Send("HSET", e.Key, field, value)
err = c.Send("HSET", e.Key, field, value)
if (count == 100) || (i == (int(n) - 1)) {
flushAndCheckReply(c, count)
count = 0
@@ -701,7 +704,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
}
log.Info("rpush key: ", e.Key, " value: ", entry)
count++
c.Send("RPUSH", e.Key, entry)
err = c.Send("RPUSH", e.Key, entry)
if count == 100 {
flushAndCheckReply(c, count)
count = 0
@@ -715,6 +718,8 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
default:
log.PanicError(fmt.Errorf("can't deal rdb type:%d", t), "restore big key fail")
}
return err
}
func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
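Every `err = c.Send(...)` change above participates in the same redigo pipelining idiom: buffer up to 100 commands, then flush and drain one reply per command (presumably what flushAndCheckReply does; its body is not shown in this diff). A standalone sketch of the idiom, with a hypothetical helper name:

// Pipelined bulk-write: Send only buffers the command client-side; Flush
// pushes the batch to redis; Receive pops one reply per buffered command.
func pipelineSAdd(c redigo.Conn, key []byte, members [][]byte) error {
	count := 0
	for i, member := range members {
		if err := c.Send("SADD", key, member); err != nil {
			return err // surfaced to the caller, as restoreBigRdbEntry now does
		}
		count++
		if count == 100 || i == len(members)-1 {
			if err := c.Flush(); err != nil {
				return err
			}
			for j := 0; j < count; j++ {
				if _, err := c.Receive(); err != nil {
					return err
				}
			}
			count = 0
		}
	}
	return nil
}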
@@ -793,7 +798,11 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
log.Panicf("del ", string(e.Key), err)
}
}
restoreBigRdbEntry(c, e)
if err := restoreBigRdbEntry(c, e); err != nil {
log.Panic(err)
}
if e.ExpireAt != 0 {
r, err := redigo.Int64(c.Do("pexpire", e.Key, ttlms))
if err != nil && r != 1 {
@@ -816,6 +825,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
log.Debugf("restore key[%s] with params[%v]", e.Key, params)
// fmt.Printf("key: %v, value: %v params: %v\n", string(e.Key), e.Value, params)
// s, err := redigo.String(c.Do("restore", params...))
RESTORE:
s, err := redigoCluster.String(c.Do("restore", params...))
if err != nil {
/*The reply value of busykey in 2.8 kernel is "target key name is busy",
@@ -826,21 +836,27 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
if !conf.Options.Metric {
log.Infof("warning, rewrite key: %v", string(e.Key))
}
var s2 string
var rerr error
if conf.Options.TargetReplace {
params = append(params, "REPLACE")
s2, rerr = redigo.String(c.Do("restore", params...))
} else {
_, _ = redigo.String(c.Do("del", e.Key))
s2, rerr = redigo.String(c.Do("restore", params...))
}
if rerr != nil {
log.Info(s2, rerr, "key ", string(e.Key))
_, err = redigo.String(c.Do("del", e.Key))
if err != nil {
log.Panicf("delete key[%v] failed[%v]", string(e.Key), err)
}
}
// retry
goto RESTORE
} else {
log.Panicf("target key name is busy:", string(e.Key))
}
} else if strings.Contains(err.Error(), "Bad data format") {
// going from a higher version to a lower one may hit this error; we need to split the data structure
log.Warnf("return error[%v], ignore it and try to split the value", err)
if err := restoreBigRdbEntry(c, e); err != nil {
log.Panic(err)
}
} else {
log.PanicError(err, "restore command error key:", string(e.Key), " err:", err.Error())
}
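The busy-key branch above chooses between two rewrite strategies depending on conf.Options.TargetReplace. A condensed sketch of that decision, omitting the goto-based retry of the committed code:

// Rewrite an existing key on the target: in place via RESTORE ... REPLACE
// (available since redis 3.0), or with an explicit DEL plus a plain RESTORE.
if conf.Options.TargetReplace {
	params = append(params, "REPLACE")
	_, err = redigo.String(c.Do("restore", params...))
} else {
	if _, derr := c.Do("del", e.Key); derr != nil {
		log.Panicf("delete key[%v] failed[%v]", string(e.Key), derr)
	}
	_, err = redigo.String(c.Do("restore", params...))
}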

src/redis-shake/main/main.go
@@ -463,11 +463,18 @@ func sanitizeOptions(tp string) error {
}
// compare version. see github issue #173.
if ret := utils.CompareVersion(conf.Options.SourceVersion, conf.Options.TargetVersion, 2); ret != 0 && ret != 1 {
// v1.6.24: updated again in #211. If the source version is bigger than the target version,
// do restore directly; if that fails, try to split the value.
/*if ret := utils.CompareVersion(conf.Options.SourceVersion, conf.Options.TargetVersion, 2); ret != 0 && ret != 1 {
// target version is smaller than source version, or unknown
log.Warnf("target version[%v] < source version[%v], set big_key_threshold = 1. see #173",
conf.Options.TargetVersion, conf.Options.SourceVersion)
conf.Options.BigKeyThreshold = 1
}*/
// use 'sync' instead of 'psync' if the source version < 2.8
if ret := utils.CompareVersion(conf.Options.SourceVersion, "2.8", 2); ret == 1 && conf.Options.Psync {
conf.Options.Psync = false
}
}
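Both call sites above rely on the same return convention from utils.CompareVersion. The convention sketched below (0 = equal, 1 = first version lower, 2 = first version higher, compared to `level` dotted fields) is inferred from these call sites and their comments, not from separate documentation:

// Inferred behaviour (assumption): compare the first 2 dotted fields.
switch utils.CompareVersion("2.6.0", "2.8", 2) {
case 0:
	log.Info("versions are equal")
case 1:
	log.Info("first is lower: the pre-2.8 case that disables psync above")
case 2:
	log.Info("first is higher: before #211 this forced big_key_threshold = 1")
default:
	log.Info("unknown or unparseable version")
}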

src/redis-shake/sync.go
@@ -361,24 +361,24 @@ func (ds *dbSyncer) sendPSyncCmd(master, auth_type, passwd string, tlsEnable boo
}
func (ds *dbSyncer) pSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer, offset int64, copyto io.Writer) (int64, error) {
// TODO: c.Close() may be called twice? maybe a bug
defer c.Close()
var nread atomic2.Int64
go func() {
defer c.Close()
for {
time.Sleep(time.Second * 1)
for range time.NewTicker(1 * time.Second).C {
select {
case <-ds.waitFull:
if err := utils.SendPSyncAck(bw, offset+nread.Get()); err != nil {
log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err)
return
}
default:
if err := utils.SendPSyncAck(bw, 0); err != nil {
log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err)
return
}
}
}
log.Errorf("dbSyncer[%v] heartbeat thread closed!", ds.id)
}()
var p = make([]byte, 8192)
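The heartbeat above replaces a time.Sleep loop with a ticker ranging over its channel. A self-contained sketch of the same shape; the sketch also stops the ticker on exit, which is worth keeping in mind since a ticker that is never stopped cannot be garbage-collected:

// Once per second, acknowledge the replication offset to the source: the real
// offset after the full sync finished (waitFull fired), zero before that.
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
	select {
	case <-ds.waitFull:
		if err := utils.SendPSyncAck(bw, offset+nread.Get()); err != nil {
			log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err)
			return
		}
	default:
		if err := utils.SendPSyncAck(bw, 0); err != nil {
			log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err)
			return
		}
	}
}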

src/vendor/vendor.json
@@ -159,10 +159,10 @@
"revisionTime": "2019-03-04T09:57:49Z"
},
{
"checksumSHA1": "Y95c1YwyU+rB+mFMSB4zhxCpt+Q=",
"checksumSHA1": "5/V8dbgozt/GmSX5cRMlig6dFSA=",
"path": "github.com/vinllen/redis-go-cluster",
"revision": "f33d5a7283f2dc28233aa0069d7ec1798323cdad",
"revisionTime": "2019-11-28T04:11:03Z"
"revision": "ceb83e88e9fa8fb8ddff00fffd1f2e384593e903",
"revisionTime": "2019-12-16T03:17:21Z"
},
{
"checksumSHA1": "U4rR1I0MXcvJz3zSxTp3hb3Y0I0=",
