add progress bar in rump mode

v4
vinllen 5 years ago
parent df1b4c2830
commit c579bc03bb
  1. 64
      src/redis-shake/rump.go

@ -16,6 +16,8 @@ import (
"redis-shake/filter" "redis-shake/filter"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"time"
"bytes"
) )
type CmdRump struct { type CmdRump struct {
@ -193,6 +195,10 @@ type dbRumperExecutor struct {
fetcherWg sync.WaitGroup fetcherWg sync.WaitGroup
stat dbRumperExexutorStats stat dbRumperExexutorStats
dbList []int32 // db list
keyNumber int64 // key in this db number
close bool // is finish?
} }
func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, targetBigKeyClient redis.Conn, func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, targetBigKeyClient redis.Conn,
@ -273,6 +279,13 @@ func (dre *dbRumperExecutor) exec() {
dre.keyChan = make(chan *KeyNode, chanSize) dre.keyChan = make(chan *KeyNode, chanSize)
dre.resultChan = make(chan *KeyNode, chanSize) dre.resultChan = make(chan *KeyNode, chanSize)
// fetch db number from 'info keyspace'
var err error
dre.dbList, dre.keyNumber, err = dre.getSourceDbList()
if err != nil {
log.Panic(err)
}
/* /*
* we start 4 routines to run: * we start 4 routines to run:
* 1. fetch keys from the source redis * 1. fetch keys from the source redis
@ -286,7 +299,20 @@ func (dre *dbRumperExecutor) exec() {
go dre.writer() go dre.writer()
// routine4 // routine4
dre.receiver() go dre.receiver()
// start metric
for range time.NewTicker(1 * time.Second).C {
if dre.close {
break
}
var b bytes.Buffer
fmt.Fprintf(&b, "dbRumper[%v] total = %v(keys) - %10v(keys) [%3d%%] entry=%-12d",
dre.rumperId, dre.keyNumber, dre.stat.cCommands.Get(),
100 * dre.stat.cCommands.Get() / dre.keyNumber, dre.stat.wCommands.Get())
log.Info(b.String())
}
log.Infof("dbRumper[%v] executor[%v] finish!", dre.rumperId, dre.executorId) log.Infof("dbRumper[%v] executor[%v] finish!", dre.rumperId, dre.executorId)
} }
@ -295,15 +321,9 @@ func (dre *dbRumperExecutor) fetcher() {
log.Infof("dbRumper[%v] executor[%v] start fetcher with special-cloud[%v]", dre.rumperId, dre.executorId, log.Infof("dbRumper[%v] executor[%v] start fetcher with special-cloud[%v]", dre.rumperId, dre.executorId,
conf.Options.ScanSpecialCloud) conf.Options.ScanSpecialCloud)
// fetch db number from 'info keyspace' log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dre.dbList)
dbNumber, err := dre.getSourceDbList()
if err != nil {
log.Panic(err)
}
log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dbNumber)
// iterate all db nodes // iterate all db nodes
for _, db := range dbNumber { for _, db := range dre.dbList {
if filter.FilterDB(int(db)) { if filter.FilterDB(int(db)) {
log.Infof("dbRumper[%v] executor[%v] db[%v] filtered", dre.rumperId, dre.executorId, db) log.Infof("dbRumper[%v] executor[%v] db[%v] filtered", dre.rumperId, dre.executorId, db)
continue continue
@ -423,34 +443,38 @@ func (dre *dbRumperExecutor) receiver() {
for ele := range dre.resultChan { for ele := range dre.resultChan {
if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil {
rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value)) rdbVersion, checksum, checkErr := utils.CheckVersionChecksum(utils.String2Bytes(ele.value))
log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], " + log.Panicf("dbRumper[%v] executor[%v] restore key[%v] error[%v]: pttl[%v], value length[%v], "+
"rdb version[%v], checksum[%v], check error[%v]", "rdb version[%v], checksum[%v], check error[%v]",
dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value), dre.rumperId, dre.executorId, ele.key, err, strconv.FormatInt(ele.pttl, 10), len(ele.value),
rdbVersion, checksum, checkErr) rdbVersion, checksum, checkErr)
} }
dre.stat.cCommands.Incr() dre.stat.cCommands.Incr()
} }
dre.close = true
} }
func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) { func (dre *dbRumperExecutor) getSourceDbList() ([]int32, int64, error) {
// tencent cluster only has 1 logical db // tencent cluster only has 1 logical db
if conf.Options.ScanSpecialCloud == utils.TencentCluster { if conf.Options.ScanSpecialCloud == utils.TencentCluster {
return []int32{0}, nil return []int32{0}, 1, nil
} }
conn := dre.sourceClient conn := dre.sourceClient
if ret, err := conn.Do("info", "keyspace"); err != nil { if ret, err := conn.Do("info", "keyspace"); err != nil {
return nil, err return nil, 0, err
} else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil { } else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil {
return nil, err return nil, 0, err
} else { } else {
list := make([]int32, 0, len(mp)) list := make([]int32, 0, len(mp))
for key, val := range mp { var total int64
if val > 0 { for db, number := range mp {
list = append(list, key) if number > 0 && !filter.FilterDB(int(db)) {
list = append(list, db)
total += number
} }
} }
return list, nil return list, total, nil
} }
} }
@ -459,7 +483,7 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
if db != dre.previousDb { if db != dre.previousDb {
dre.previousDb = db dre.previousDb = db
// send 'select' command to both source and target // send 'select' command to both source and target
log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId) log.Debugf("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId)
if _, err := dre.sourceClient.Do("select", db); err != nil { if _, err := dre.sourceClient.Do("select", db); err != nil {
return err return err
} }
@ -475,7 +499,7 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
return err return err
} }
log.Infof("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys)) log.Debugf("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys))
if len(keys) != 0 { if len(keys) != 0 {
// pipeline dump // pipeline dump

Loading…
Cancel
Save