set psync = true by default, see #214

v4
vinllen 5 years ago
parent bf651e6eb4
commit a2ea9eee3c
  1. 4
      ChangeLog
  2. 1
      README.md
  3. 4
      conf/redis-shake.conf
  4. 5
      src/redis-shake/main/main.go
  5. 8
      src/redis-shake/sync.go

@ -1,10 +1,12 @@
2019-12-16 Alibaba Cloud.
2019-12-xx Alibaba Cloud.
* VERSION: 1.6.24
* BUGFIX: cluster receive channel size adjust from 4096 to `sender.count`.
* BUGFIX: update redis-go-cluster to solve the send and receive
concurrency conflict.
* BUGFIX: fix some bugs in redis-go-cluster including io timeout problem,
see #192, #210.
* IMPROVE: set 'psync' to true by default in configuration, if the source
redis version is less than v2.8, switch to false.
2019-11-28 Alibaba Cloud.
* VERSION: 1.6.23
* BUGFIX: update redis-go-cluster driver to solve MOVED error in lua

@ -104,3 +104,4 @@ Plus, we have a DingDing(钉钉) group so that users can join and discuss, pleas
| muicoder | muicoder@gmail.com |
| zhklcf | huikangzhu@126.com |
| shuff1e | sfxu@foxmail.com |
| xuhualin | xuhualing8439523@163.com |

@ -152,8 +152,8 @@ big_key_threshold = 524288000
# use psync command.
# used in `sync`.
# 默认使用sync命令,启用将会使用psync命令
psync = false
# 默认使用psync命令进行同步,置为false将会用sync命令进行同步,代码层面会自动识别2.8以前的版本改为sync。
psync = true
# enable metric
# used in `sync`.

@ -469,6 +469,11 @@ func sanitizeOptions(tp string) error {
conf.Options.TargetVersion, conf.Options.SourceVersion)
conf.Options.BigKeyThreshold = 1
}
// use 'sync' instead of 'psync' if the source version < 2.8
if ret := utils.CompareVersion(conf.Options.SourceVersion, "2.8", 2); ret == 1 && conf.Options.Psync {
conf.Options.Psync = false
}
}
if tp == conf.TypeRump {

@ -361,24 +361,24 @@ func (ds *dbSyncer) sendPSyncCmd(master, auth_type, passwd string, tlsEnable boo
}
func (ds *dbSyncer) pSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer, offset int64, copyto io.Writer) (int64, error) {
// TODO, two times call c.Close() ? maybe a bug
defer c.Close()
var nread atomic2.Int64
go func() {
defer c.Close()
for {
time.Sleep(time.Second * 1)
for range time.NewTicker(1 * time.Second).C {
select {
case <-ds.waitFull:
if err := utils.SendPSyncAck(bw, offset+nread.Get()); err != nil {
log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err)
return
}
default:
if err := utils.SendPSyncAck(bw, 0); err != nil {
log.Errorf("dbSyncer[%v] send offset to source redis failed[%v]", ds.id, err)
return
}
}
}
log.Errorf("dbSyncer[%v] heartbeat thread closed!", ds.id)
}()
var p = make([]byte, 8192)

Loading…
Cancel
Save