finish parallel rump

v4
vinllen 6 years ago
parent 26d21693b5
commit e24b6edfe4
Changed files (lines changed):
    src/redis-shake/main/main.go                    9
    src/redis-shake/rump.go                         305
    src/redis-shake/scanner/keyFileScanner.go       6
    src/redis-shake/scanner/normalScanner.go        6
    src/redis-shake/scanner/scanner.go              41
    src/redis-shake/scanner/specialCloudScanner.go  39
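
The heart of this commit is a two-level fan-out: CmdRump.Main now starts one dbRumper goroutine per source address, and each dbRumper starts one dbRumperExecutor per node it discovers behind that address. A minimal standalone sketch of the pattern follows; the addresses, the runRumper helper, and the fixed nodeCount are illustrative stand-ins, not the redis-shake API.

package main

import (
	"fmt"
	"sync"
)

// Sketch: one goroutine per source address, each fanning out to per-node
// executors, mirroring the new dbRumper -> dbRumperExecutor layout.
func main() {
	addresses := []string{"10.0.0.1:6379", "10.0.0.2:6379"} // hypothetical sources

	var wg sync.WaitGroup
	wg.Add(len(addresses))
	for i, addr := range addresses {
		go func(id int, address string) {
			defer wg.Done()
			runRumper(id, address)
		}(i, addr)
	}
	wg.Wait()
	fmt.Println("all rumpers finish!")
}

func runRumper(id int, address string) {
	nodeCount := 2 // in redis-shake this comes from getNode(), probing the cloud type

	var wg sync.WaitGroup
	wg.Add(nodeCount)
	for i := 0; i < nodeCount; i++ {
		go func(executorId int) {
			defer wg.Done()
			fmt.Printf("dbRumper[%v] executor[%v] migrating %v\n", id, executorId, address)
		}(i)
	}
	wg.Wait()
}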

src/redis-shake/main/main.go

@@ -390,14 +390,13 @@ func sanitizeOptions(tp string) error {
 		}
 		if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanKeyFile != "" {
-			return fmt.Errorf("scan.special_cloud[%v] and scan.key_file[%v] cann't be given at the same time",
+			return fmt.Errorf("scan.special_cloud[%v] and scan.key_file[%v] can't all be given at the same time",
 				conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
 		}
-		if (conf.Options.ScanSpecialCloud != "" || conf.Options.ScanKeyFile != "") && len(conf.Options.SourceAddressList) > 1 {
-			return fmt.Errorf("source address should <= 1 when scan.special_cloud[%v] or scan.key_file[%v] given",
-				conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
-		}
+		//if len(conf.Options.SourceAddressList) == 1 {
+		//	return fmt.Errorf("source address length should == 1 when type is 'rump'")
+		//}
 	}
 
 	return nil

src/redis-shake/rump.go

@@ -14,13 +14,136 @@ import (
 )
 
 type CmdRump struct {
-	sourceConn []redis.Conn
-	targetConn redis.Conn
+}
+
+func (cr *CmdRump) GetDetailedInfo() interface{} {
+	return nil
+}
+
+func (cr *CmdRump) Main() {
+	var wg sync.WaitGroup
+	wg.Add(len(conf.Options.SourceAddressList))
+
+	// build dbRumper
+	for i, address := range conf.Options.SourceAddressList {
+		dr := &dbRumper{
+			id:      i,
+			address: address,
+		}
+
+		log.Infof("start dbRumper[%v]", i)
+		go func() {
+			defer wg.Done()
+			dr.run()
+		}()
+	}
+	wg.Wait()
+
+	log.Info("all rumpers finish!")
+}
+
+/*------------------------------------------------------*/
+// one rump(1 db or 1 proxy) link corresponding to one dbRumper
+type dbRumper struct {
+	id      int // id
+	address string
+
+	client       redis.Conn // source client
+	tencentNodes []string   // for tencent cluster only
+}
+
+func (dr *dbRumper) run() {
+	// single connection
+	dr.client = utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
+		conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable)
+
+	// some clouds may have several db under proxy
+	count, err := dr.getNode()
+	if err != nil {
+		log.Panicf("dbRumper[%v] get node failed[%v]", dr.id, err)
+	}
+
+	log.Infof("dbRumper[%v] get node count: %v", dr.id, count)
+
+	var wg sync.WaitGroup
+	wg.Add(count)
+	for i := 0; i < count; i++ {
+		var target []string
+		if conf.Options.TargetType == conf.RedisTypeCluster {
+			target = conf.Options.TargetAddressList
+		} else {
+			// round-robin pick
+			pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
+			target = []string{conf.Options.TargetAddressList[pick]}
+		}
+
+		var tencentNodeId string
+		if len(dr.tencentNodes) > 0 {
+			tencentNodeId = dr.tencentNodes[i]
+		}
+
+		executor := &dbRumperExecutor{
+			rumperId:   dr.id,
+			executorId: i,
+			sourceClient: utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
+				conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable),
+			targetClient: utils.OpenRedisConn(target, conf.Options.TargetAuthType,
+				conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster,
+				conf.Options.TargetTLSEnable),
+			tencentNodeId: tencentNodeId,
+		}
+
+		go func() {
+			defer wg.Done()
+			executor.exec()
+		}()
+	}
+	wg.Wait()
+
+	log.Infof("dbRumper[%v] finished!", dr.id)
+}
+
+func (dr *dbRumper) getNode() (int, error) {
+	switch conf.Options.ScanSpecialCloud {
+	case utils.AliyunCluster:
+		info, err := redis.Bytes(dr.client.Do("info", "Cluster"))
+		if err != nil {
+			return -1, err
+		}
+
+		result := utils.ParseInfo(info)
+		if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil {
+			return -1, err
+		} else if count <= 0 {
+			return -1, fmt.Errorf("source node count[%v] illegal", count)
+		} else {
+			return int(count), nil
+		}
+	case utils.TencentCluster:
+		var err error
+		dr.tencentNodes, err = utils.GetAllClusterNode(dr.client, conf.StandAloneRoleMaster, "id")
+		if err != nil {
+			return -1, err
+		}
+
+		return len(dr.tencentNodes), nil
+	default:
+		return 1, nil
+	}
+}
+
+/*------------------------------------------------------*/
+// one executor(1 db only) link corresponding to one dbRumperExecutor
+type dbRumperExecutor struct {
+	rumperId      int // father id
+	executorId    int // current id, also == aliyun cluster node id
+	sourceClient  redis.Conn
+	targetClient  redis.Conn
+	tencentNodeId string // tencent cluster node id
 
 	keyChan    chan *KeyNode // keyChan is used to communicated between routine1 and routine2
 	resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3
 
-	scanners []scanner.Scanner // one scanner match one db/proxy
+	scanner scanner.Scanner // one scanner match one db/proxy
 
 	fetcherWg sync.WaitGroup
 }
@@ -30,137 +153,103 @@ type KeyNode struct {
 	pttl  int64
 }
 
-func (cr *CmdRump) GetDetailedInfo() interface{} {
-	return nil
-}
-
-func (cr *CmdRump) Main() {
-	// build connection
-	cr.sourceConn = make([]redis.Conn, len(conf.Options.SourceAddressList))
-	for i, address := range conf.Options.SourceAddressList {
-		cr.sourceConn[i] = utils.OpenRedisConn([]string{address}, conf.Options.SourceAuthType,
-			conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable)
-	}
-
-	// TODO, current only support write data into 1 db or proxy
-	cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddressList, conf.Options.TargetAuthType,
-		conf.Options.TargetPasswordRaw, false, conf.Options.SourceTLSEnable)
+func (dre *dbRumperExecutor) exec() {
+	// create scanner
+	dre.scanner = scanner.NewScanner(dre.sourceClient, dre.tencentNodeId, dre.executorId)
+	if dre.scanner == nil {
+		log.Panicf("dbRumper[%v] executor[%v] create scanner failed", dre.rumperId, dre.executorId)
+		return
+	}
 
 	// init two channels
-	chanSize := int(conf.Options.ScanKeyNumber) * len(conf.Options.SourceAddressList)
-	cr.keyChan = make(chan *KeyNode, chanSize)
-	cr.resultChan = make(chan *KeyNode, chanSize)
-
-	cr.scanners = scanner.NewScanner(cr.sourceConn)
-	if cr.scanners == nil || len(cr.scanners) == 0 {
-		log.Panic("create scanner failed")
-		return
-	}
+	chanSize := int(conf.Options.ScanKeyNumber * 2)
+	dre.keyChan = make(chan *KeyNode, chanSize)
+	dre.resultChan = make(chan *KeyNode, chanSize)
 
 	/*
 	 * we start 4 routines to run:
 	 * 1. fetch keys from the source redis
-	 * 2. wait fetcher all exit
-	 * 3. write keys into the target redis
-	 * 4. read result from the target redis
+	 * 2. write keys into the target redis
+	 * 3. read result from the target redis
 	 */
 	// routine1
-	cr.fetcherWg.Add(len(cr.scanners))
-	for i := range cr.scanners {
-		go cr.fetcher(i)
-	}
-
-	// routine2
-	go func() {
-		cr.fetcherWg.Wait()
-		close(cr.keyChan)
-	}()
+	go dre.fetcher()
 
 	// routine3
-	go cr.writer()
+	go dre.writer()
 
 	// routine4
-	cr.receiver()
-}
-
-/*------------------------------------------------------*/
-// one rump link corresponding to one dbRumper
-type dbRumper struct {
+	dre.receiver()
+
+	log.Infof("dbRumper[%v] executor[%v] finish!", dre.rumperId, dre.executorId)
 }
 
-func (cr *CmdRump) fetcher(idx int) {
-	length, err := cr.scanners[idx].NodeCount()
-	if err != nil || length <= 0 {
-		log.Panicf("fetch db node failed: length[%v], error[%v]", length, err)
-	}
-
-	log.Infof("start fetcher with special-cloud[%v], nodes[%v]", conf.Options.ScanSpecialCloud, length)
-
-	// iterate all source nodes
-	for i := 0; i < length; i++ {
-		// fetch db number from 'info Keyspace'
-		dbNumber, err := cr.getSourceDbList(idx)
-		if err != nil {
-			log.Panic(err)
-		}
-
-		log.Infof("fetch node[%v] with db list: %v", i, dbNumber)
-		// iterate all db
-		for _, db := range dbNumber {
-			log.Infof("fetch node[%v] db[%v]", i, db)
-			if err := cr.doFetch(int(db), i); err != nil {
-				log.Panic(err)
-			}
-		}
-	}
-
-	cr.fetcherWg.Done()
+func (dre *dbRumperExecutor) fetcher() {
+	log.Infof("dbRumper[%v] executor[%v] start fetcher with special-cloud[%v]", dre.rumperId, dre.executorId,
+		conf.Options.ScanSpecialCloud)
+
+	// fetch db number from 'info keyspace'
+	dbNumber, err := dre.getSourceDbList()
+	if err != nil {
+		log.Panic(err)
+	}
+
+	log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dbNumber)
+	// iterate all db
+	for _, db := range dbNumber {
+		log.Infof("dbRumper[%v] executor[%v] fetch logical db: %v", dre.rumperId, dre.executorId, db)
+		if err := dre.doFetch(int(db)); err != nil {
+			log.Panic(err)
+		}
+	}
+
+	close(dre.keyChan)
 }
 
-func (cr *CmdRump) writer() {
+func (dre *dbRumperExecutor) writer() {
 	var count uint32
-	for ele := range cr.keyChan {
+	for ele := range dre.keyChan {
 		if ele.pttl == -1 { // not set ttl
 			ele.pttl = 0
 		}
 		if ele.pttl == -2 {
-			log.Debugf("skip key %s for expired", ele.key)
+			log.Debugf("dbRumper[%v] executor[%v] skip key %s for expired", dre.rumperId, dre.executorId, ele.key)
 			continue
 		}
 
-		log.Debugf("restore %s", ele.key)
+		// TODO, big key split
+		log.Debugf("dbRumper[%v] executor[%v] restore %s", dre.rumperId, dre.executorId, ele.key)
 		if conf.Options.Rewrite {
-			cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
+			dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
 		} else {
-			cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value)
+			dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value)
 		}
-		cr.resultChan <- ele
+		dre.resultChan <- ele
 		count++
 		if count == conf.Options.ScanKeyNumber {
 			// batch
-			log.Debugf("send keys %d\n", count)
-			cr.targetConn.Flush()
+			log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count)
+			dre.targetClient.Flush()
 			count = 0
 		}
 	}
-	cr.targetConn.Flush()
-	close(cr.resultChan)
+	dre.targetClient.Flush()
+	close(dre.resultChan)
 }
 
-func (cr *CmdRump) receiver() {
-	for ele := range cr.resultChan {
-		if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil {
-			log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10),
-				err)
+func (dre *dbRumperExecutor) receiver() {
+	for ele := range dre.resultChan {
+		if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil {
+			log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId,
+				dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err)
 		}
 	}
 }
 
-func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) {
-	conn := cr.sourceConn[id]
-	if ret, err := conn.Do("info", "Keyspace"); err != nil {
+func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) {
+	conn := dre.sourceClient
+	if ret, err := conn.Do("info", "keyspace"); err != nil {
 		return nil, err
 	} else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil {
 		return nil, err
@@ -175,69 +264,63 @@ func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) {
 	}
 }
 
-func (cr *CmdRump) doFetch(db, idx int) error {
+func (dre *dbRumperExecutor) doFetch(db int) error {
 	// send 'select' command to both source and target
-	log.Infof("send source select db")
-	// todo sourceConn[0]
-	if _, err := cr.sourceConn[0].Do("select", db); err != nil {
+	log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId)
+	if _, err := dre.sourceClient.Do("select", db); err != nil {
 		return err
 	}
 
-	log.Infof("send target select db")
-	cr.targetConn.Flush()
-	if err := cr.targetConn.Send("select", db); err != nil {
+	// it's ok to send select directly because the message order can be guaranteed.
+	log.Infof("dbRumper[%v] executor[%v] send target select db", dre.rumperId, dre.executorId)
+	dre.targetClient.Flush()
+	if err := dre.targetClient.Send("select", db); err != nil {
 		return err
 	}
-	cr.targetConn.Flush()
+	dre.targetClient.Flush()
 
-	log.Infof("finish select db, start fetching node[%v] db[%v]", idx, db)
+	log.Infof("dbRumper[%v] executor[%v] start fetching node db[%v]", dre.rumperId, dre.executorId, db)
 
 	for {
-		// todo sourceConn[0]
-		keys, err := cr.scanners[0].ScanKey(idx)
+		keys, err := dre.scanner.ScanKey()
 		if err != nil {
 			return err
 		}
-		log.Info("scanned keys: ", len(keys))
+		log.Infof("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys))
 
 		if len(keys) != 0 {
 			// pipeline dump
 			for _, key := range keys {
-				log.Debug("scan key: ", key)
-				// todo sourceConn[0]
-				cr.sourceConn[0].Send("DUMP", key)
+				log.Debug("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
+				dre.sourceClient.Send("DUMP", key)
 			}
-			// todo sourceConn[0]
-			dumps, err := redis.Strings(cr.sourceConn[0].Do(""))
+			dumps, err := redis.Strings(dre.sourceClient.Do(""))
 			if err != nil && err != redis.ErrNil {
 				return fmt.Errorf("do dump with failed[%v]", err)
 			}
 
 			// pipeline ttl
 			for _, key := range keys {
-				// todo sourceConn[0]
-				cr.sourceConn[0].Send("PTTL", key)
+				dre.sourceClient.Send("PTTL", key)
 			}
-			// todo sourceConn[0]
-			pttls, err := redis.Int64s(cr.sourceConn[0].Do(""))
+			pttls, err := redis.Int64s(dre.sourceClient.Do(""))
 			if err != nil && err != redis.ErrNil {
 				return fmt.Errorf("do ttl with failed[%v]", err)
 			}
 
 			for i, k := range keys {
-				cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
+				dre.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
 			}
 		}
 
 		// Last iteration of scan.
-		// todo sourceConn[0]
-		if cr.scanners[0].EndNode() {
+		if dre.scanner.EndNode() {
 			break
 		}
 	}
 
-	log.Infof("finish fetching node[%v] db[%v]", idx, db)
+	log.Infof("dbRumper[%v] executor[%v] finish fetching db[%v]", dre.rumperId, dre.executorId, db)
 
 	return nil
 }

src/redis-shake/scanner/keyFileScanner.go

@@ -13,11 +13,7 @@ type KeyFileScanner struct {
 	cnt int // mark the number of this scan. init: -1
 }
 
-func (kfs *KeyFileScanner) NodeCount() (int, error) {
-	return 1, nil
-}
-
-func (kfs *KeyFileScanner) ScanKey(node interface{}) ([]string, error) {
+func (kfs *KeyFileScanner) ScanKey() ([]string, error) {
 	keys := make([]string, 0, conf.Options.ScanKeyNumber)
 	for i := 0; i < int(conf.Options.ScanKeyNumber) && kfs.bufScan.Scan(); i++ {
 		keys = append(keys, kfs.bufScan.Text())

src/redis-shake/scanner/normalScanner.go

@@ -13,11 +13,7 @@ type NormalScanner struct {
 	cursor int64
 }
 
-func (ns *NormalScanner) NodeCount() (int, error) {
-	return 1, nil
-}
-
-func (ns *NormalScanner) ScanKey(node interface{}) ([]string, error) {
+func (ns *NormalScanner) ScanKey() ([]string, error) {
 	var keys []string
 	values, err := redis.Values(ns.client.Do("SCAN", ns.cursor, "COUNT",
 		conf.Options.ScanKeyNumber))

src/redis-shake/scanner/scanner.go

@@ -11,15 +11,8 @@ import (
 
 // scanner used to scan keys
 type Scanner interface {
-	/*
-	 * get db node info.
-	 * return:
-	 *     int: node number. Used in aliyun_cluster
-	 */
-	NodeCount() (int, error)
-
 	// return scanned keys
-	ScanKey(node interface{}) ([]string, error) // return the scanned keys
+	ScanKey() ([]string, error) // return the scanned keys
 
 	// end current node
 	EndNode() bool
@@ -27,35 +20,29 @@ type Scanner interface {
 	Close()
 }
 
-func NewScanner(client []redis.Conn) []Scanner {
+func NewScanner(client redis.Conn, tencentNodeId string, aliyunNodeId int) Scanner {
 	if conf.Options.ScanSpecialCloud != "" {
-		return []Scanner{
-			&SpecialCloudScanner{
-				client: client[0],
-				cursor: 0,
-			},
+		return &SpecialCloudScanner{
+			client:        client,
+			cursor:        0,
+			tencentNodeId: tencentNodeId,
+			aliyunNodeId:  aliyunNodeId,
 		}
 	} else if conf.Options.ScanKeyFile != "" {
 		if f, err := os.Open(conf.Options.ScanKeyFile); err != nil {
 			log.Errorf("open scan-key-file[%v] error[%v]", conf.Options.ScanKeyFile, err)
 			return nil
 		} else {
-			return []Scanner{
-				&KeyFileScanner{
-					f:       f,
-					bufScan: bufio.NewScanner(f),
-					cnt:     -1,
-				},
+			return &KeyFileScanner{
+				f:       f,
+				bufScan: bufio.NewScanner(f),
+				cnt:     -1,
 			}
 		}
 	} else {
-		ret := make([]Scanner, 0, len(client))
-		for _, c := range client {
-			ret = append(ret, &NormalScanner{
-				client: c,
-				cursor: 0,
-			})
-		}
-		return ret
+		return &NormalScanner{
+			client: client,
+			cursor: 0,
+		}
 	}
 }
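
With NodeCount gone from the interface, node discovery lives in dbRumper.getNode and every scanner is bound to a single connection and node id at construction. A sketch of the resulting contract; drainNode is a hypothetical helper mirroring the loop in doFetch, not part of the package:

package scanner

// Sketch of the slimmed-down contract after this commit: the constructor picks
// the concrete scanner, and the ScanKey/EndNode loop takes no node argument.
type Scanner interface {
	ScanKey() ([]string, error) // next batch of keys from the bound node
	EndNode() bool              // true once the cursor has wrapped around
	Close()
}

// drainNode pulls every key from one node, the same shape as doFetch's loop.
func drainNode(s Scanner, out chan<- string) error {
	defer s.Close()
	for {
		keys, err := s.ScanKey()
		if err != nil {
			return err
		}
		for _, k := range keys {
			out <- k
		}
		if s.EndNode() {
			return nil
		}
	}
}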

src/redis-shake/scanner/specialCloudScanner.go

@@ -1,7 +1,6 @@
 package scanner
 
 import (
-	"strconv"
 	"fmt"
 
 	"redis-shake/common"
 	"redis-shake/configure"
@@ -13,39 +12,11 @@ type SpecialCloudScanner struct {
 	client redis.Conn
 	cursor int64
 
-	tencentNodes []string
+	tencentNodeId string
+	aliyunNodeId  int
 }
 
-func (scs *SpecialCloudScanner) NodeCount() (int, error) {
-	switch conf.Options.ScanSpecialCloud {
-	case utils.AliyunCluster:
-		info, err := redis.Bytes(scs.client.Do("info", "Cluster"))
-		if err != nil {
-			return -1, err
-		}
-
-		result := utils.ParseInfo(info)
-		if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil {
-			return -1, err
-		} else if count <= 0 {
-			return -1, fmt.Errorf("source node count[%v] illegal", count)
-		} else {
-			return int(count), nil
-		}
-	case utils.TencentCluster:
-		var err error
-		scs.tencentNodes, err = utils.GetAllClusterNode(scs.client, conf.StandAloneRoleMaster, "id")
-		if err != nil {
-			return -1, err
-		}
-
-		return len(scs.tencentNodes), nil
-	default:
-		return -1, nil
-	}
-}
-
-func (scs *SpecialCloudScanner) ScanKey(node interface{}) ([]string, error) {
+func (scs *SpecialCloudScanner) ScanKey() ([]string, error) {
 	var (
 		values []interface{}
 		err    error
@@ -55,9 +26,9 @@ func (scs *SpecialCloudScanner) ScanKey(node interface{}) ([]string, error) {
 	switch conf.Options.ScanSpecialCloud {
 	case utils.TencentCluster:
 		values, err = redis.Values(scs.client.Do("SCAN", scs.cursor, "COUNT",
-			conf.Options.ScanKeyNumber, scs.tencentNodes[node.(int)]))
+			conf.Options.ScanKeyNumber, scs.tencentNodeId))
 	case utils.AliyunCluster:
-		values, err = redis.Values(scs.client.Do("ISCAN", node, scs.cursor, "COUNT",
+		values, err = redis.Values(scs.client.Do("ISCAN", scs.aliyunNodeId, scs.cursor, "COUNT",
 			conf.Options.ScanKeyNumber))
 	}
 	if err != nil && err != redis.ErrNil {
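
Both cloud branches remain ordinary cursor loops: Tencent's proxy takes the node id as an extra SCAN argument, Aliyun exposes a per-node ISCAN, and iteration ends when the returned cursor is 0, which is what EndNode reports upstream. A minimal vanilla-Redis version of that loop; the gomodule/redigo import path and the address are placeholders, since redis-shake vendors its own redigo copy:

package main

import (
	"fmt"

	"github.com/gomodule/redigo/redis"
)

// scanAll iterates one node with SCAN; a reply cursor of 0 marks the end.
func scanAll(addr string) ([]string, error) {
	conn, err := redis.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	var all []string
	cursor := int64(0)
	for {
		reply, err := redis.Values(conn.Do("SCAN", cursor, "COUNT", 100))
		if err != nil {
			return nil, err
		}
		if cursor, err = redis.Int64(reply[0], nil); err != nil {
			return nil, err
		}
		keys, err := redis.Strings(reply[1], nil)
		if err != nil {
			return nil, err
		}
		all = append(all, keys...)
		if cursor == 0 { // node fully iterated, what EndNode() observes
			return all, nil
		}
	}
}

func main() {
	keys, err := scanAll("127.0.0.1:6379")
	fmt.Println(len(keys), err)
}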
