|
|
@ -483,19 +483,24 @@ func (dre *dbRumperExecutor) doFetch(db int) error { |
|
|
|
log.Debugf("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key) |
|
|
|
log.Debugf("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key) |
|
|
|
dre.sourceClient.Send("DUMP", key) |
|
|
|
dre.sourceClient.Send("DUMP", key) |
|
|
|
} |
|
|
|
} |
|
|
|
dumps, err := redis.Strings(dre.sourceClient.Do("")) |
|
|
|
|
|
|
|
|
|
|
|
reply, err := dre.sourceClient.Do("") |
|
|
|
|
|
|
|
dumpsRet, err := utils.WrapCommand(utils.ReplayString, reply, err) |
|
|
|
if err != nil && err != redis.ErrNil { |
|
|
|
if err != nil && err != redis.ErrNil { |
|
|
|
return fmt.Errorf("do dump with failed[%v]", err) |
|
|
|
return fmt.Errorf("do dump with failed[%v]", err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
dumps := dumpsRet.([]string) |
|
|
|
|
|
|
|
|
|
|
|
// pipeline ttl
|
|
|
|
// pipeline ttl
|
|
|
|
for _, key := range keys { |
|
|
|
for _, key := range keys { |
|
|
|
dre.sourceClient.Send("PTTL", key) |
|
|
|
dre.sourceClient.Send("PTTL", key) |
|
|
|
} |
|
|
|
} |
|
|
|
pttls, err := redis.Int64s(dre.sourceClient.Do("")) |
|
|
|
reply, err = dre.sourceClient.Do("") |
|
|
|
|
|
|
|
pttlsRet, err := utils.WrapCommand(utils.ReplayInt64s, reply, err) |
|
|
|
if err != nil && err != redis.ErrNil { |
|
|
|
if err != nil && err != redis.ErrNil { |
|
|
|
return fmt.Errorf("do ttl with failed[%v]", err) |
|
|
|
return fmt.Errorf("do ttl with failed[%v]", err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
pttls := pttlsRet.([]int64) |
|
|
|
|
|
|
|
|
|
|
|
dre.stat.rCommands.Add(int64(len(keys))) |
|
|
|
dre.stat.rCommands.Add(int64(len(keys))) |
|
|
|
for i, k := range keys { |
|
|
|
for i, k := range keys { |
|
|
|