fix: panic while reconnect source master fired fullresync. (#381)

v4
Jacky Wu 3 years ago committed by GitHub
parent 2651e3d749
commit 753b5d842f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      src/redis-shake/common/utils.go
  2. 15
      src/redis-shake/dbSync/syncBegin.go

@ -9,7 +9,6 @@ import (
"crypto/tls" "crypto/tls"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"github.com/alibaba/RedisShake/redis-shake/bigkey"
"io" "io"
"math/rand" "math/rand"
"net" "net"
@ -18,6 +17,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/alibaba/RedisShake/redis-shake/bigkey"
"github.com/alibaba/RedisShake/pkg/libs/atomic2" "github.com/alibaba/RedisShake/pkg/libs/atomic2"
"github.com/alibaba/RedisShake/pkg/libs/errors" "github.com/alibaba/RedisShake/pkg/libs/errors"
"github.com/alibaba/RedisShake/pkg/libs/log" "github.com/alibaba/RedisShake/pkg/libs/log"
@ -25,7 +26,7 @@ import (
"github.com/alibaba/RedisShake/pkg/rdb" "github.com/alibaba/RedisShake/pkg/rdb"
"github.com/alibaba/RedisShake/pkg/redis" "github.com/alibaba/RedisShake/pkg/redis"
"github.com/alibaba/RedisShake/redis-shake/configure" conf "github.com/alibaba/RedisShake/redis-shake/configure"
"github.com/FZambia/go-sentinel" "github.com/FZambia/go-sentinel"
redigo "github.com/garyburd/redigo/redis" redigo "github.com/garyburd/redigo/redis"
@ -232,7 +233,7 @@ func SendPSyncFullsync(br *bufio.Reader, bw *bufio.Writer) (string, int64, <-cha
return runid, offset, waitRdbDump(br) return runid, offset, waitRdbDump(br)
} }
func SendPSyncContinue(br *bufio.Reader, bw *bufio.Writer, runid string, offset int64) (string, int64, <-chan int64) { func SendPSyncContinue(br *bufio.Reader, bw *bufio.Writer, runid string, offset int64, isReconn bool) (string, int64, <-chan int64) {
if offset != -1 { if offset != -1 {
offset += 1 offset += 1
} }
@ -262,6 +263,10 @@ func SendPSyncContinue(br *bufio.Reader, bw *bufio.Writer, runid string, offset
log.Infof("Event:IncSyncStart\tId:%s\t", conf.Options.Id) log.Infof("Event:IncSyncStart\tId:%s\t", conf.Options.Id)
return runid, offset - 1, nil return runid, offset - 1, nil
} else if len(xx) >= 3 && strings.ToLower(xx[0]) == "fullresync" { } else if len(xx) >= 3 && strings.ToLower(xx[0]) == "fullresync" {
if isReconn {
log.Panic("psync with fullresync after reconnect is not supported yet")
}
v, err := strconv.ParseInt(xx[2], 10, 64) v, err := strconv.ParseInt(xx[2], 10, 64)
if err != nil { if err != nil {
log.PanicError(err, "parse psync offset failed") log.PanicError(err, "parse psync offset failed")

@ -2,16 +2,17 @@ package dbSync
import ( import (
"bufio" "bufio"
"io"
"net"
"time"
"github.com/alibaba/RedisShake/pkg/libs/atomic2" "github.com/alibaba/RedisShake/pkg/libs/atomic2"
"github.com/alibaba/RedisShake/pkg/libs/io/pipe" "github.com/alibaba/RedisShake/pkg/libs/io/pipe"
"github.com/alibaba/RedisShake/pkg/libs/log" "github.com/alibaba/RedisShake/pkg/libs/log"
"github.com/alibaba/RedisShake/redis-shake/base" "github.com/alibaba/RedisShake/redis-shake/base"
"github.com/alibaba/RedisShake/redis-shake/common" utils "github.com/alibaba/RedisShake/redis-shake/common"
"io"
"net"
"time"
"github.com/alibaba/RedisShake/redis-shake/configure" conf "github.com/alibaba/RedisShake/redis-shake/configure"
) )
// send command to source redis // send command to source redis
@ -47,7 +48,7 @@ func (ds *DbSyncer) sendPSyncCmd(master, authType, passwd string, tlsEnable bool
log.Infof("DbSyncer[%d] try to send 'psync' command: run-id[%v], offset[%v]", ds.id, runId, prevOffset) log.Infof("DbSyncer[%d] try to send 'psync' command: run-id[%v], offset[%v]", ds.id, runId, prevOffset)
// send psync command and decode the result // send psync command and decode the result
runid, offset, wait := utils.SendPSyncContinue(br, bw, runId, prevOffset) runid, offset, wait := utils.SendPSyncContinue(br, bw, runId, prevOffset, false)
ds.stat.targetOffset.Set(offset) ds.stat.targetOffset.Set(offset)
ds.fullSyncOffset = offset // store the full sync offset ds.fullSyncOffset = offset // store the full sync offset
@ -130,7 +131,7 @@ func (ds *DbSyncer) runIncrementalSync(c net.Conn, br *bufio.Reader, bw *bufio.W
utils.SendPSyncListeningPort(c, conf.Options.HttpProfile) utils.SendPSyncListeningPort(c, conf.Options.HttpProfile)
br = bufio.NewReaderSize(c, utils.ReaderBufferSize) br = bufio.NewReaderSize(c, utils.ReaderBufferSize)
bw = bufio.NewWriterSize(c, utils.WriterBufferSize) bw = bufio.NewWriterSize(c, utils.WriterBufferSize)
utils.SendPSyncContinue(br, bw, runId, offset) utils.SendPSyncContinue(br, bw, runId, offset, true)
} }
} }

Loading…
Cancel
Save