|
|
@ -178,20 +178,21 @@ func (dr *dbRumper) getNode() (int, error) { |
|
|
|
/*------------------------------------------------------*/ |
|
|
|
/*------------------------------------------------------*/ |
|
|
|
// one executor(1 db only) link corresponding to one dbRumperExecutor
|
|
|
|
// one executor(1 db only) link corresponding to one dbRumperExecutor
|
|
|
|
type dbRumperExecutor struct { |
|
|
|
type dbRumperExecutor struct { |
|
|
|
rumperId int // father id
|
|
|
|
rumperId int // father id
|
|
|
|
executorId int // current id, also == aliyun cluster node id
|
|
|
|
executorId int // current id, also == aliyun cluster node id
|
|
|
|
sourceClient redis.Conn // source client
|
|
|
|
sourceClient redis.Conn // source client
|
|
|
|
targetClient redis.Conn // target client
|
|
|
|
targetClient redis.Conn // target client
|
|
|
|
tencentNodeId string // tencent cluster node id
|
|
|
|
tencentNodeId string // tencent cluster node id
|
|
|
|
targetBigKeyClient redis.Conn // target client only used in big key, this is a bit ugly
|
|
|
|
targetBigKeyClient redis.Conn // target client only used in big key, this is a bit ugly
|
|
|
|
|
|
|
|
previousDb int // store previous db
|
|
|
|
|
|
|
|
|
|
|
|
keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2
|
|
|
|
keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2
|
|
|
|
resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3
|
|
|
|
resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3
|
|
|
|
|
|
|
|
|
|
|
|
scanner scanner.Scanner // one scanner match one db/proxy
|
|
|
|
scanner scanner.Scanner // one scanner match one db/proxy
|
|
|
|
fetcherWg sync.WaitGroup |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stat dbRumperExexutorStats |
|
|
|
fetcherWg sync.WaitGroup |
|
|
|
|
|
|
|
stat dbRumperExexutorStats |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, targetBigKeyClient redis.Conn, |
|
|
|
func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, targetBigKeyClient redis.Conn, |
|
|
@ -203,6 +204,7 @@ func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, t |
|
|
|
targetClient: targetClient, |
|
|
|
targetClient: targetClient, |
|
|
|
tencentNodeId: tencentNodeId, |
|
|
|
tencentNodeId: tencentNodeId, |
|
|
|
targetBigKeyClient: targetBigKeyClient, |
|
|
|
targetBigKeyClient: targetBigKeyClient, |
|
|
|
|
|
|
|
previousDb: 0, |
|
|
|
stat: dbRumperExexutorStats{ |
|
|
|
stat: dbRumperExexutorStats{ |
|
|
|
minSize: 1 << 30, |
|
|
|
minSize: 1 << 30, |
|
|
|
maxSize: 0, |
|
|
|
maxSize: 0, |
|
|
@ -453,7 +455,9 @@ func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (dre *dbRumperExecutor) doFetch(db int) error { |
|
|
|
func (dre *dbRumperExecutor) doFetch(db int) error { |
|
|
|
if conf.Options.ScanSpecialCloud != utils.TencentCluster { |
|
|
|
// some redis type only has db0, so we add this judge
|
|
|
|
|
|
|
|
if db != dre.previousDb { |
|
|
|
|
|
|
|
dre.previousDb = db |
|
|
|
// send 'select' command to both source and target
|
|
|
|
// send 'select' command to both source and target
|
|
|
|
log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId) |
|
|
|
log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId) |
|
|
|
if _, err := dre.sourceClient.Do("select", db); err != nil { |
|
|
|
if _, err := dre.sourceClient.Do("select", db); err != nil { |
|
|
|