Improve 1.6.7 (#95)

* add metric for rump, haven't tested yet

* debug

* debug2

* debug3

* debug4

* debug5

* debug6

* debug7

* debug8

* debug9

* debug10

* debug11

* debug12

* add qos: limit the qps

* debug12

* debug13

* debug14

* debug15

* debug16

* split big key in rump

* update ChangeLog: 1.6.7

* update ChangeLog: 1.6.7
v4
Vinllen Chen 6 years ago committed by GitHub
parent 5b609f9d3a
commit f3f9b1b147
  1. ChangeLog (5 lines changed)
  2. conf/redis-shake.conf (4 lines changed)
  3. src/redis-shake/common/common.go (16 lines changed)
  4. src/redis-shake/common/speed.go (22 lines changed)
  5. src/redis-shake/common/split.go (34 lines changed)
  6. src/redis-shake/configure/configure.go (1 line changed)
  7. src/redis-shake/main/main.go (8 lines changed)
  8. src/redis-shake/metric/variables.go (2 lines changed)
  9. src/redis-shake/rump.go (238 lines changed)

@ -1,3 +1,8 @@
2019-06-13 Alibaba Cloud.
* VERSION: 1.6.7
* IMPROVE: split big key in `rump` mode.
* IMPROVE: add rate transmission mechanism in `rump` mode.
* IMPROVE: add metric in `rump` mode.
2019-06-09 Alibaba Cloud.
* VERSION: 1.6.6
* cherry-pick merge v1.4.4

@ -193,6 +193,10 @@ scan.special_cloud =
# Some cloud Redis versions support neither sync/psync nor scan. For those, we support reading the full key list from a file and fetching the keys one by one: one key per line.
scan.key_file =
# limit the rate of transmission. Currently only used in `rump` mode.
# e.g., qps = 1000 means transferring 1000 keys per second. The default is 500,000 (0 means use the default).
qps = 200000
# ----------------splitter----------------
# below variables are useless for current open source version so don't set.

@ -5,6 +5,8 @@ import (
"fmt"
"net"
"strings"
"reflect"
"unsafe"
"pkg/libs/bytesize"
"redis-shake/configure"
@ -96,3 +98,17 @@ func PickTargetRoundRobin(n int) int {
}()
return TargetRoundRobin
}
// String2Bytes converts a string to a []byte without copying, by aliasing the
// string's backing array; the returned slice must not be modified.
func String2Bytes(s string) []byte {
sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
bh := reflect.SliceHeader{
Data: sh.Data,
Len: sh.Len,
Cap: sh.Len,
}
return *(*[]byte)(unsafe.Pointer(&bh))
}
// Bytes2String converts a []byte to a string without copying.
func Bytes2String(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

@ -0,0 +1,22 @@
package utils
import "time"
func StartQoS(limit int) chan struct{} {
bucket := make(chan struct{}, limit)
go func() {
for range time.NewTicker(1 * time.Second).C {
fill:
for i := 0; i < limit; i++ {
select {
case bucket <- struct{}{}:
default:
// bucket is full; a bare break would only exit the select, so use a labeled break to stop refilling until the next tick
break fill
}
}
}
}()
return bucket
}
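A usage sketch of the limiter (hypothetical caller; the real consumer is the rump writer later in this diff, driven by the `qps` option from the conf): block on one token per unit of work, so throughput is capped at roughly `limit` items per second.

```go
// Hypothetical consumer sketch, same package as StartQoS.
func consumeWithQoS(limit int, work []string) {
	bucket := StartQoS(limit)
	for _, item := range work {
		<-bucket // wait for a token; tokens are refilled once per second
		_ = item // ... process one item here, e.g. RESTORE one key ...
	}
}
```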

@ -0,0 +1,34 @@
package utils
// big key split in rump
import (
"pkg/rdb"
"pkg/libs/log"
redigo "github.com/garyburd/redigo/redis"
)
func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int) {
if _, err := client.Do("select", db); err != nil {
log.Panicf("send select db[%v] failed[%v]", db, err)
}
entry := rdb.BinEntry{
DB: uint32(db),
Key: String2Bytes(key),
Type: 0, // useless here
Value: String2Bytes(value),
ExpireAt: 0, // useless here
RealMemberCount: 0,
NeedReadLen: 1,
IdleTime: 0,
Freq: 0,
}
restoreBigRdbEntry(client, &entry)
// only set the ttl when one is present: PEXPIRE with a non-positive value would delete the key right after it was restored
if pttl > 0 {
if _, err := client.Do("pexpire", key, pttl); err != nil {
log.Panicf("send key[%v] pexpire failed[%v]", key, err)
}
}
}
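An illustrative sketch of the big-key idea: restoreBigRdbEntry itself is defined elsewhere in redis-shake/common and is not shown in this diff; the point is to replay one oversized value as many small commands instead of a single huge RESTORE. The helper name and chunk size below are hypothetical.

```go
// Sketch only, not the actual restoreBigRdbEntry: replay a big list value
// as a series of bounded RPUSH commands on the target connection.
func restoreBigListSketch(client redigo.Conn, key string, members []string) error {
	const chunk = 128 // hypothetical batch size per RPUSH
	for i := 0; i < len(members); i += chunk {
		end := i + chunk
		if end > len(members) {
			end = len(members)
		}
		args := redigo.Args{}.Add(key)
		for _, m := range members[i:end] {
			args = args.Add(m)
		}
		if err := client.Send("RPUSH", args...); err != nil {
			return err
		}
	}
	return client.Flush()
}
```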

@ -52,6 +52,7 @@ type Configuration struct {
ScanKeyNumber uint32 `config:"scan.key_number"`
ScanSpecialCloud string `config:"scan.special_cloud"`
ScanKeyFile string `config:"scan.key_file"`
Qps int `config:"qps"`
// inner variables
ReplaceHashTag bool `config:"replace_hash_tag"`

@ -371,11 +371,17 @@ func sanitizeOptions(tp string) error {
conf.Options.SenderCount = defaultSenderCount
}
if conf.Options.SenderDelayChannelSize == 0 {
conf.Options.SenderDelayChannelSize = 32
}
// valid range is [0, 100000000); 0 means use the default
if conf.Options.Qps < 0 || conf.Options.Qps >= 100000000 {
return fmt.Errorf("qps[%v] should be in [0, 100000000)", conf.Options.Qps)
} else if conf.Options.Qps == 0 {
conf.Options.Qps = 500000
}
if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump {
// get target redis version and set TargetReplace.
for _, address := range conf.Options.TargetAddressList {

@ -30,6 +30,7 @@ type MetricRest struct {
SourceDBOffset interface{} // source redis offset
SourceAddress interface{}
TargetAddress interface{}
Details interface{} // other details info
}
func NewMetricRest() []MetricRest {
@ -76,6 +77,7 @@ func NewMetricRest() []MetricRest {
SourceDBOffset: detailMap["SourceDBOffset"],
SourceAddress: detailMap["SourceAddress"],
TargetAddress: detailMap["TargetAddress"],
Details: detailMap["Details"],
}
}

@ -1,26 +1,48 @@
package run
import (
"pkg/libs/log"
"strconv"
"sync"
"fmt"
"reflect"
"math"
"pkg/libs/log"
"pkg/libs/atomic2"
"redis-shake/common"
"redis-shake/configure"
"redis-shake/scanner"
"redis-shake/metric"
"github.com/garyburd/redigo/redis"
)
type CmdRump struct {
dumpers []*dbRumper
}
func (cr *CmdRump) GetDetailedInfo() interface{} {
return nil
ret := make(map[string]interface{}, len(cr.dumpers))
for _, dumper := range cr.dumpers {
if dumper == nil {
continue
}
ret[dumper.address] = dumper.getStats()
}
// TODO, better to move to the next level
metric.AddMetric(0)
return []map[string]interface{} {
{
"Details": ret,
},
}
}
func (cr *CmdRump) Main() {
cr.dumpers = make([]*dbRumper, len(conf.Options.SourceAddressList))
var wg sync.WaitGroup
wg.Add(len(conf.Options.SourceAddressList))
// build dbRumper
@ -29,6 +51,9 @@ func (cr *CmdRump) Main() {
id: i,
address: address,
}
cr.dumpers[i] = dr
log.Infof("start dbRumper[%v]", i)
go func() {
defer wg.Done()
@ -37,17 +62,33 @@ func (cr *CmdRump) Main() {
}
wg.Wait()
log.Info("all rumpers finish!")
log.Infof("all rumpers finish!, total data: %v", cr.GetDetailedInfo())
}
/*------------------------------------------------------*/
// one rump(1 db or 1 proxy) link corresponding to one dbRumper
type dbRumper struct {
id int // id
address string
address string // source address
client redis.Conn // source client
tencentNodes []string // for tencent cluster only
executors []*dbRumperExecutor
}
func (dr *dbRumper) getStats() map[string]interface{} {
ret := make(map[string]interface{}, len(dr.executors))
for _, exe := range dr.executors {
if exe == nil {
continue
}
id := fmt.Sprintf("%v", exe.executorId)
ret[id] = exe.getStats()
}
return ret
}
func (dr *dbRumper) run() {
@ -63,6 +104,8 @@ func (dr *dbRumper) run() {
log.Infof("dbRumper[%v] get node count: %v", dr.id, count)
dr.executors = make([]*dbRumperExecutor, count)
var wg sync.WaitGroup
wg.Add(count)
for i := 0; i < count; i++ {
@ -80,16 +123,16 @@ func (dr *dbRumper) run() {
tencentNodeId = dr.tencentNodes[i]
}
executor := &dbRumperExecutor{
rumperId: dr.id,
executorId: i,
sourceClient: utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable),
targetClient: utils.OpenRedisConn(target, conf.Options.TargetAuthType,
sourceClient := utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable)
targetClient := utils.OpenRedisConn(target, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster,
conf.Options.TargetTLSEnable),
tencentNodeId: tencentNodeId,
}
conf.Options.TargetTLSEnable)
targetBigKeyClient := utils.OpenRedisConn(target, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw, conf.Options.TargetType == conf.RedisTypeCluster,
conf.Options.TargetTLSEnable)
executor := NewDbRumperExecutor(dr.id, i, sourceClient, targetClient, targetBigKeyClient, tencentNodeId)
dr.executors[i] = executor
go func() {
defer wg.Done()
@ -136,21 +179,82 @@ func (dr *dbRumper) getNode() (int, error) {
type dbRumperExecutor struct {
rumperId int // father id
executorId int // current id, also == aliyun cluster node id
sourceClient redis.Conn
targetClient redis.Conn
sourceClient redis.Conn // source client
targetClient redis.Conn // target client
tencentNodeId string // tencent cluster node id
targetBigKeyClient redis.Conn // target client only used in big key, this is a bit ugly
keyChan chan *KeyNode // keyChan is used to communicate between routine1 and routine2
resultChan chan *KeyNode // resultChan is used to communicate between routine2 and routine3
scanner scanner.Scanner // one scanner match one db/proxy
fetcherWg sync.WaitGroup
stat dbRumperExecutorStats
}
func NewDbRumperExecutor(rumperId, executorId int, sourceClient, targetClient, targetBigKeyClient redis.Conn,
tencentNodeId string) *dbRumperExecutor {
executor := &dbRumperExecutor{
rumperId: rumperId,
executorId: executorId,
sourceClient: sourceClient,
targetClient: targetClient,
tencentNodeId: tencentNodeId,
targetBigKeyClient: targetBigKeyClient,
stat: dbRumperExecutorStats{
minSize: 1 << 30,
maxSize: 0,
sumSize: 0,
},
}
return executor
}
type KeyNode struct {
key string
value string
pttl int64
db int
}
type dbRumperExecutorStats struct {
rBytes atomic2.Int64 // read bytes
rCommands atomic2.Int64 // read commands
wBytes atomic2.Int64 // write bytes
wCommands atomic2.Int64 // write commands
cCommands atomic2.Int64 // confirmed commands
minSize int64 // min package size
maxSize int64 // max package size
sumSize int64 // total package size
}
func (dre *dbRumperExecutor) getStats() map[string]interface{} {
kv := make(map[string]interface{})
// stats -> map
v := reflect.ValueOf(dre.stat)
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
name := v.Type().Field(i).Name
switch f.Kind() {
case reflect.Struct:
// todo
kv[name] = f.Field(0).Int()
// kv[name] = f.Interface()
case reflect.Int64:
if name == "sumSize" {
continue
}
kv[name] = f.Int()
}
}
kv["keyChan"] = len(dre.keyChan)
kv["resultChan"] = len(dre.resultChan)
kv["avgSize"] = float64(dre.stat.sumSize) / float64(dre.stat.rCommands.Get())
return kv
}
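A note on the reflection above: calling Interface() on a Value obtained from an unexported field panics, which is presumably why the commented-out line is avoided and the atomic counters are read through Field(0).Int() instead. A standalone illustration (hypothetical type and function, not part of this diff):

```go
// reflect's typed accessors (Int, Uint, ...) can read unexported struct
// fields, but Interface() on such a field panics.
type counter struct{ n int64 }

func readUnexported() int64 {
	f := reflect.ValueOf(counter{n: 42}).Field(0)
	// _ = f.Interface() // would panic: cannot return value obtained from unexported field
	return f.Int() // returns 42
}
```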
func (dre *dbRumperExecutor) exec() {
@ -195,7 +299,7 @@ func (dre *dbRumperExecutor) fetcher() {
}
log.Infof("dbRumper[%v] executor[%v] fetch db list: %v", dre.rumperId, dre.executorId, dbNumber)
// iterate all db
// iterate all db nodes
for _, db := range dbNumber {
log.Infof("dbRumper[%v] executor[%v] fetch logical db: %v", dre.rumperId, dre.executorId, db)
if err := dre.doFetch(int(db)); err != nil {
@ -208,7 +312,17 @@ func (dre *dbRumperExecutor) fetcher() {
func (dre *dbRumperExecutor) writer() {
var count uint32
var wBytes int64
var err error
batch := make([]*KeyNode, 0, conf.Options.ScanKeyNumber)
// used in QoS
bucket := utils.StartQoS(conf.Options.Qps)
preDb := 0
for ele := range dre.keyChan {
// QoS, limit the qps
<-bucket
if ele.pttl == -1 { // not set ttl
ele.pttl = 0
}
@ -217,37 +331,93 @@ func (dre *dbRumperExecutor) writer() {
continue
}
// TODO, big key split
log.Debugf("dbRumper[%v] executor[%v] restore %s", dre.rumperId, dre.executorId, ele.key)
log.Debugf("dbRumper[%v] executor[%v] restore[%s], length[%v]", dre.rumperId, dre.executorId, ele.key,
len(ele.value))
if uint64(len(ele.value)) >= conf.Options.BigKeyThreshold {
log.Infof("dbRumper[%v] executor[%v] restore big key[%v] with length[%v]", dre.rumperId,
dre.executorId, ele.key, len(ele.value))
// flush previous cache
batch = dre.writeSend(batch, &count, &wBytes)
// handle big key
utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db)
continue
}
// send "select" command if db is different
if ele.db != preDb {
dre.targetClient.Send("select", ele.db)
preDb = ele.db
}
if conf.Options.Rewrite {
dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
err = dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE")
} else {
dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value)
err = dre.targetClient.Send("RESTORE", ele.key, ele.pttl, ele.value)
}
if err != nil {
log.Panicf("dbRumper[%v] executor[%v] send key[%v] failed[%v]", dre.rumperId, dre.executorId,
ele.key, err)
}
dre.resultChan <- ele
wBytes += int64(len(ele.value))
batch = append(batch, ele)
// move to real send
// dre.resultChan <- ele
count++
if count == conf.Options.ScanKeyNumber {
// batch
log.Debugf("dbRumper[%v] executor[%v] send keys %d", dre.rumperId, dre.executorId, count)
dre.targetClient.Flush()
count = 0
batch = dre.writeSend(batch, &count, &wBytes)
}
}
dre.targetClient.Flush()
dre.writeSend(batch, &count, &wBytes)
close(dre.resultChan)
}
func (dre *dbRumperExecutor) writeSend(batch []*KeyNode, count *uint32, wBytes *int64) []*KeyNode {
newBatch := make([]*KeyNode, 0, conf.Options.ScanKeyNumber)
if len(batch) == 0 {
return newBatch
}
if err := dre.targetClient.Flush(); err != nil {
log.Panicf("dbRumper[%v] executor[%v] flush failed[%v]", dre.rumperId, dre.executorId, err)
}
// real send
for _, ele := range batch {
dre.resultChan <- ele
}
dre.stat.wCommands.Add(int64(*count))
dre.stat.wBytes.Add(*wBytes)
*count = 0
*wBytes = 0
return newBatch
}
func (dre *dbRumperExecutor) receiver() {
for ele := range dre.resultChan {
if _, err := dre.targetClient.Receive(); err != nil && err != redis.ErrNil {
log.Panicf("dbRumper[%v] executor[%v] restore key[%v] with pttl[%v] error[%v]", dre.rumperId,
dre.executorId, ele.key, strconv.FormatInt(ele.pttl, 10), err)
}
dre.stat.cCommands.Incr()
}
}
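The writer/writeSend/receiver trio above is standard redigo pipelining: Send queues commands client-side, Flush pushes the batch onto the socket, and one Receive per queued command reads the replies back in order, with resultChan carrying the per-key context so the receiver can report which key failed. A condensed sketch of the same pattern, with a hypothetical helper name:

```go
// Condensed sketch of the Send/Flush/Receive pipelining used above
// (not part of this diff).
func pipelineRestore(c redis.Conn, keys, dumps []string) error {
	for i := range keys {
		if err := c.Send("RESTORE", keys[i], 0, dumps[i], "REPLACE"); err != nil {
			return err
		}
	}
	if err := c.Flush(); err != nil {
		return err
	}
	for range keys {
		if _, err := c.Receive(); err != nil && err != redis.ErrNil {
			return err
		}
	}
	return nil
}
```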
func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) {
// tencent cluster only has 1 logical db
if conf.Options.ScanSpecialCloud == utils.TencentCluster {
return []int32{0}, nil
}
conn := dre.sourceClient
if ret, err := conn.Do("info", "keyspace"); err != nil {
return nil, err
@ -265,19 +435,15 @@ func (dre *dbRumperExecutor) getSourceDbList() ([]int32, error) {
}
func (dre *dbRumperExecutor) doFetch(db int) error {
if conf.Options.ScanSpecialCloud != utils.TencentCluster {
// send 'select' command to both source and target
log.Infof("dbRumper[%v] executor[%v] send source select db", dre.rumperId, dre.executorId)
if _, err := dre.sourceClient.Do("select", db); err != nil {
return err
}
// it's ok to send select directly because the message order can be guaranteed.
log.Infof("dbRumper[%v] executor[%v] send target select db", dre.rumperId, dre.executorId)
dre.targetClient.Flush()
if err := dre.targetClient.Send("select", db); err != nil {
return err
}
dre.targetClient.Flush()
// selecting the target db has been moved into the writer
log.Infof("dbRumper[%v] executor[%v] start fetching node db[%v]", dre.rumperId, dre.executorId, db)
@ -292,7 +458,7 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
if len(keys) != 0 {
// pipeline dump
for _, key := range keys {
log.Debug("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
log.Debugf("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
dre.sourceClient.Send("DUMP", key)
}
dumps, err := redis.Strings(dre.sourceClient.Do(""))
@ -309,8 +475,14 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
return fmt.Errorf("do ttl with failed[%v]", err)
}
dre.stat.rCommands.Add(int64(len(keys)))
for i, k := range keys {
dre.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
length := len(dumps[i])
dre.stat.rBytes.Add(int64(length)) // length of value
dre.stat.minSize = int64(math.Min(float64(dre.stat.minSize), float64(length)))
dre.stat.maxSize = int64(math.Max(float64(dre.stat.maxSize), float64(length)))
dre.stat.sumSize += int64(length)
dre.keyChan <- &KeyNode{k, dumps[i], pttls[i], db}
}
}
