diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index c6a0db4..fe34842 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -16,6 +16,8 @@ import ( "redis-shake/filter" "github.com/garyburd/redigo/redis" + "time" + "bytes" ) type CmdRump struct { @@ -178,21 +180,25 @@ func (dr *dbRumper) getNode() (int, error) { /*------------------------------------------------------*/ // one executor(1 db only) link corresponding to one dbRumperExecutor type dbRumperExecutor struct { - rumperId int // father id - executorId int // current id, also == aliyun cluster node id - sourceClient redis.Conn // source client - targetClient redis.Conn // target client - tencentNodeId string // tencent cluster node id - targetBigKeyClient redis.Conn // target client only used in big key, this is a bit ugly - previousDb int // store previous db + rumperId int // father id + executorId int // current id, also == aliyun cluster node id + sourceClient redis.Conn // source client + targetClient redis.Conn // target client + tencentNodeId string // tencent cluster node id + targetBigKeyClient redis.Conn // target client only used in big key, this is a bit ugly + previousDb int // store previous db - keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2 - resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3 + keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2 + resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3 - scanner scanner.Scanner // one scanner match one db/proxy + scanner scanner.Scanner // one scanner match one db/proxy - fetcherWg sync.WaitGroup - stat dbRumperExexutorStats + fetcherWg sync.WaitGroup + stat dbRumperExexutorStats + + dbList []int32 // db list + keyNumber int64 // key in this db number + close bool // is finish? } func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, targetBigKeyClient redis.Conn, @@ -273,6 +279,13 @@ func (dre *dbRumperExecutor) exec() { dre.keyChan = make(chan *KeyNode, chanSize) dre.resultChan = make(chan *KeyNode, chanSize) + // fetch db number from 'info keyspace' + var err error + dre.dbList, dre.keyNumber, err = dre.getSourceDbList() + if err != nil { + log.Panic(err) + } + /* * we start 4 routines to run: * 1. fetch keys from the source redis @@ -286,7 +299,20 @@ func (dre *dbRumperExecutor) exec() { go dre.writer() // routine4 - dre.receiver() + go dre.receiver() + + // start metric + for range time.NewTicker(1 * time.Second).C { + if dre.close { + break + } + + var b bytes.Buffer + fmt.Fprintf(&b, "dbRumper[%v] total = %v(keys) - %10v(keys) [%3d%%] entry=%-12d", + dre.rumperId, dre.keyNumber, dre.stat.cCommands.Get(), + 100 * dre.stat.cCommands.Get() / dre.keyNumber, dre.stat.wCommands.Get()) + log.Info(b.String()) + } log.Infof("dbRumper[%v] executor[%v] finish!", dre.rumperId, dre.executorId) } @@ -295,15 +321,9 @@ func (dre *dbRumperExecutor) fetcher() { log.Infof("dbRumper[%v] executor[%v] start fetcher with special-cloud[%v]", dre.rumperId, dre.executorId, conf.Options.ScanSpecialCloud) - // fetch db number from 'info keyspace' - dbNumber, err := dre.getSourceDbList() - if err != nil { - log.Panic(err) - } - - log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dbNumber) + log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dre.dbList) // iterate all db nodes - for _, db := range dbNumber { + for _, db := range dre.dbList { if filter.FilterDB(int(db)) { log.Infof("dbRumper[%v] executor[%v] db[%v] filtered", dre.rumperId, dre.executorId, db) continue @@ -423,34 +443,38 @@ func (dre *dbRumperExecutor) receiver() { for ele := range dre.resultChan { if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value)) - log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " + - "rdb version[%v], checksum[%v], check error[%v]", + log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], "+ + "rdb version[%v], checksum[%v], check error[%v]", dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value), rdbVersion, checksum, checkErr) } dre.stat.cCommands.Incr() } + + dre.close = true } -func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) { +func (dre *dbRumperExecutor) getSourceDbList() ([]int32, int64, error) { // tencent cluster only has 1 logical db if conf.Options.ScanSpecialCloud == utils.TencentCluster { - return []int32{0}, nil + return []int32{0}, 1, nil } conn := dre.sourceClient if ret, err := conn.Do("info", "keyspace"); err != nil { - return nil, err + return nil, 0, err } else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil { - return nil, err + return nil, 0, err } else { list := make([]int32, 0, len(mp)) - for key, val := range mp { - if val > 0 { - list = append(list, key) + var total int64 + for db, number := range mp { + if number > 0 && !filter.FilterDB(int(db)) { + list = append(list, db) + total += number } } - return list, nil + return list, total, nil } } @@ -459,7 +483,7 @@ func (dre *dbRumperExecutor) doFetch(db int) error { if db != dre.previousDb { dre.previousDb = db // send 'select' command to both source and target - log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId) + log.Debugf("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId) if _, err := dre.sourceClient.Do("select", db); err != nil { return err } @@ -475,7 +499,7 @@ func (dre *dbRumperExecutor) doFetch(db int) error { return err } - log.Infof("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys)) + log.Debugf("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys)) if len(keys) != 0 { // pipeline dump