diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 9ab1c3f..58346d5 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -390,14 +390,13 @@ func sanitizeOptions(tp string) error { } if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanKeyFile != "" { - return fmt.Errorf("scan.special_cloud[%v] and scan.key_file[%v] cann't be given at the same time", + return fmt.Errorf("scan.special_cloud[%v] and scan.key_file[%v] can't all be given at the same time", conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) } - if (conf.Options.ScanSpecialCloud != "" || conf.Options.ScanKeyFile != "") && len(conf.Options.SourceAddressList) > 1 { - return fmt.Errorf("source address should <= 1 when scan.special_cloud[%v] or scan.key_file[%v] given", - conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) - } + //if len(conf.Options.SourceAddressList) == 1 { + // return fmt.Errorf("source address length should == 1 when type is 'rump'") + //} } return nil diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 96e8603..a7032a7 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -14,13 +14,136 @@ import ( ) type CmdRump struct { - sourceConn []redis.Conn - targetConn redis.Conn +} + +func (cr *CmdRump) GetDetailedInfo() interface{} { + return nil +} + +func (cr *CmdRump) Main() { + var wg sync.WaitGroup + wg.Add(len(conf.Options.SourceAddressList)) + // build dbRumper + for i, address := range conf.Options.SourceAddressList { + dr := &dbRumper{ + id: i, + address: address, + } + log.Infof("start dbRumper[%v]", i) + go func() { + defer wg.Done() + dr.run() + }() + } + wg.Wait() + + log.Info("all rumpers finish!") +} + +/*------------------------------------------------------*/ +// one rump(1 db or 1 proxy) link corresponding to one dbRumper +type dbRumper struct { + id int // id + address string + + client redis.Conn // source client + tencentNodes []string // for tencent cluster only +} + +func (dr *dbRumper) run() { + // single connection + dr.client = utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType, + conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable) + + // some clouds may have several db under proxy + count, err := dr.getNode() + if err != nil { + log.Panicf("dbRumper[%v] get node failed[%v]", dr.id, err) + } + + log.Infof("dbRumper[%v] get node count: %v", dr.id, count) + + var wg sync.WaitGroup + wg.Add(count) + for i := 0; i < count; i++ { + var target []string + if conf.Options.TargetType == conf.RedisTypeCluster { + target = conf.Options.TargetAddressList + } else { + // round-robin pick + pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList)) + target = []string{conf.Options.TargetAddressList[pick]} + } + + var tencentNodeId string + if len(dr.tencentNodes) > 0 { + tencentNodeId = dr.tencentNodes[i] + } + + executor := &dbRumperExecutor{ + rumperId: dr.id, + executorId: i, + sourceClient: utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType, + conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable), + targetClient: utils.OpenRedisConn(target, conf.Options.TargetAuthType, + conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster, + conf.Options.TargetTLSEnable), + tencentNodeId: tencentNodeId, + } + + go func() { + defer wg.Done() + executor.exec() + }() + } + + wg.Wait() + + log.Infof("dbRumper[%v] finished!", dr.id) +} + +func (dr *dbRumper) getNode() (int, error) { + switch conf.Options.ScanSpecialCloud { + case utils.AliyunCluster: + info, err := redis.Bytes(dr.client.Do("info", "Cluster")) + if err != nil { + return -1, err + } + + result := utils.ParseInfo(info) + if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil { + return -1, err + } else if count <= 0 { + return -1, fmt.Errorf("source node count[%v] illegal", count) + } else { + return int(count), nil + } + case utils.TencentCluster: + var err error + dr.tencentNodes, err = utils.GetAllClusterNode(dr.client, conf.StandAloneRoleMaster, "id") + if err != nil { + return -1, err + } + + return len(dr.tencentNodes), nil + default: + return 1, nil + } +} + +/*------------------------------------------------------*/ +// one executor(1 db only) link corresponding to one dbRumperExecutor +type dbRumperExecutor struct { + rumperId int // father id + executorId int // current id, also == aliyun cluster node id + sourceClient redis.Conn + targetClient redis.Conn + tencentNodeId string // tencent cluster node id keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2 resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3 - scanners []scanner.Scanner // one scanner match one db/proxy + scanner scanner.Scanner // one scanner match one db/proxy fetcherWg sync.WaitGroup } @@ -30,137 +153,103 @@ type KeyNode struct { pttl int64 } -func (cr *CmdRump) GetDetailedInfo() interface{} { - return nil -} - -func (cr *CmdRump) Main() { - // build connection - - cr.sourceConn = make([]redis.Conn, len(conf.Options.SourceAddressList)) - for i, address := range conf.Options.SourceAddressList { - cr.sourceConn[i] = utils.OpenRedisConn([]string{address}, conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable) +func (dre *dbRumperExecutor) exec() { + // create scanner + dre.scanner = scanner.NewScanner(dre.sourceClient, dre.tencentNodeId, dre.executorId) + if dre.scanner == nil { + log.Panicf("dbRumper[%v] executor[%v] create scanner failed", dre.rumperId, dre.executorId) + return } - // TODO, current only support write data into 1 db or proxy - cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddressList, conf.Options.TargetAuthType, - conf.Options.TargetPasswordRaw, false, conf.Options.SourceTLSEnable) // init two channels - chanSize := int(conf.Options.ScanKeyNumber) * len(conf.Options.SourceAddressList) - cr.keyChan = make(chan *KeyNode, chanSize) - cr.resultChan = make(chan *KeyNode, chanSize) - - cr.scanners = scanner.NewScanner(cr.sourceConn) - if cr.scanners == nil || len(cr.scanners) == 0 { - log.Panic("create scanner failed") - return - } + chanSize := int(conf.Options.ScanKeyNumber * 2) + dre.keyChan = make(chan *KeyNode, chanSize) + dre.resultChan = make(chan *KeyNode, chanSize) /* * we start 4 routines to run: * 1. fetch keys from the source redis - * 2. wait fetcher all exit - * 3. write keys into the target redis - * 4. read result from the target redis + * 2. write keys into the target redis + * 3. read result from the target redis */ // routine1 - cr.fetcherWg.Add(len(cr.scanners)) - for i := range cr.scanners { - go cr.fetcher(i) - } - - // routine2 - go func() { - cr.fetcherWg.Wait() - close(cr.keyChan) - }() + go dre.fetcher() // routine3 - go cr.writer() + go dre.writer() // routine4 - cr.receiver() -} - -/*------------------------------------------------------*/ -// one rump link corresponding to one dbRumper -type dbRumper struct { + dre.receiver() + log.Infof("dbRumper[%v] executor[%v] finish!", dre.rumperId, dre.executorId) } -func (cr *CmdRump) fetcher(idx int) { - length, err := cr.scanners[idx].NodeCount() - if err != nil || length <= 0 { - log.Panicf("fetch db node failed: length[%v], error[%v]", length, err) - } +func (dre *dbRumperExecutor) fetcher() { + log.Infof("dbRumper[%v] executor[%v] start fetcher with special-cloud[%v]", dre.rumperId, dre.executorId, + conf.Options.ScanSpecialCloud) - log.Infof("start fetcher with special-cloud[%v], nodes[%v]", conf.Options.ScanSpecialCloud, length) + // fetch db number from 'info keyspace' + dbNumber, err := dre.getSourceDbList() + if err != nil { + log.Panic(err) + } - // iterate all source nodes - for i := 0; i < length; i++ { - // fetch db number from 'info Keyspace' - dbNumber, err := cr.getSourceDbList(idx) - if err != nil { + log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dbNumber) + // iterate all db + for _, db := range dbNumber { + log.Infof("dbRumper[%v] executor[%v] fetch logical db: %v", dre.rumperId, dre.executorId, db) + if err := dre.doFetch(int(db)); err != nil { log.Panic(err) } - - log.Infof("fetch node[%v] with db list: %v", i, dbNumber) - // iterate all db - for _, db := range dbNumber { - log.Infof("fetch node[%v] db[%v]", i, db) - if err := cr.doFetch(int(db), i); err != nil { - log.Panic(err) - } - } } - cr.fetcherWg.Done() + close(dre.keyChan) } -func (cr *CmdRump) writer() { +func (dre *dbRumperExecutor) writer() { var count uint32 - for ele := range cr.keyChan { + for ele := range dre.keyChan { if ele.pttl == -1 { // not set ttl ele.pttl = 0 } if ele.pttl == -2 { - log.Debugf("skip key %s for expired", ele.key) + log.Debugf("dbRumper[%v] executor[%v] skip key %s for expired", dre.rumperId, dre.executorId, ele.key) continue } - log.Debugf("restore %s", ele.key) + // TODO, big key split + log.Debugf("dbRumper[%v] executor[%v] restore %s", dre.rumperId, dre.executorId, ele.key) if conf.Options.Rewrite { - cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE") + dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE") } else { - cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value) + dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value) } - cr.resultChan <- ele + dre.resultChan <- ele count++ if count == conf.Options.ScanKeyNumber { // batch - log.Debugf("send keys %d\n", count) - cr.targetConn.Flush() + log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count) + dre.targetClient.Flush() count = 0 } } - cr.targetConn.Flush() - close(cr.resultChan) + dre.targetClient.Flush() + close(dre.resultChan) } -func (cr *CmdRump) receiver() { - for ele := range cr.resultChan { - if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil { - log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10), - err) +func (dre *dbRumperExecutor) receiver() { + for ele := range dre.resultChan { + if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { + log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId, + dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err) } } } -func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) { - conn := cr.sourceConn[id] - if ret, err := conn.Do("info", "Keyspace"); err != nil { +func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) { + conn := dre.sourceClient + if ret, err := conn.Do("info", "keyspace"); err != nil { return nil, err } else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil { return nil, err @@ -175,69 +264,63 @@ func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) { } } -func (cr *CmdRump) doFetch(db, idx int) error { +func (dre *dbRumperExecutor) doFetch(db int) error { // send 'select' command to both source and target - log.Infof("send source select db") - // todo sourceConn[0] - if _, err := cr.sourceConn[0].Do("select", db); err != nil { + log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId) + if _, err := dre.sourceClient.Do("select", db); err != nil { return err } - log.Infof("send target select db") - cr.targetConn.Flush() - if err := cr.targetConn.Send("select", db); err != nil { + // it's ok to send select directly because the message order can be guaranteed. + log.Infof("dbRumper[%v] executor[%v] send target select db", dre.rumperId, dre.executorId) + dre.targetClient.Flush() + if err := dre.targetClient.Send("select", db); err != nil { return err } - cr.targetConn.Flush() + dre.targetClient.Flush() - log.Infof("finish select db, start fetching node[%v] db[%v]", idx, db) + log.Infof("dbRumper[%v] executor[%v] start fetching node db[%v]", dre.rumperId, dre.executorId, db) for { - // todo sourceConn[0] - keys, err := cr.scanners[0].ScanKey(idx) + keys, err := dre.scanner.ScanKey() if err != nil { return err } - log.Info("scanned keys: ", len(keys)) + log.Infof("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys)) if len(keys) != 0 { // pipeline dump for _, key := range keys { - log.Debug("scan key: ", key) - // todo sourceConn[0] - cr.sourceConn[0].Send("DUMP", key) + log.Debug("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key) + dre.sourceClient.Send("DUMP", key) } - // todo sourceConn[0] - dumps, err := redis.Strings(cr.sourceConn[0].Do("")) + dumps, err := redis.Strings(dre.sourceClient.Do("")) if err != nil && err != redis.ErrNil { return fmt.Errorf("do dump with failed[%v]", err) } // pipeline ttl for _, key := range keys { - // todo sourceConn[0] - cr.sourceConn[0].Send("PTTL", key) + dre.sourceClient.Send("PTTL", key) } - // todo sourceConn[0] - pttls, err := redis.Int64s(cr.sourceConn[0].Do("")) + pttls, err := redis.Int64s(dre.sourceClient.Do("")) if err != nil && err != redis.ErrNil { return fmt.Errorf("do ttl with failed[%v]", err) } for i, k := range keys { - cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]} + dre.keyChan <- &KeyNode{k, dumps[i], pttls[i]} } } // Last iteration of scan. - // todo sourceConn[0] - if cr.scanners[0].EndNode() { + if dre.scanner.EndNode() { break } } - log.Infof("finish fetching node[%v] db[%v]", idx, db) + log.Infof("dbRumper[%v] executor[%v] finish fetching db[%v]", dre.rumperId, dre.executorId, db) return nil } \ No newline at end of file diff --git a/src/redis-shake/scanner/keyFileScanner.go b/src/redis-shake/scanner/keyFileScanner.go index 2d50087..e6ad0ab 100644 --- a/src/redis-shake/scanner/keyFileScanner.go +++ b/src/redis-shake/scanner/keyFileScanner.go @@ -13,11 +13,7 @@ type KeyFileScanner struct { cnt int // mark the number of this scan. init: -1 } -func (kfs *KeyFileScanner) NodeCount() (int, error) { - return 1, nil -} - -func (kfs *KeyFileScanner) ScanKey(node interface{}) ([]string, error) { +func (kfs *KeyFileScanner) ScanKey() ([]string, error) { keys := make([]string, 0, conf.Options.ScanKeyNumber) for i := 0; i < int(conf.Options.ScanKeyNumber) && kfs.bufScan.Scan(); i++ { keys = append(keys, kfs.bufScan.Text()) diff --git a/src/redis-shake/scanner/normalScanner.go b/src/redis-shake/scanner/normalScanner.go index f35630e..9ebc720 100644 --- a/src/redis-shake/scanner/normalScanner.go +++ b/src/redis-shake/scanner/normalScanner.go @@ -13,11 +13,7 @@ type NormalScanner struct { cursor int64 } -func (ns *NormalScanner) NodeCount() (int, error) { - return 1, nil -} - -func (ns *NormalScanner) ScanKey(node interface{}) ([]string, error) { +func (ns *NormalScanner) ScanKey() ([]string, error) { var keys []string values, err := redis.Values(ns.client.Do("SCAN", ns.cursor, "COUNT", conf.Options.ScanKeyNumber)) diff --git a/src/redis-shake/scanner/scanner.go b/src/redis-shake/scanner/scanner.go index c016346..48a5410 100644 --- a/src/redis-shake/scanner/scanner.go +++ b/src/redis-shake/scanner/scanner.go @@ -11,15 +11,8 @@ import ( // scanner used to scan keys type Scanner interface { - /* - * get db node info. - * return: - * int: node number. Used in aliyun_cluster - */ - NodeCount() (int, error) - // return scanned keys - ScanKey(node interface{}) ([]string, error) // return the scanned keys + ScanKey() ([]string, error) // return the scanned keys // end current node EndNode() bool @@ -27,35 +20,29 @@ type Scanner interface { Close() } -func NewScanner(client []redis.Conn) []Scanner { +func NewScanner(client redis.Conn, tencentNodeId string, aliyunNodeId int) Scanner { if conf.Options.ScanSpecialCloud != "" { - return []Scanner{ - &SpecialCloudScanner{ - client: client[0], - cursor: 0, - }, + return &SpecialCloudScanner{ + client: client, + cursor: 0, + tencentNodeId: tencentNodeId, + aliyunNodeId: aliyunNodeId, } } else if conf.Options.ScanKeyFile != "" { if f, err := os.Open(conf.Options.ScanKeyFile); err != nil { log.Errorf("open scan-key-file[%v] error[%v]", conf.Options.ScanKeyFile, err) return nil } else { - return []Scanner{ - &KeyFileScanner{ - f: f, - bufScan: bufio.NewScanner(f), - cnt: -1, - }, + return &KeyFileScanner{ + f: f, + bufScan: bufio.NewScanner(f), + cnt: -1, } } } else { - ret := make([]Scanner, 0, len(client)) - for _, c := range client { - ret = append(ret, &NormalScanner{ - client: c, - cursor: 0, - }) + return &NormalScanner{ + client: client, + cursor: 0, } - return ret } } diff --git a/src/redis-shake/scanner/specialCloudScanner.go b/src/redis-shake/scanner/specialCloudScanner.go index bc33383..c1ded75 100644 --- a/src/redis-shake/scanner/specialCloudScanner.go +++ b/src/redis-shake/scanner/specialCloudScanner.go @@ -1,7 +1,6 @@ package scanner import ( - "strconv" "fmt" "redis-shake/common" "redis-shake/configure" @@ -13,39 +12,11 @@ type SpecialCloudScanner struct { client redis.Conn cursor int64 - tencentNodes []string + tencentNodeId string + aliyunNodeId int } -func (scs *SpecialCloudScanner) NodeCount() (int, error) { - switch conf.Options.ScanSpecialCloud { - case utils.AliyunCluster: - info, err := redis.Bytes(scs.client.Do("info", "Cluster")) - if err != nil { - return -1, err - } - - result := utils.ParseInfo(info) - if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil { - return -1, err - } else if count <= 0 { - return -1, fmt.Errorf("source node count[%v] illegal", count) - } else { - return int(count), nil - } - case utils.TencentCluster: - var err error - scs.tencentNodes, err = utils.GetAllClusterNode(scs.client, conf.StandAloneRoleMaster, "id") - if err != nil { - return -1, err - } - - return len(scs.tencentNodes), nil - default: - return -1, nil - } -} - -func (scs *SpecialCloudScanner) ScanKey(node interface{}) ([]string, error) { +func (scs *SpecialCloudScanner) ScanKey() ([]string, error) { var ( values []interface{} err error @@ -55,9 +26,9 @@ func (scs *SpecialCloudScanner) ScanKey(node interface{}) ([]string, error) { switch conf.Options.ScanSpecialCloud { case utils.TencentCluster: values, err = redis.Values(scs.client.Do("SCAN", scs.cursor, "COUNT", - conf.Options.ScanKeyNumber, scs.tencentNodes[node.(int)])) + conf.Options.ScanKeyNumber, scs.tencentNodeId)) case utils.AliyunCluster: - values, err = redis.Values(scs.client.Do("ISCAN", node, scs.cursor, "COUNT", + values, err = redis.Values(scs.client.Do("ISCAN", scs.aliyunNodeId, scs.cursor, "COUNT", conf.Options.ScanKeyNumber)) } if err != nil && err != redis.ErrNil {