diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 1ea6de6..c832c9a 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -9,7 +9,6 @@ import ( "crypto/tls" "encoding/binary" "fmt" - "github.com/alibaba/RedisShake/redis-shake/bigkey" "io" "math/rand" "net" @@ -18,6 +17,8 @@ import ( "strings" "time" + "github.com/alibaba/RedisShake/redis-shake/bigkey" + "github.com/alibaba/RedisShake/pkg/libs/atomic2" "github.com/alibaba/RedisShake/pkg/libs/errors" "github.com/alibaba/RedisShake/pkg/libs/log" @@ -25,7 +26,7 @@ import ( "github.com/alibaba/RedisShake/pkg/rdb" "github.com/alibaba/RedisShake/pkg/redis" - "github.com/alibaba/RedisShake/redis-shake/configure" + conf "github.com/alibaba/RedisShake/redis-shake/configure" "github.com/FZambia/go-sentinel" redigo "github.com/garyburd/redigo/redis" @@ -232,7 +233,7 @@ func SendPSyncFullsync(br *bufio.Reader, bw *bufio.Writer) (string, int64, <-cha return runid, offset, waitRdbDump(br) } -func SendPSyncContinue(br *bufio.Reader, bw *bufio.Writer, runid string, offset int64) (string, int64, <-chan int64) { +func SendPSyncContinue(br *bufio.Reader, bw *bufio.Writer, runid string, offset int64, isReconn bool) (string, int64, <-chan int64) { if offset != -1 { offset += 1 } @@ -262,6 +263,10 @@ func SendPSyncContinue(br *bufio.Reader, bw *bufio.Writer, runid string, offset log.Infof("Event:IncSyncStart\tId:%s\t", conf.Options.Id) return runid, offset - 1, nil } else if len(xx) >= 3 && strings.ToLower(xx[0]) == "fullresync" { + if isReconn { + log.Panic("psync with fullresync after reconnect is not supported yet") + } + v, err := strconv.ParseInt(xx[2], 10, 64) if err != nil { log.PanicError(err, "parse psync offset failed") diff --git a/src/redis-shake/dbSync/syncBegin.go b/src/redis-shake/dbSync/syncBegin.go index 9ecc396..f359568 100644 --- a/src/redis-shake/dbSync/syncBegin.go +++ b/src/redis-shake/dbSync/syncBegin.go @@ -2,16 +2,17 @@ package dbSync import ( "bufio" + "io" + "net" + "time" + "github.com/alibaba/RedisShake/pkg/libs/atomic2" "github.com/alibaba/RedisShake/pkg/libs/io/pipe" "github.com/alibaba/RedisShake/pkg/libs/log" "github.com/alibaba/RedisShake/redis-shake/base" - "github.com/alibaba/RedisShake/redis-shake/common" - "io" - "net" - "time" + utils "github.com/alibaba/RedisShake/redis-shake/common" - "github.com/alibaba/RedisShake/redis-shake/configure" + conf "github.com/alibaba/RedisShake/redis-shake/configure" ) // send command to source redis @@ -47,7 +48,7 @@ func (ds *DbSyncer) sendPSyncCmd(master, authType, passwd string, tlsEnable bool log.Infof("DbSyncer[%d] try to send 'psync' command: run-id[%v], offset[%v]", ds.id, runId, prevOffset) // send psync command and decode the result - runid, offset, wait := utils.SendPSyncContinue(br, bw, runId, prevOffset) + runid, offset, wait := utils.SendPSyncContinue(br, bw, runId, prevOffset, false) ds.stat.targetOffset.Set(offset) ds.fullSyncOffset = offset // store the full sync offset @@ -130,7 +131,7 @@ func (ds *DbSyncer) runIncrementalSync(c net.Conn, br *bufio.Reader, bw *bufio.W utils.SendPSyncListeningPort(c, conf.Options.HttpProfile) br = bufio.NewReaderSize(c, utils.ReaderBufferSize) bw = bufio.NewWriterSize(c, utils.WriterBufferSize) - utils.SendPSyncContinue(br, bw, runId, offset) + utils.SendPSyncContinue(br, bw, runId, offset, true) } }