Merge pull request #88 from alibaba/bugfix-1.6.5

Bugfix 1.6.5
Vinllen Chen, committed by GitHub
commit 3796c9dc8e
8 changed files:

  1. ChangeLog (3 lines changed)
  2. src/redis-shake/common/configure.go (4 lines changed)
  3. src/redis-shake/main/main.go (9 lines changed)
  4. src/redis-shake/rump.go (282 lines changed)
  5. src/redis-shake/scanner/keyFileScanner.go (6 lines changed)
  6. src/redis-shake/scanner/normalScanner.go (6 lines changed)
  7. src/redis-shake/scanner/scanner.go (31 lines changed)
  8. src/redis-shake/scanner/specialCloudScanner.go (39 lines changed)

ChangeLog

@@ -1,3 +1,6 @@
+2019-06-06 Alibaba Cloud.
+    * VERSION: 1.6.5
+    * IMPROVE: run rump in parallel to support several db nodes behind proxy.
 2019-05-30 Alibaba Cloud.
     * VERSION: 1.6.4
     * BUGFIX: fix bug of `GetDetailedInfo` panic.

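The headline change: rump mode now starts one dbRumper per configured source address and runs them in parallel. As a rough illustration, a hypothetical redis-shake.conf fragment for pulling from two proxies (key names mirror the conf.Options fields referenced in this diff; the ';' address separator and the concrete values are assumptions, not taken from this commit):

    # rump mode: one dbRumper goroutine is started per source address
    source.type = proxy
    source.address = proxy-a:6379;proxy-b:6379
    target.type = standalone
    target.address = target-host:6379
    # keys per SCAN batch; also used as the RESTORE pipeline flush threshold
    scan.key_number = 50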
src/redis-shake/common/configure.go

@@ -141,8 +141,8 @@ func parseAddress(tp, address, redisType string, isSource bool) error {
             setAddressList(isSource, address)
         }
     case conf.RedisTypeProxy:
-        if addressLen != 1 {
-            return fmt.Errorf("address[%v] length[%v] must == 1 when type is 'proxy'", addressLen, addressLen)
+        if isSource && addressLen != 1 {
+            return fmt.Errorf("address[%v] length[%v] must == 1 when type is 'proxy'", address, addressLen)
         }
         if isSource && tp != conf.TypeRump {
             return fmt.Errorf("source.type == proxy should only happens when mode is 'rump'")

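Note the direction of the relaxed check: only a proxy source is still pinned to exactly one address; a proxy target may now list several addresses, which rump.go below spreads across executors round-robin via utils.PickTargetRoundRobin. The second fix is independent: the error message now prints the offending address instead of printing the length twice.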
src/redis-shake/main/main.go

@@ -390,14 +390,13 @@ func sanitizeOptions(tp string) error {
         }
         if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanKeyFile != "" {
-            return fmt.Errorf("scan.special_cloud[%v] and scan.key_file[%v] cann't be given at the same time",
+            return fmt.Errorf("scan.special_cloud[%v] and scan.key_file[%v] can't all be given at the same time",
                 conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
         }
         if (conf.Options.ScanSpecialCloud != "" || conf.Options.ScanKeyFile != "") && len(conf.Options.SourceAddressList) > 1 {
             return fmt.Errorf("source address should <= 1 when scan.special_cloud[%v] or scan.key_file[%v] given",
                 conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
         }
-        //if len(conf.Options.SourceAddressList) == 1 {
-        //    return fmt.Errorf("source address length should == 1 when type is 'rump'")
-        //}
     }
     return nil

src/redis-shake/rump.go

@@ -14,13 +14,136 @@ import (
 )

 type CmdRump struct {
-    sourceConn []redis.Conn
-    targetConn redis.Conn
+}
+
+func (cr *CmdRump) GetDetailedInfo() interface{} {
+    return nil
+}
+
+func (cr *CmdRump) Main() {
+    var wg sync.WaitGroup
+    wg.Add(len(conf.Options.SourceAddressList))
+
+    // build dbRumper
+    for i, address := range conf.Options.SourceAddressList {
+        dr := &dbRumper{
+            id:      i,
+            address: address,
+        }
+
+        log.Infof("start dbRumper[%v]", i)
+        go func() {
+            defer wg.Done()
+            dr.run()
+        }()
+    }
+    wg.Wait()
+
+    log.Info("all rumpers finish!")
+}
+
+/*------------------------------------------------------*/
+// one rump(1 db or 1 proxy) link corresponding to one dbRumper
+type dbRumper struct {
+    id      int // id
+    address string
+
+    client       redis.Conn // source client
+    tencentNodes []string   // for tencent cluster only
+}
+
+func (dr *dbRumper) run() {
+    // single connection
+    dr.client = utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
+        conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable)
+
+    // some clouds may have several db under proxy
+    count, err := dr.getNode()
+    if err != nil {
+        log.Panicf("dbRumper[%v] get node failed[%v]", dr.id, err)
+    }
+    log.Infof("dbRumper[%v] get node count: %v", dr.id, count)
+
+    var wg sync.WaitGroup
+    wg.Add(count)
+    for i := 0; i < count; i++ {
+        var target []string
+        if conf.Options.TargetType == conf.RedisTypeCluster {
+            target = conf.Options.TargetAddressList
+        } else {
+            // round-robin pick
+            pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
+            target = []string{conf.Options.TargetAddressList[pick]}
+        }
+
+        var tencentNodeId string
+        if len(dr.tencentNodes) > 0 {
+            tencentNodeId = dr.tencentNodes[i]
+        }
+
+        executor := &dbRumperExecutor{
+            rumperId:   dr.id,
+            executorId: i,
+            sourceClient: utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
+                conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable),
+            targetClient: utils.OpenRedisConn(target, conf.Options.TargetAuthType,
+                conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster,
+                conf.Options.TargetTLSEnable),
+            tencentNodeId: tencentNodeId,
+        }
+
+        go func() {
+            defer wg.Done()
+            executor.exec()
+        }()
+    }
+    wg.Wait()
+
+    log.Infof("dbRumper[%v] finished!", dr.id)
+}
+
+func (dr *dbRumper) getNode() (int, error) {
+    switch conf.Options.ScanSpecialCloud {
+    case utils.AliyunCluster:
+        info, err := redis.Bytes(dr.client.Do("info", "Cluster"))
+        if err != nil {
+            return -1, err
+        }
+
+        result := utils.ParseInfo(info)
+        if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil {
+            return -1, err
+        } else if count <= 0 {
+            return -1, fmt.Errorf("source node count[%v] illegal", count)
+        } else {
+            return int(count), nil
+        }
+    case utils.TencentCluster:
+        var err error
+        dr.tencentNodes, err = utils.GetAllClusterNode(dr.client, conf.StandAloneRoleMaster, "id")
+        if err != nil {
+            return -1, err
+        }
+
+        return len(dr.tencentNodes), nil
+    default:
+        return 1, nil
+    }
+}
+
+/*------------------------------------------------------*/
+// one executor(1 db only) link corresponding to one dbRumperExecutor
+type dbRumperExecutor struct {
+    rumperId      int // father id
+    executorId    int // current id, also == aliyun cluster node id
+    sourceClient  redis.Conn
+    targetClient  redis.Conn
+    tencentNodeId string // tencent cluster node id

     keyChan    chan *KeyNode // keyChan is used to communicated between routine1 and routine2
     resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3

-    scanners []scanner.Scanner // one scanner match one db/proxy
+    scanner scanner.Scanner // one scanner match one db/proxy

     fetcherWg sync.WaitGroup
 }
@@ -30,131 +153,103 @@ type KeyNode struct {
     pttl  int64
 }

-func (cr *CmdRump) GetDetailedInfo() interface{} {
-    return nil
-}
-
-func (cr *CmdRump) Main() {
-    // build connection
-    cr.sourceConn = make([]redis.Conn, len(conf.Options.SourceAddressList))
-    for i, address := range conf.Options.SourceAddressList {
-        cr.sourceConn[i] = utils.OpenRedisConn([]string{address}, conf.Options.SourceAuthType,
-            conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable)
-    }
-    // TODO, current only support write data into 1 db or proxy
-    cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddressList, conf.Options.TargetAuthType,
-        conf.Options.TargetPasswordRaw, false, conf.Options.SourceTLSEnable)
+func (dre *dbRumperExecutor) exec() {
+    // create scanner
+    dre.scanner = scanner.NewScanner(dre.sourceClient, dre.tencentNodeId, dre.executorId)
+    if dre.scanner == nil {
+        log.Panicf("dbRumper[%v] executor[%v] create scanner failed", dre.rumperId, dre.executorId)
+        return
+    }

     // init two channels
-    chanSize := int(conf.Options.ScanKeyNumber) * len(conf.Options.SourceAddressList)
-    cr.keyChan = make(chan *KeyNode, chanSize)
-    cr.resultChan = make(chan *KeyNode, chanSize)
-    cr.scanners = scanner.NewScanner(cr.sourceConn)
-    if cr.scanners == nil || len(cr.scanners) == 0 {
-        log.Panic("create scanner failed")
-        return
-    }
+    chanSize := int(conf.Options.ScanKeyNumber * 2)
+    dre.keyChan = make(chan *KeyNode, chanSize)
+    dre.resultChan = make(chan *KeyNode, chanSize)

     /*
      * we start 4 routines to run:
      * 1. fetch keys from the source redis
-     * 2. wait fetcher all exit
-     * 3. write keys into the target redis
-     * 4. read result from the target redis
+     * 2. write keys into the target redis
+     * 3. read result from the target redis
      */
     // routine1
-    cr.fetcherWg.Add(len(cr.scanners))
-    for i := range cr.scanners {
-        go cr.fetcher(i)
-    }
-    // routine2
-    go func() {
-        cr.fetcherWg.Wait()
-        close(cr.keyChan)
-    }()
+    go dre.fetcher()

     // routine3
-    go cr.writer()
+    go dre.writer()

     // routine4
-    cr.receiver()
+    dre.receiver()
+
+    log.Infof("dbRumper[%v] executor[%v] finish!", dre.rumperId, dre.executorId)
 }

-func (cr *CmdRump) fetcher(idx int) {
-    length, err := cr.scanners[idx].NodeCount()
-    if err != nil || length <= 0 {
-        log.Panicf("fetch db node failed: length[%v], error[%v]", length, err)
-    }
-    log.Infof("start fetcher with special-cloud[%v], nodes[%v]", conf.Options.ScanSpecialCloud, length)
+func (dre *dbRumperExecutor) fetcher() {
+    log.Infof("dbRumper[%v] executor[%v] start fetcher with special-cloud[%v]", dre.rumperId, dre.executorId,
+        conf.Options.ScanSpecialCloud)

-    // iterate all source nodes
-    for i := 0; i < length; i++ {
-        // fetch db number from 'info Keyspace'
-        dbNumber, err := cr.getSourceDbList(i)
-        if err != nil {
-            log.Panic(err)
-        }
-        log.Infof("fetch node[%v] with db list: %v", i, dbNumber)
-        // iterate all db
-        for _, db := range dbNumber {
-            log.Infof("fetch node[%v] db[%v]", i, db)
-            if err := cr.doFetch(int(db), i); err != nil {
-                log.Panic(err)
-            }
-        }
-    }
-    cr.fetcherWg.Done()
+    // fetch db number from 'info keyspace'
+    dbNumber, err := dre.getSourceDbList()
+    if err != nil {
+        log.Panic(err)
+    }
+    log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dbNumber)
+    // iterate all db
+    for _, db := range dbNumber {
+        log.Infof("dbRumper[%v] executor[%v] fetch logical db: %v", dre.rumperId, dre.executorId, db)
+        if err := dre.doFetch(int(db)); err != nil {
+            log.Panic(err)
+        }
+    }
+    close(dre.keyChan)
 }

-func (cr *CmdRump) writer() {
+func (dre *dbRumperExecutor) writer() {
     var count uint32
-    for ele := range cr.keyChan {
+    for ele := range dre.keyChan {
         if ele.pttl == -1 { // not set ttl
             ele.pttl = 0
         }
         if ele.pttl == -2 {
-            log.Debugf("skip key %s for expired", ele.key)
+            log.Debugf("dbRumper[%v] executor[%v] skip key %s for expired", dre.rumperId, dre.executorId, ele.key)
             continue
         }
-        log.Debugf("restore %s", ele.key)
         // TODO, big key split
+        log.Debugf("dbRumper[%v] executor[%v] restore %s", dre.rumperId, dre.executorId, ele.key)
         if conf.Options.Rewrite {
-            cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
+            dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
         } else {
-            cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value)
+            dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value)
         }
-        cr.resultChan <- ele
+        dre.resultChan <- ele
         count++
         if count == conf.Options.ScanKeyNumber {
             // batch
-            log.Debugf("send keys %d\n", count)
-            cr.targetConn.Flush()
+            log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count)
+            dre.targetClient.Flush()
             count = 0
         }
     }
-    cr.targetConn.Flush()
-    close(cr.resultChan)
+    dre.targetClient.Flush()
+    close(dre.resultChan)
 }

-func (cr *CmdRump) receiver() {
-    for ele := range cr.resultChan {
-        if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil {
-            log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10),
-                err)
+func (dre *dbRumperExecutor) receiver() {
+    for ele := range dre.resultChan {
+        if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil {
+            log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId,
+                dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err)
         }
     }
 }

-func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) {
-    conn := cr.sourceConn[id]
-    if ret, err := conn.Do("info", "Keyspace"); err != nil {
+func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) {
+    conn := dre.sourceClient
+    if ret, err := conn.Do("info", "keyspace"); err != nil {
         return nil, err
     } else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil {
         return nil, err
@@ -169,62 +264,63 @@ func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) {
     }
 }

-func (cr *CmdRump) doFetch(db, idx int) error {
+func (dre *dbRumperExecutor) doFetch(db int) error {
     // send 'select' command to both source and target
-    log.Infof("send source select db")
-    if _, err := cr.sourceConn[idx].Do("select", db); err != nil {
+    log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId)
+    if _, err := dre.sourceClient.Do("select", db); err != nil {
         return err
     }

-    log.Infof("send target select db")
-    cr.targetConn.Flush()
-    if err := cr.targetConn.Send("select", db); err != nil {
+    // it's ok to send select directly because the message order can be guaranteed.
+    log.Infof("dbRumper[%v] executor[%v] send target select db", dre.rumperId, dre.executorId)
+    dre.targetClient.Flush()
+    if err := dre.targetClient.Send("select", db); err != nil {
         return err
     }
-    cr.targetConn.Flush()
+    dre.targetClient.Flush()

-    log.Infof("finish select db, start fetching node[%v] db[%v]", idx, db)
+    log.Infof("dbRumper[%v] executor[%v] start fetching node db[%v]", dre.rumperId, dre.executorId, db)

     for {
-        keys, err := cr.scanners[idx].ScanKey(idx)
+        keys, err := dre.scanner.ScanKey()
         if err != nil {
             return err
         }
-        log.Info("scanned keys: ", len(keys))
+        log.Infof("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys))

         if len(keys) != 0 {
             // pipeline dump
             for _, key := range keys {
-                log.Debug("scan key: ", key)
-                cr.sourceConn[idx].Send("DUMP", key)
+                log.Debug("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
+                dre.sourceClient.Send("DUMP", key)
             }
-            dumps, err := redis.Strings(cr.sourceConn[idx].Do(""))
+            dumps, err := redis.Strings(dre.sourceClient.Do(""))
             if err != nil && err != redis.ErrNil {
                 return fmt.Errorf("do dump with failed[%v]", err)
             }

             // pipeline ttl
             for _, key := range keys {
-                cr.sourceConn[idx].Send("PTTL", key)
+                dre.sourceClient.Send("PTTL", key)
             }
-            pttls, err := redis.Int64s(cr.sourceConn[idx].Do(""))
+            pttls, err := redis.Int64s(dre.sourceClient.Do(""))
             if err != nil && err != redis.ErrNil {
                 return fmt.Errorf("do ttl with failed[%v]", err)
             }

             for i, k := range keys {
-                cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
+                dre.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
             }
         }

         // Last iteration of scan.
-        if cr.scanners[idx].EndNode() {
+        if dre.scanner.EndNode() {
             break
         }
     }
-    log.Infof("finish fetching node[%v] db[%v]", idx, db)
+    log.Infof("dbRumper[%v] executor[%v] finish fetching db[%v]", dre.rumperId, dre.executorId, db)

     return nil
 }

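The writer/receiver split above exists because RESTORE commands are pipelined: writer Sends commands and Flushes every scan.key_number of them, and receiver must then consume exactly one reply per queued command, which is why every KeyNode is echoed through resultChan. A minimal, self-contained sketch of the same three-routine shape (plain strings instead of KeyNode, standard library only; an illustration, not redis-shake's actual code):

    package main

    import "fmt"

    func main() {
        keyChan := make(chan string, 16)    // routine1 -> routine2
        resultChan := make(chan string, 16) // routine2 -> routine3

        // routine1: fetcher scans keys, then closes keyChan to signal completion
        go func() {
            for _, k := range []string{"k1", "k2", "k3"} {
                keyChan <- k
            }
            close(keyChan)
        }()

        // routine2: writer queues a (simulated) RESTORE per key and forwards
        // the key so the receiver knows how many replies to expect
        go func() {
            for k := range keyChan {
                resultChan <- k
            }
            close(resultChan)
        }()

        // routine3: receiver runs in the calling goroutine and matches one
        // reply per queued command, like dre.receiver() above
        for k := range resultChan {
            fmt.Println("restored", k)
        }
    }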
src/redis-shake/scanner/keyFileScanner.go

@@ -13,11 +13,7 @@ type KeyFileScanner struct {
     cnt int // mark the number of this scan. init: -1
 }

-func (kfs *KeyFileScanner) NodeCount() (int, error) {
-    return 1, nil
-}
-
-func (kfs *KeyFileScanner) ScanKey(node interface{}) ([]string, error) {
+func (kfs *KeyFileScanner) ScanKey() ([]string, error) {
     keys := make([]string, 0, conf.Options.ScanKeyNumber)
     for i := 0; i < int(conf.Options.ScanKeyNumber) && kfs.bufScan.Scan(); i++ {
         keys = append(keys, kfs.bufScan.Text())

src/redis-shake/scanner/normalScanner.go

@@ -13,11 +13,7 @@ type NormalScanner struct {
     cursor int64
 }

-func (ns *NormalScanner) NodeCount() (int, error) {
-    return 1, nil
-}
-
-func (ns *NormalScanner) ScanKey(node interface{}) ([]string, error) {
+func (ns *NormalScanner) ScanKey() ([]string, error) {
     var keys []string
     values, err := redis.Values(ns.client.Do("SCAN", ns.cursor, "COUNT",
         conf.Options.ScanKeyNumber))

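NormalScanner is a thin wrapper over the stock SCAN cursor walk (EndNode presumably reports when the cursor wraps back to 0; that method is not shown in this diff). For reference, the bare pattern against the redigo API used throughout this diff, as a sketch assuming github.com/gomodule/redigo/redis and a local unauthenticated server:

    package main

    import (
        "fmt"

        "github.com/gomodule/redigo/redis"
    )

    func main() {
        c, err := redis.Dial("tcp", "127.0.0.1:6379")
        if err != nil {
            panic(err)
        }
        defer c.Close()

        cursor := int64(0)
        for {
            // SCAN replies with [next-cursor, [keys...]]
            values, err := redis.Values(c.Do("SCAN", cursor, "COUNT", 100))
            if err != nil {
                panic(err)
            }
            cursor, _ = redis.Int64(values[0], nil)
            keys, _ := redis.Strings(values[1], nil)
            fmt.Println(keys)
            if cursor == 0 { // a full iteration has completed
                break
            }
        }
    }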
src/redis-shake/scanner/scanner.go

@@ -11,15 +11,8 @@ import (
 // scanner used to scan keys
 type Scanner interface {
-    /*
-     * get db node info.
-     * return:
-     *     int: node number. Used in aliyun_cluster
-     */
-    NodeCount() (int, error)
-
-    // return scanned keys
-    ScanKey(node interface{}) ([]string, error) // return the scanned keys
+    ScanKey() ([]string, error) // return the scanned keys

     // end current node
     EndNode() bool
@@ -27,35 +20,29 @@ type Scanner interface {
     Close()
 }

-func NewScanner(client []redis.Conn) []Scanner {
+func NewScanner(client redis.Conn, tencentNodeId string, aliyunNodeId int) Scanner {
     if conf.Options.ScanSpecialCloud != "" {
-        return []Scanner{
-            &SpecialCloudScanner{
-                client: client[0],
-                cursor: 0,
-            },
-        }
+        return &SpecialCloudScanner{
+            client:        client,
+            cursor:        0,
+            tencentNodeId: tencentNodeId,
+            aliyunNodeId:  aliyunNodeId,
+        }
     } else if conf.Options.ScanKeyFile != "" {
         if f, err := os.Open(conf.Options.ScanKeyFile); err != nil {
             log.Errorf("open scan-key-file[%v] error[%v]", conf.Options.ScanKeyFile, err)
             return nil
         } else {
-            return []Scanner{
-                &KeyFileScanner{
-                    f:       f,
-                    bufScan: bufio.NewScanner(f),
-                    cnt:     -1,
-                },
-            }
+            return &KeyFileScanner{
+                f:       f,
+                bufScan: bufio.NewScanner(f),
+                cnt:     -1,
+            }
         }
     } else {
-        ret := make([]Scanner, 0, len(client))
-        for _, c := range client {
-            ret = append(ret, &NormalScanner{
-                client: c,
-                cursor: 0,
-            })
-        }
-        return ret
+        return &NormalScanner{
+            client: client,
+            cursor: 0,
+        }
     }
 }

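The interface shrink (NodeCount gone, ScanKey now nullary) moves node discovery into dbRumper.getNode and node identity into the scanner's constructor, so each scanner is bound to exactly one node. A hypothetical call site, shown as a fragment rather than a compilable program, mirroring dre.exec() and dre.doFetch() above (process is a stand-in consumer):

    // plain SCAN: no special-cloud node ids when scan.special_cloud is unset
    s := scanner.NewScanner(sourceClient, "", 0)
    if s == nil {
        log.Panic("create scanner failed") // NewScanner returns nil when the key file cannot be opened
    }
    defer s.Close()
    for {
        keys, err := s.ScanKey()
        if err != nil {
            log.Panic(err)
        }
        process(keys)    // hypothetical consumer
        if s.EndNode() { // cursor wrapped around: this node is fully scanned
            break
        }
    }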
src/redis-shake/scanner/specialCloudScanner.go

@@ -1,7 +1,6 @@
 package scanner

 import (
-    "strconv"
     "fmt"

     "redis-shake/common"
     "redis-shake/configure"
@@ -13,39 +12,11 @@ type SpecialCloudScanner struct {
     client redis.Conn
     cursor int64

-    tencentNodes []string
+    tencentNodeId string
+    aliyunNodeId  int
 }

-func (scs *SpecialCloudScanner) NodeCount() (int, error) {
-    switch conf.Options.ScanSpecialCloud {
-    case utils.AliyunCluster:
-        info, err := redis.Bytes(scs.client.Do("info", "Cluster"))
-        if err != nil {
-            return -1, err
-        }
-
-        result := utils.ParseInfo(info)
-        if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil {
-            return -1, err
-        } else if count <= 0 {
-            return -1, fmt.Errorf("source node count[%v] illegal", count)
-        } else {
-            return int(count), nil
-        }
-    case utils.TencentCluster:
-        var err error
-        scs.tencentNodes, err = utils.GetAllClusterNode(scs.client, conf.StandAloneRoleMaster, "id")
-        if err != nil {
-            return -1, err
-        }
-
-        return len(scs.tencentNodes), nil
-    default:
-        return -1, nil
-    }
-}
-
-func (scs *SpecialCloudScanner) ScanKey(node interface{}) ([]string, error) {
+func (scs *SpecialCloudScanner) ScanKey() ([]string, error) {
     var (
         values []interface{}
         err    error
@@ -55,9 +26,9 @@ func (scs *SpecialCloudScanner) ScanKey(node interface{}) ([]string, error) {
     switch conf.Options.ScanSpecialCloud {
     case utils.TencentCluster:
         values, err = redis.Values(scs.client.Do("SCAN", scs.cursor, "COUNT",
-            conf.Options.ScanKeyNumber, scs.tencentNodes[node.(int)]))
+            conf.Options.ScanKeyNumber, scs.tencentNodeId))
     case utils.AliyunCluster:
-        values, err = redis.Values(scs.client.Do("ISCAN", node, scs.cursor, "COUNT",
+        values, err = redis.Values(scs.client.Do("ISCAN", scs.aliyunNodeId, scs.cursor, "COUNT",
             conf.Options.ScanKeyNumber))
     }
     if err != nil && err != redis.ErrNil {

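To summarize the scanner change at the protocol level: node identity is now fixed at construction time, so each ScanKey call issues (as shown above) either Tencent's node-qualified SCAN or Alibaba Cloud's ISCAN, whose first argument addresses one node behind the proxy:

    SCAN <cursor> COUNT <scan.key_number> <tencentNodeId>    (tencent_cluster)
    ISCAN <aliyunNodeId> <cursor> COUNT <scan.key_number>    (aliyun_cluster)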