diff --git a/ChangeLog b/ChangeLog index 4c47679..259753b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,10 +1,12 @@ -2019-12-16 Alibaba Cloud. +2019-12-xx Alibaba Cloud. * VERSION: 1.6.24 * BUGFIX: cluster receive channel size adjust from 4096 to `sender.count`. * BUGFIX: update redis-go-cluster to solve the send and receive concurrency conflict. * BUGFIX: fix some bugs in redis-go-cluster including io timeout problem, see #192, #210. + * IMPROVE: set 'psync' to true by default in configuration, if the source + redis version is less than v2.8, switch to false. 2019-11-28 Alibaba Cloud. * VERSION: 1.6.23 * BUGFIX: update redis-go-cluster driver to solve MOVED error in lua diff --git a/README.md b/README.md index fb73a8f..2dbf00e 100644 --- a/README.md +++ b/README.md @@ -104,3 +104,4 @@ Plus, we have a DingDing(钉钉) group so that users can join and discuss, pleas | muicoder | muicoder@gmail.com | | zhklcf | huikangzhu@126.com | | shuff1e | sfxu@foxmail.com | +| xuhualin | xuhualing8439523@163.com | diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 9a7f76e..4b7f813 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -152,8 +152,8 @@ big_key_threshold = 524288000 # use psync command. # used in `sync`. -# 默认使用sync命令,启用将会使用psync命令 -psync = false +# 默认使用psync命令进行同步,置为false将会用sync命令进行同步,代码层面会自动识别2.8以前的版本改为sync。 +psync = true # enable metric # used in `sync`. diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index cd5f6c6..518fa45 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -469,6 +469,11 @@ func sanitizeOptions(tp string) error { conf.Options.TargetVersion, conf.Options.SourceVersion) conf.Options.BigKeyThreshold = 1 } + + // use 'sync' instead of 'psync' if the source version < 2.8 + if ret := utils.CompareVersion(conf.Options.SourceVersion, "2.8", 2); ret == 1 && conf.Options.Psync { + conf.Options.Psync = false + } } if tp == conf.TypeRump { diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 83bbc44..2bb85e6 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -361,24 +361,24 @@ func (ds *dbSyncer) sendPSyncCmd(master, auth_type, passwd string, tlsEnable boo } func (ds *dbSyncer) pSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer, offset int64, copyto io.Writer) (int64, error) { - // TODO, two times call c.Close() ? maybe a bug - defer c.Close() var nread atomic2.Int64 go func() { defer c.Close() - for { - time.Sleep(time.Second * 1) + for range time.NewTicker(1 * time.Second).C { select { case <-ds.waitFull: if err := utils.SendPSyncAck(bw, offset+nread.Get()); err != nil { + log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err) return } default: if err := utils.SendPSyncAck(bw, 0); err != nil { + log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err) return } } } + log.Errorf("dbSyncer[%v] heartbeat thread closed!", ds.id) }() var p = make([]byte, 8192)