From f3f9b1b1477b881a4b31df305453ef81c772e19d Mon Sep 17 00:00:00 2001
From: Vinllen Chen
Date: Thu, 13 Jun 2019 23:54:05 +0800
Subject: [PATCH] Improve 1.6.7 (#95)

* add metric for rump, haven't tested yet
* debug
* debug2
* debug3
* debug4
* debug5
* debug6
* debug7
* debug8
* debug9
* debug10
* debug11
* debug12
* add qos: limit the qps
* debug12
* debug13
* debug14
* debug15
* debug16
* split big key in rump
* update ChangeLog: 1.6.7
* update ChangeLog: 1.6.7
---
 ChangeLog                              |   5 +
 conf/redis-shake.conf                  |   4 +
 src/redis-shake/common/common.go       |  16 ++
 src/redis-shake/common/speed.go        |  22 +++
 src/redis-shake/common/split.go        |  34 ++++
 src/redis-shake/configure/configure.go |   1 +
 src/redis-shake/main/main.go           |   8 +-
 src/redis-shake/metric/variables.go    |   2 +
 src/redis-shake/rump.go                | 256 +++++++++++++++++++++----
 9 files changed, 305 insertions(+), 43 deletions(-)
 create mode 100644 src/redis-shake/common/speed.go
 create mode 100644 src/redis-shake/common/split.go

diff --git a/ChangeLog b/ChangeLog
index 870184b..fdf12f3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,8 @@
+2019-06-13 Alibaba Cloud.
+    * VERSION: 1.6.7
+    * IMPROVE: split big keys in `rump` mode.
+    * IMPROVE: add a transmission rate limit in `rump` mode.
+    * IMPROVE: add metrics in `rump` mode.
 2019-06-09 Alibaba Cloud.
     * VERSION: 1.6.6
     * cherry-pick merge v1.4.4
diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf
index 8c75911..d5944f9 100644
--- a/conf/redis-shake.conf
+++ b/conf/redis-shake.conf
@@ -193,6 +193,10 @@ scan.special_cloud =
 # some cloud versions support neither sync/psync nor scan; for those we support reading the whole key list from a file and fetching each key: one key per line.
 scan.key_file =
 
+# limit the transmission rate. only used in `rump` mode currently.
+# e.g., qps = 1000 means passing 1000 keys per second. the default is 500,000 (0 means use the default).
+qps = 200000
+
 # ----------------splitter----------------
 # below variables are useless for current open source version so don't set.
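The qps option above is enforced with a token bucket: StartQoS (added in src/redis-shake/common/speed.go below) refills a buffered channel with up to qps tokens once per second, and the rump writer takes one token per key before pipelining it. A minimal, self-contained sketch of the same pattern; startBucket and the loop in main are illustrative names, not part of the patch:

package main

import (
	"fmt"
	"time"
)

// startBucket refills up to `limit` tokens once per second. Consumers block
// on the channel, so sustained throughput converges to `limit` per second.
func startBucket(limit int) chan struct{} {
	bucket := make(chan struct{}, limit)
	go func() {
		for range time.NewTicker(1 * time.Second).C {
			for i := 0; i < limit; i++ {
				select {
				case bucket <- struct{}{}:
				default:
					// bucket is already full; wait for the next tick
				}
			}
		}
	}()
	return bucket
}

func main() {
	bucket := startBucket(2) // at most ~2 operations per second
	for i := 0; i < 6; i++ {
		<-bucket // one token per operation, mirroring "<-bucket" in the writer
		fmt.Printf("op %d at %s\n", i, time.Now().Format("15:04:05"))
	}
}

Note that this design is bursty at one-second granularity: all tokens for a given second become available at the tick, so short spikes up to qps are expected.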
diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go
index 5b83e05..6c4ddd3 100644
--- a/src/redis-shake/common/common.go
+++ b/src/redis-shake/common/common.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"net"
 	"strings"
+	"reflect"
+	"unsafe"
 
 	"pkg/libs/bytesize"
 	"redis-shake/configure"
@@ -96,3 +98,17 @@ func PickTargetRoundRobin(n int) int {
 	}()
 	return TargetRoundRobin
 }
+
+// String2Bytes converts a string to []byte without copying; the returned slice must not be modified.
+func String2Bytes(s string) []byte {
+	sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
+	bh := reflect.SliceHeader{
+		Data: sh.Data,
+		Len:  sh.Len,
+		Cap:  sh.Len,
+	}
+	return *(*[]byte)(unsafe.Pointer(&bh))
+}
+
+// Bytes2String converts a []byte to string without copying; the input must not be modified afterwards.
+func Bytes2String(b []byte) string {
+	return *(*string)(unsafe.Pointer(&b))
+}
\ No newline at end of file
diff --git a/src/redis-shake/common/speed.go b/src/redis-shake/common/speed.go
new file mode 100644
index 0000000..4295119
--- /dev/null
+++ b/src/redis-shake/common/speed.go
@@ -0,0 +1,22 @@
+package utils
+
+import "time"
+
+// StartQoS returns a token bucket holding at most `limit` tokens,
+// refilled once per second; callers take one token per operation.
+func StartQoS(limit int) chan struct{} {
+	bucket := make(chan struct{}, limit)
+	go func() {
+		for range time.NewTicker(1 * time.Second).C {
+		fill:
+			for i := 0; i < limit; i++ {
+				select {
+				case bucket <- struct{}{}:
+				default:
+					// bucket is full, stop refilling until the next tick
+					break fill
+				}
+			}
+		}
+	}()
+
+	return bucket
+}
\ No newline at end of file
diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go
new file mode 100644
index 0000000..30d76d2
--- /dev/null
+++ b/src/redis-shake/common/split.go
@@ -0,0 +1,34 @@
+package utils
+
+// big key split in rump mode
+import (
+	"pkg/rdb"
+	"pkg/libs/log"
+
+	redigo "github.com/garyburd/redigo/redis"
+)
+
+// RestoreBigkey restores a big key by rebuilding it entry by entry instead of
+// sending one huge RESTORE payload, then re-applies the original pttl.
+func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int) {
+	if _, err := client.Do("select", db); err != nil {
+		log.Panicf("send select db[%v] failed[%v]", db, err)
+	}
+
+	entry := rdb.BinEntry{
+		DB:              uint32(db),
+		Key:             String2Bytes(key),
+		Type:            0, // useless here
+		Value:           String2Bytes(value),
+		ExpireAt:        0, // useless here
+		RealMemberCount: 0,
+		NeedReadLen:     1,
+		IdleTime:        0,
+		Freq:            0,
+	}
+
+	restoreBigRdbEntry(client, &entry)
+
+	// re-apply the original pttl; 0 means no expire, and pexpire with 0 would delete the key
+	if pttl > 0 {
+		if _, err := client.Do("pexpire", key, pttl); err != nil {
+			log.Panicf("send key[%v] pexpire failed[%v]", key, err)
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go
index e72b440..6676231 100644
--- a/src/redis-shake/configure/configure.go
+++ b/src/redis-shake/configure/configure.go
@@ -52,6 +52,7 @@ type Configuration struct {
 	ScanKeyNumber    uint32 `config:"scan.key_number"`
 	ScanSpecialCloud string `config:"scan.special_cloud"`
 	ScanKeyFile      string `config:"scan.key_file"`
+	Qps              int    `config:"qps"`
 
 	// inner variables
 	ReplaceHashTag bool `config:"replace_hash_tag"`
diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go
index f2b087b..dcfb0fb 100644
--- a/src/redis-shake/main/main.go
+++ b/src/redis-shake/main/main.go
@@ -371,11 +371,17 @@ func sanitizeOptions(tp string) error {
 		conf.Options.SenderCount = defaultSenderCount
 	}
 
-
 	if conf.Options.SenderDelayChannelSize == 0 {
 		conf.Options.SenderDelayChannelSize = 32
 	}
 
+	// valid range: [0, 100 million)
+	if conf.Options.Qps < 0 || conf.Options.Qps >= 100000000 {
+		return fmt.Errorf("qps[%v] should be in [0, 100000000)", conf.Options.Qps)
+	} else if conf.Options.Qps == 0 {
+		conf.Options.Qps = 500000
+	}
+
 	if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump {
 		// get target redis version and set TargetReplace.
 		for _, address := range conf.Options.TargetAddressList {
diff --git a/src/redis-shake/metric/variables.go b/src/redis-shake/metric/variables.go
index f3a60a1..163cfcd 100644
--- a/src/redis-shake/metric/variables.go
+++ b/src/redis-shake/metric/variables.go
@@ -30,6 +30,7 @@ type MetricRest struct {
 	SourceDBOffset interface{} // source redis offset
 	SourceAddress  interface{}
 	TargetAddress  interface{}
+	Details        interface{} // other detailed info
 }
 
 func NewMetricRest() []MetricRest {
@@ -76,6 +77,7 @@ func NewMetricRest() []MetricRest {
 		SourceDBOffset: detailMap["SourceDBOffset"],
 		SourceAddress:  detailMap["SourceAddress"],
 		TargetAddress:  detailMap["TargetAddress"],
+		Details:        detailMap["Details"],
 	}
 }
diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go
index a7032a7..5caf0cf 100644
--- a/src/redis-shake/rump.go
+++ b/src/redis-shake/rump.go
@@ -1,26 +1,48 @@
 package run
 
 import (
-	"pkg/libs/log"
 	"strconv"
 	"sync"
 	"fmt"
+	"reflect"
+	"math"
 
+	"pkg/libs/log"
+	"pkg/libs/atomic2"
 	"redis-shake/common"
 	"redis-shake/configure"
 	"redis-shake/scanner"
+	"redis-shake/metric"
 
 	"github.com/garyburd/redigo/redis"
 )
 
 type CmdRump struct {
+	dumpers []*dbRumper
 }
 
 func (cr *CmdRump) GetDetailedInfo() interface{} {
-	return nil
+	ret := make(map[string]interface{}, len(cr.dumpers))
+	for _, dumper := range cr.dumpers {
+		if dumper == nil {
+			continue
+		}
+		ret[dumper.address] = dumper.getStats()
+	}
+
+	// TODO, better to move to the next level
+	metric.AddMetric(0)
+
+	return []map[string]interface{}{
+		{
+			"Details": ret,
+		},
+	}
 }
 
 func (cr *CmdRump) Main() {
+	cr.dumpers = make([]*dbRumper, len(conf.Options.SourceAddressList))
+
 	var wg sync.WaitGroup
 	wg.Add(len(conf.Options.SourceAddressList))
 	// build dbRumper
@@ -29,6 +51,9 @@ func (cr *CmdRump) Main() {
 			id:      i,
 			address: address,
 		}
+
+		cr.dumpers[i] = dr
+
 		log.Infof("start dbRumper[%v]", i)
 		go func() {
 			defer wg.Done()
@@ -37,17 +62,33 @@ func (cr *CmdRump) Main() {
 	}
 	wg.Wait()
 
-	log.Info("all rumpers finish!")
+	log.Infof("all rumpers finish! total data: %v", cr.GetDetailedInfo())
 }
 
 /*------------------------------------------------------*/
 // one rump (1 db or 1 proxy) link corresponds to one dbRumper
 type dbRumper struct {
-	id      int // id
-	address string
+	id      int    // id
+	address string // source address
 
 	client       redis.Conn // source client
 	tencentNodes []string   // for tencent cluster only
+
+	executors []*dbRumperExecutor
+}
+
+func (dr *dbRumper) getStats() map[string]interface{} {
+	ret := make(map[string]interface{}, len(dr.executors))
+	for _, exe := range dr.executors {
+		if exe == nil {
+			continue
+		}
+
+		id := fmt.Sprintf("%v", exe.executorId)
+		ret[id] = exe.getStats()
+	}
+
+	return ret
 }
 
 func (dr *dbRumper) run() {
@@ -63,6 +104,8 @@ func (dr *dbRumper) run() {
 
 	log.Infof("dbRumper[%v] get node count: %v", dr.id, count)
 
+	dr.executors = make([]*dbRumperExecutor, count)
+
 	var wg sync.WaitGroup
 	wg.Add(count)
 	for i := 0; i < count; i++ {
@@ -80,16 +123,16 @@ func (dr *dbRumper) run() {
 			tencentNodeId = dr.tencentNodes[i]
 		}
 
-		executor := &dbRumperExecutor{
-			rumperId:   dr.id,
-			executorId: i,
-			sourceClient: utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
-				conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable),
-			targetClient: utils.OpenRedisConn(target, conf.Options.TargetAuthType,
-				conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster,
-				conf.Options.TargetTLSEnable),
-			tencentNodeId: tencentNodeId,
-		}
+		sourceClient := utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
+			conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable)
+		targetClient := utils.OpenRedisConn(target, conf.Options.TargetAuthType,
+			conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster,
+			conf.Options.TargetTLSEnable)
+		targetBigKeyClient := utils.OpenRedisConn(target, conf.Options.TargetAuthType,
+			conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster,
+			conf.Options.TargetTLSEnable)
+		executor := NewDbRumperExecutor(dr.id, i, sourceClient, targetClient, targetBigKeyClient, tencentNodeId)
+		dr.executors[i] = executor
 
 		go func() {
 			defer wg.Done()
@@ -134,23 +177,84 @@ func (dr *dbRumper) getNode() (int, error) {
 
 /*------------------------------------------------------*/
 // one executor (1 db only) link corresponds to one dbRumperExecutor
 type dbRumperExecutor struct {
-	rumperId      int // father id
-	executorId    int // current id, also == aliyun cluster node id
-	sourceClient  redis.Conn
-	targetClient  redis.Conn
-	tencentNodeId string // tencent cluster node id
+	rumperId           int        // parent id
+	executorId         int        // current id, also == aliyun cluster node id
+	sourceClient       redis.Conn // source client
+	targetClient       redis.Conn // target client
+	tencentNodeId      string     // tencent cluster node id
+	targetBigKeyClient redis.Conn // target client only used for big keys; this is a bit ugly
 
 	keyChan    chan *KeyNode // keyChan is used to communicate between routine1 and routine2
 	resultChan chan *KeyNode // resultChan is used to communicate between routine2 and routine3
 	scanner    scanner.Scanner // one scanner matches one db/proxy
 	fetcherWg  sync.WaitGroup
+
+	stat dbRumperExecutorStats
+}
+
+func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, targetBigKeyClient redis.Conn,
+	tencentNodeId string) *dbRumperExecutor {
+	executor := &dbRumperExecutor{
+		rumperId:           rumperId,
+		executorId:         executorId,
+		sourceClient:       sourceClient,
+		targetClient:       targetClient,
+		tencentNodeId:      tencentNodeId,
+		targetBigKeyClient: targetBigKeyClient,
+		stat: dbRumperExecutorStats{
+			minSize: 1 << 30,
+			maxSize: 0,
+			sumSize: 0,
+		},
+	}
+
+	return executor
 }
 
 type KeyNode struct {
 	key   string
 	value string
 	pttl  int64
+	db    int
+}
+
+type dbRumperExecutorStats struct {
+	rBytes    atomic2.Int64 // read bytes
+	rCommands atomic2.Int64 // read commands
+	wBytes    atomic2.Int64 // write bytes
+	wCommands atomic2.Int64 // write commands
+	cCommands atomic2.Int64 // confirmed commands
+	minSize   int64         // min payload size
+	maxSize   int64         // max payload size
+	sumSize   int64         // total payload size
+}
+
+func (dre *dbRumperExecutor) getStats() map[string]interface{} {
+	kv := make(map[string]interface{})
+	// stats -> map
+	v := reflect.ValueOf(dre.stat)
+	for i := 0; i < v.NumField(); i++ {
+		f := v.Field(i)
+		name := v.Type().Field(i).Name
+		switch f.Kind() {
+		case reflect.Struct:
+			// todo
+			kv[name] = f.Field(0).Int()
+			// kv[name] = f.Interface()
+		case reflect.Int64:
+			if name == "sumSize" {
+				continue
+			}
+			kv[name] = f.Int()
+		}
+	}
+
+	kv["keyChan"] = len(dre.keyChan)
+	kv["resultChan"] = len(dre.resultChan)
+	// guard against division by zero before any key has been read
+	if rCommands := dre.stat.rCommands.Get(); rCommands != 0 {
+		kv["avgSize"] = float64(dre.stat.sumSize) / float64(rCommands)
+	} else {
+		kv["avgSize"] = 0
+	}
+
+	return kv
 }
 
 func (dre *dbRumperExecutor) exec() {
@@ -195,7 +299,7 @@ func (dre *dbRumperExecutor) fetcher() {
 	}
 	log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dbNumber)
 
-	// iterate all db
+	// iterate all db nodes
 	for _, db := range dbNumber {
 		log.Infof("dbRumper[%v] executor[%v] fetch logical db: %v", dre.rumperId, dre.executorId, db)
%v", dre.rumperId, dre.executorId, db) if err := dre.doFetch(int(db)); err != nil { @@ -208,7 +312,17 @@ func (dre *dbRumperExecutor) fetcher() { func (dre *dbRumperExecutor) writer() { var count uint32 + var wBytes int64 + var err error + batch := make([]*KeyNode, 0, conf.Options.ScanKeyNumber) + + // used in QoS + bucket := utils.StartQoS(conf.Options.Qps) + preDb := 0 for ele := range dre.keyChan { + // QoS, limit the qps + <-bucket + if ele.pttl == -1 { // not set ttl ele.pttl = 0 } @@ -217,37 +331,93 @@ func (dre *dbRumperExecutor) writer() { continue } - // TODO, big key split - log.Debugf("dbRumper[%v] executor[%v] restore %s", dre.rumperId, dre.executorId, ele.key) + log.Debugf("dbRumper[%v] executor[%v] restore[%s], length[%v]", dre.rumperId, dre.executorId, ele.key, + len(ele.value)) + if uint64(len(ele.value)) >= conf.Options.BigKeyThreshold { + log.Infof("dbRumper[%v] executor[%v] restore big key[%v] with length[%v]", dre.rumperId, + dre.executorId, ele.key, len(ele.value)) + // flush previous cache + batch = dre.writeSend(batch, &count, &wBytes) + + // handle big key + utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) + continue + } + + // send "select" command if db is different + if ele.db != preDb { + dre.targetClient.Send("select", ele.db) + preDb = ele.db + } + if conf.Options.Rewrite { - dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE") + err = dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE") } else { - dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value) + err = dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value) + } + if err != nil { + log.Panicf("dbRumper[%v] executor[%v] send key[%v] failed[%v]", dre.rumperId, dre.executorId, + ele.key, err) } - dre.resultChan <- ele + wBytes += int64(len(ele.value)) + batch = append(batch, ele) + // move to real send + // dre.resultChan <- ele count++ + if count == conf.Options.ScanKeyNumber { // batch log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count) - dre.targetClient.Flush() - count = 0 + + batch = dre.writeSend(batch, &count, &wBytes) } } - dre.targetClient.Flush() + dre.writeSend(batch, &count, &wBytes) + close(dre.resultChan) } +func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes *int64) []*KeyNode { + newBatch := make([]*KeyNode, 0, conf.Options.ScanKeyNumber) + if len(batch) == 0 { + return newBatch + } + + if err := dre.targetClient.Flush(); err != nil { + log.Panicf("dbRumper[%v] executor[%v] flush failed[%v]", dre.rumperId, dre.executorId, err) + } + + // real send + for _, ele := range batch { + dre.resultChan <- ele + } + + dre.stat.wCommands.Add(int64(*count)) + dre.stat.wBytes.Add(*wBytes) + + *count = 0 + *wBytes = 0 + + return newBatch +} + func (dre *dbRumperExecutor) receiver() { for ele := range dre.resultChan { if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil { log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId, dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err) } + dre.stat.cCommands.Incr() } } func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) { + // tencent cluster only has 1 logical db + if conf.Options.ScanSpecialCloud == utils.TencentCluster { + return []int32{0}, nil + } + conn := dre.sourceClient if ret, err := conn.Do("info", "keyspace"); err != nil { return nil, err @@ -265,19 +435,15 @@ func (dre *dbRumperExecutor) getSourceDbList() ([]int32, 
 }
 
 func (dre *dbRumperExecutor) doFetch(db int) error {
-	// send 'select' command to both source and target
-	log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId)
-	if _, err := dre.sourceClient.Do("select", db); err != nil {
-		return err
+	if conf.Options.ScanSpecialCloud != utils.TencentCluster {
+		// send 'select' command to the source
+		log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId)
+		if _, err := dre.sourceClient.Do("select", db); err != nil {
+			return err
+		}
 	}
 
-	// it's ok to send select directly because the message order can be guaranteed.
-	log.Infof("dbRumper[%v] executor[%v] send target select db", dre.rumperId, dre.executorId)
-	dre.targetClient.Flush()
-	if err := dre.targetClient.Send("select", db); err != nil {
-		return err
-	}
-	dre.targetClient.Flush()
+	// selecting the target db has moved into the writer
 
 	log.Infof("dbRumper[%v] executor[%v] start fetching node db[%v]", dre.rumperId, dre.executorId, db)
 
@@ -292,7 +458,7 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
 		if len(keys) != 0 {
 			// pipeline dump
 			for _, key := range keys {
-				log.Debug("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
+				log.Debugf("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
 				dre.sourceClient.Send("DUMP", key)
 			}
 			dumps, err := redis.Strings(dre.sourceClient.Do(""))
@@ -309,8 +475,14 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
 				return fmt.Errorf("do ttl failed[%v]", err)
 			}
 
+			dre.stat.rCommands.Add(int64(len(keys)))
 			for i, k := range keys {
-				dre.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
+				length := len(dumps[i])
+				dre.stat.rBytes.Add(int64(length)) // length of the dumped value
+				dre.stat.minSize = int64(math.Min(float64(dre.stat.minSize), float64(length)))
+				dre.stat.maxSize = int64(math.Max(float64(dre.stat.maxSize), float64(length)))
+				dre.stat.sumSize += int64(length)
+				dre.keyChan <- &KeyNode{k, dumps[i], pttls[i], db}
 			}
 		}
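For reference, the single-key primitive that the reworked fetcher/writer pipeline batches is DUMP + PTTL on the source followed by RESTORE (with REPLACE when rewrite is enabled) on the target. A minimal sketch using redigo; the addresses and the migrateKey helper below are illustrative only, not part of the patch:

package main

import (
	"log"

	redigo "github.com/garyburd/redigo/redis"
)

// migrateKey copies one key with its TTL using the same DUMP/PTTL/RESTORE
// primitive that rump mode pipelines in batches.
func migrateKey(src, dst redigo.Conn, key string) error {
	dump, err := redigo.String(src.Do("DUMP", key))
	if err != nil {
		return err
	}
	pttl, err := redigo.Int64(src.Do("PTTL", key))
	if err != nil {
		return err
	}
	if pttl < 0 { // -1: no TTL (RESTORE expects 0 for "no expire"); -2: key vanished
		pttl = 0
	}
	// REPLACE mirrors the rewrite=true branch of the writer
	_, err = dst.Do("RESTORE", key, pttl, dump, "REPLACE")
	return err
}

func main() {
	src, err := redigo.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()
	dst, err := redigo.Dial("tcp", "127.0.0.1:6380")
	if err != nil {
		log.Fatal(err)
	}
	defer dst.Close()

	if err := migrateKey(src, dst, "some-key"); err != nil {
		log.Fatal(err)
	}
}

Because RESTORE ships the whole serialized value as a single argument, oversized values cannot safely go through this path; that is why the writer flushes its pending batch and rebuilds such keys entry by entry via RestoreBigkey/restoreBigRdbEntry on the dedicated targetBigKeyClient connection.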