diff --git a/ChangeLog b/ChangeLog index 1a4130b..c8aba52 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +2019-12-20 Alibaba Cloud. + * VERSION: 1.6.24 + * BUGFIX: cluster receive channel size adjust from 4096 to `sender.count`. + * BUGFIX: update redis-go-cluster to solve the send and receive + concurrency conflict. + * BUGFIX: fix some bugs in redis-go-cluster including io timeout problem, + see #192, #210. + * IMPROVE: set 'psync' to true by default in configuration, if the source + redis version is less than v2.8, switch to false. + * IMPROVE: when target version is less than the source, do restore + directly. Then catch the "Bad data format" and retry by split value. see + #211. + * IMPROVE: catch more errors in `restoreBigRdbEntry` function. 2019-11-28 Alibaba Cloud. * VERSION: 1.6.23 * BUGFIX: update redis-go-cluster driver to solve MOVED error in lua diff --git a/README.md b/README.md index fb73a8f..2dbf00e 100644 --- a/README.md +++ b/README.md @@ -104,3 +104,4 @@ Plus, we have a DingDing(钉钉) group so that users can join and discuss, pleas | muicoder | muicoder@gmail.com | | zhklcf | huikangzhu@126.com | | shuff1e | sfxu@foxmail.com | +| xuhualin | xuhualing8439523@163.com | diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 9a7f76e..9df4e17 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -148,12 +148,13 @@ filter.lua = false # the reason. # 正常key如果不大,那么都是直接调用restore写入到目的端,如果key对应的value字节超过了给定 # 的值,那么会分批依次一个一个写入。如果目的端是Codis,这个需要置为1,具体原因请查看FAQ。 +# 如果目的端大版本小于源端,也建议设置为1。 big_key_threshold = 524288000 # use psync command. # used in `sync`. -# 默认使用sync命令,启用将会使用psync命令 -psync = false +# 默认使用psync命令进行同步,置为false将会用sync命令进行同步,代码层面会自动识别2.8以前的版本改为sync。 +psync = true # enable metric # used in `sync`. @@ -172,7 +173,7 @@ sender.size = 104857600 # used in `sync`. flush sender buffer when bigger than this threshold. # 发送缓存的报文个数,超过这个阈值将会强行刷缓存发送,对于目的端是cluster的情况,这个值 # 的调大将会占用部分内存。 -sender.count = 4096 +sender.count = 4095 # delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also # stored in this delay queue. this timestamp will be used to calculate the time delay when receiving # ack from target redis. diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go index 27011e9..83b0bfb 100644 --- a/src/redis-shake/common/split.go +++ b/src/redis-shake/common/split.go @@ -29,12 +29,14 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db Freq: 0, } - restoreBigRdbEntry(client, &entry) + if err := restoreBigRdbEntry(client, &entry); err != nil { + log.Panicf("restore big rdb key[%s] failed[%v]", key, err) + } if pttl > 0 { // pttl if _, err := client.Do("pexpire", key, pttl); err != nil { - log.Panicf("send key[%v] pexpire failed[%v]", key, err) + log.Panicf("send key[%s] pexpire failed[%v]", key, err) } } } diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 09ae7ef..0b53b81 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -35,24 +35,24 @@ func OpenRedisConn(target []string, auth_type, passwd string, isCluster bool, tl } func OpenRedisConnWithTimeout(target []string, auth_type, passwd string, readTimeout, writeTimeout time.Duration, - isCluster bool, tlsEnable bool) redigo.Conn { - // return redigo.NewConn(OpenNetConn(target, auth_type, passwd), readTimeout, writeTimeout) + isCluster bool, tlsEnable bool) redigo.Conn { if isCluster { + // the alive time isn't the tcp keep_alive parameter cluster, err := redigoCluster.NewCluster( &redigoCluster.Options{ StartNodes: target, - ConnTimeout: 30 * time.Second, + ConnTimeout: 5 * time.Second, ReadTimeout: readTimeout, WriteTimeout: writeTimeout, - KeepAlive: 16, - AliveTime: time.Duration(conf.Options.KeepAlive) * time.Second, + KeepAlive: 32, // number of available connections + AliveTime: 10 * time.Second, // hard code to set alive time in single connection, not the tcp keep alive Password: passwd, }) if err != nil { log.Panicf("create cluster connection error[%v]", err) return nil } - return NewClusterConn(cluster, 4096) + return NewClusterConn(cluster, RecvChanSize) } else { // tls only support single connection currently return redigo.NewConn(OpenNetConn(target[0], auth_type, passwd, tlsEnable), readTimeout, writeTimeout) @@ -389,14 +389,16 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) { } } -func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { +func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) error { //read type + var err error r := rdb.NewRdbReader(bytes.NewReader(e.Value)) t, err := r.ReadByte() if err != nil { log.PanicError(err, "read rdb ") } - log.Info("restore big key ", string(e.Key), " Value Length ", len(e.Value), " type ", t) + + log.Debug("restore big key ", string(e.Key), " Value Length ", len(e.Value), " type ", t) count := 0 switch t { case rdb.RdbTypeHashZiplist: @@ -422,7 +424,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.PanicError(err, "read rdb ") } count++ - c.Send("HSET", e.Key, field, value) + err = c.Send("HSET", e.Key, field, value) if (count == 100) || (i == (length - 1)) { flushAndCheckReply(c, count) count = 0 @@ -455,7 +457,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.PanicError(err, "read rdb ") } count++ - c.Send("ZADD", e.Key, scoreBytes, member) + err = c.Send("ZADD", e.Key, scoreBytes, member) if (count == 100) || (i == (cardinality - 1)) { flushAndCheckReply(c, count) count = 0 @@ -500,7 +502,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { intString = strconv.FormatInt(int64(int64(binary.LittleEndian.Uint64(intBytes))), 10) } count++ - c.Send("SADD", e.Key, []byte(intString)) + err = c.Send("SADD", e.Key, []byte(intString)) if (count == 100) || (i == (cardinality - 1)) { flushAndCheckReply(c, count) count = 0 @@ -525,7 +527,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } //rpush(c, e.Key, entry) count++ - c.Send("RPUSH", e.Key, entry) + err = c.Send("RPUSH", e.Key, entry) if (count == 100) || (i == (length - 1)) { flushAndCheckReply(c, count) count = 0 @@ -562,7 +564,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.PanicError(err, "read rdb ") } count++ - c.Send("HSET", e.Key, field, value) + err = c.Send("HSET", e.Key, field, value) if (count == 100) || (i == (int(length) - 1)) { flushAndCheckReply(c, count) count = 0 @@ -587,7 +589,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } //rpush(c, e.Key, field) count++ - c.Send("RPUSH", e.Key, field) + err = c.Send("RPUSH", e.Key, field) if (count == 100) || (i == (int(n) - 1)) { flushAndCheckReply(c, count) count = 0 @@ -605,7 +607,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.PanicError(err, "read rdb ") } count++ - c.Send("SADD", e.Key, member) + err = c.Send("SADD", e.Key, member) if (count == 100) || (i == (int(n) - 1)) { flushAndCheckReply(c, count) count = 0 @@ -633,8 +635,9 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.PanicError(err, "read rdb ") } count++ - log.Info("restore big zset key ", string(e.Key), " score ", (Float64ToByte(score)), " member ", string(member)) - c.Send("ZADD", e.Key, Float64ToByte(score), member) + log.Info("restore big zset key ", string(e.Key), " score ", Float64ToByte(score), + " member ", string(member)) + err = c.Send("ZADD", e.Key, Float64ToByte(score), member) if (count == 100) || (i == (int(n) - 1)) { flushAndCheckReply(c, count) count = 0 @@ -671,7 +674,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } //hset(c, e.Key, field, value) count++ - c.Send("HSET", e.Key, field, value) + err = c.Send("HSET", e.Key, field, value) if (count == 100) || (i == (int(n) - 1)) { flushAndCheckReply(c, count) count = 0 @@ -701,7 +704,7 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } log.Info("rpush key: ", e.Key, " value: ", entry) count++ - c.Send("RPUSH", e.Key, entry) + err = c.Send("RPUSH", e.Key, entry) if count == 100 { flushAndCheckReply(c, count) count = 0 @@ -715,6 +718,8 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { default: log.PanicError(fmt.Errorf("can't deal rdb type:%d", t), "restore big key fail") } + + return err } func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { @@ -793,7 +798,11 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.Panicf("del ", string(e.Key), err) } } - restoreBigRdbEntry(c, e) + + if err := restoreBigRdbEntry(c, e); err != nil { + log.Panic(err) + } + if e.ExpireAt != 0 { r, err := redigo.Int64(c.Do("pexpire", e.Key, ttlms)) if err != nil && r != 1 { @@ -816,6 +825,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.Debugf("restore key[%s] with params[%v]", e.Key, params) // fmt.Printf("key: %v, value: %v params: %v\n", string(e.Key), e.Value, params) // s, err := redigo.String(c.Do("restore", params...)) +RESTORE: s, err := redigoCluster.String(c.Do("restore", params...)) if err != nil { /*The reply value of busykey in 2.8 kernel is "target key name is busy", @@ -826,21 +836,27 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { if !conf.Options.Metric { log.Infof("warning, rewrite key: %v", string(e.Key)) } - var s2 string - var rerr error + if conf.Options.TargetReplace { params = append(params, "REPLACE") - s2, rerr = redigo.String(c.Do("restore", params...)) } else { - _, _ = redigo.String(c.Do("del", e.Key)) - s2, rerr = redigo.String(c.Do("restore", params...)) - } - if rerr != nil { - log.Info(s2, rerr, "key ", string(e.Key)) + _, err = redigo.String(c.Do("del", e.Key)) + if err != nil { + log.Panicf("delete key[%v] failed[%v]", string(e.Key), err) + } } + + // retry + goto RESTORE } else { log.Panicf("target key name is busy:", string(e.Key)) } + } else if strings.Contains(err.Error(), "Bad data format") { + // from big version to small version may has this error. we need to split the data struct + log.Warnf("return error[%v], ignore it and try to split the value", err) + if err := restoreBigRdbEntry(c, e); err != nil { + log.Panic(err) + } } else { log.PanicError(err, "restore command error key:", string(e.Key), " err:", err.Error()) } diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index cd5f6c6..f7aa471 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -463,11 +463,18 @@ func sanitizeOptions(tp string) error { } // compare version. see github issue #173. - if ret := utils.CompareVersion(conf.Options.SourceVersion, conf.Options.TargetVersion, 2); ret != 0 && ret != 1 { + // v1.6.24. update in #211 again. if the source version is bigger than the target version, do restore directly, if failed, + // then try to split it + /*if ret := utils.CompareVersion(conf.Options.SourceVersion, conf.Options.TargetVersion, 2); ret != 0 && ret != 1 { // target version is smaller than source version, or unknown log.Warnf("target version[%v] < source version[%v], set big_key_threshold = 1. see #173", conf.Options.TargetVersion, conf.Options.SourceVersion) conf.Options.BigKeyThreshold = 1 + }*/ + + // use 'sync' instead of 'psync' if the source version < 2.8 + if ret := utils.CompareVersion(conf.Options.SourceVersion, "2.8", 2); ret == 1 && conf.Options.Psync { + conf.Options.Psync = false } } diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 83bbc44..2bb85e6 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -361,24 +361,24 @@ func (ds *dbSyncer) sendPSyncCmd(master, auth_type, passwd string, tlsEnable boo } func (ds *dbSyncer) pSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer, offset int64, copyto io.Writer) (int64, error) { - // TODO, two times call c.Close() ? maybe a bug - defer c.Close() var nread atomic2.Int64 go func() { defer c.Close() - for { - time.Sleep(time.Second * 1) + for range time.NewTicker(1 * time.Second).C { select { case <-ds.waitFull: if err := utils.SendPSyncAck(bw, offset+nread.Get()); err != nil { + log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err) return } default: if err := utils.SendPSyncAck(bw, 0); err != nil { + log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err) return } } } + log.Errorf("dbSyncer[%v] heartbeat thread closed!", ds.id) }() var p = make([]byte, 8192) diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json index d3a0e47..4657f0a 100644 --- a/src/vendor/vendor.json +++ b/src/vendor/vendor.json @@ -159,10 +159,10 @@ "revisionTime": "2019-03-04T09:57:49Z" }, { - "checksumSHA1": "Y95c1YwyU+rB+mFMSB4zhxCpt+Q=", + "checksumSHA1": "5/V8dbgozt/GmSX5cRMlig6dFSA=", "path": "github.com/vinllen/redis-go-cluster", - "revision": "f33d5a7283f2dc28233aa0069d7ec1798323cdad", - "revisionTime": "2019-11-28T04:11:03Z" + "revision": "ceb83e88e9fa8fb8ddff00fffd1f2e384593e903", + "revisionTime": "2019-12-16T03:17:21Z" }, { "checksumSHA1": "U4rR1I0MXcvJz3zSxTp3hb3Y0I0=",