|
|
|
@ -24,6 +24,7 @@ import ( |
|
|
|
|
"redis-shake/base" |
|
|
|
|
"redis-shake/heartbeat" |
|
|
|
|
"redis-shake/metric" |
|
|
|
|
"unsafe" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type delayNode struct { |
|
|
|
@ -361,6 +362,11 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
var sendId, recvId atomic2.Int64 |
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
|
if conf.Options.Psync == false { |
|
|
|
|
log.Warn("GetFakeSlaveOffset not enable when psync == false") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, |
|
|
|
|
conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) |
|
|
|
|
ticker := time.NewTicker(10 * time.Second) |
|
|
|
@ -395,7 +401,11 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
go func() { |
|
|
|
|
var node *delayNode |
|
|
|
|
for { |
|
|
|
|
_, err := c.Receive() |
|
|
|
|
reply, err := c.Receive() |
|
|
|
|
|
|
|
|
|
// print debug log of receive reply
|
|
|
|
|
log.Debugf("receive reply: [%v], error: [%v]", reply, err) |
|
|
|
|
|
|
|
|
|
if conf.Options.Metric == false { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
@ -465,6 +475,15 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
metric.MetricVar.AddPullCmdCount(1) |
|
|
|
|
|
|
|
|
|
if scmd != "ping" { |
|
|
|
|
// print debug log of send command
|
|
|
|
|
if conf.Options.LogLevel == utils.LogLevelAll { |
|
|
|
|
strArgv := make([]string, len(argv)) |
|
|
|
|
for i, ele := range argv { |
|
|
|
|
strArgv[i] = *(*string)(unsafe.Pointer(&ele)) |
|
|
|
|
} |
|
|
|
|
log.Debugf("send command: [%s %v]", scmd, strArgv) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if strings.EqualFold(scmd, "select") { |
|
|
|
|
if len(argv) != 1 { |
|
|
|
|
log.Panicf("select command len(args) = %d", len(argv)) |
|
|
|
|