Merge pull request #92 from alibaba/feature-1.6

Feature 1.6
v4
Vinllen Chen 6 years ago committed by GitHub
commit d961fd0d33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      ChangeLog
  2. 9
      src/redis-shake/command/redis-command.go
  3. 8
      src/redis-shake/common/common.go
  4. 4
      src/redis-shake/common/utils.go
  5. 3
      src/redis-shake/configure/configure.go
  6. 27
      src/redis-shake/main/main.go
  7. 17
      src/redis-shake/sync.go

@ -1,6 +1,16 @@
2019-06-09 Alibaba Cloud.
* VERSION: 1.6.6
* cherry-pick merge v1.4.4
* BUGFIX: delete single command failed when filter key given.
2019-06-06 Alibaba Cloud. 2019-06-06 Alibaba Cloud.
* VERSION: 1.6.5 * VERSION: 1.6.5
* IMPROVE: run rump in parallel to support several db nodes behind proxy. * IMPROVE: run rump in parallel to support several db nodes behind proxy.
* BUGFIX: run rump panic when the source is proxy with more than 1 db.
2019-06-05 Alibaba Cloud.
* VERSION: 1.4.4
* BUGFIX: modify the ttl from millisecond to second in restore when
overpass big key threshold.
* IMPROVE: set some default values in configuration.
2019-05-30 Alibaba Cloud. 2019-05-30 Alibaba Cloud.
* VERSION: 1.6.4 * VERSION: 1.6.4
* BUGFIX: fix bug of `GetDetailedInfo` panic. * BUGFIX: fix bug of `GetDetailedInfo` panic.

@ -17,7 +17,7 @@ var RedisCommands = map[string]redisCommand{
"setex": {nil, 1, 1, 1}, "setex": {nil, 1, 1, 1},
"psetex": {nil, 1, 1, 1}, "psetex": {nil, 1, 1, 1},
"append": {nil, 1, 1, 1}, "append": {nil, 1, 1, 1},
"del": {nil, 1, -1, 1}, "del": {nil, 1, 0, 1},
"unlink": {nil, 1, -1, 1}, "unlink": {nil, 1, -1, 1},
"setbit": {nil, 1, 1, 1}, "setbit": {nil, 1, 1, 1},
"bitfield": {nil, 1, 1, 1}, "bitfield": {nil, 1, 1, 1},
@ -86,7 +86,7 @@ var RedisCommands = map[string]redisCommand{
"pfmerge": {nil, 1, -1, 1}, "pfmerge": {nil, 1, -1, 1},
} }
func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (new_args [][]byte, ret bool) { func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (new_args [][]byte, pass bool) {
lastkey := redis_cmd.lastkey - 1 lastkey := redis_cmd.lastkey - 1
keystep := redis_cmd.keystep keystep := redis_cmd.keystep
@ -107,10 +107,10 @@ func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (n
} }
} }
ret = false pass = false
new_args = make([][]byte, number*redis_cmd.keystep+len(args)-lastkey-redis_cmd.keystep) new_args = make([][]byte, number*redis_cmd.keystep+len(args)-lastkey-redis_cmd.keystep)
if number > 0 { if number > 0 {
ret = true pass = true
for i := 0; i < number; i++ { for i := 0; i < number; i++ {
for j := 0; j < redis_cmd.keystep; j++ { for j := 0; j < redis_cmd.keystep; j++ {
new_args[i*redis_cmd.keystep+j] = args[array[i]+j] new_args[i*redis_cmd.keystep+j] = args[array[i]+j]
@ -124,5 +124,6 @@ func GetMatchKeys(redis_cmd redisCommand, args [][]byte, filterkeys []string) (n
new_args[number*redis_cmd.keystep+j] = args[i] new_args[number*redis_cmd.keystep+j] = args[i]
j = j + 1 j = j + 1
} }
return return
} }

@ -36,6 +36,14 @@ var (
TargetRoundRobin int TargetRoundRobin int
) )
const (
KB = 1024
MB = 1024 * KB
GB = 1024 * MB
TB = 1024 * GB
PB = 1024 * TB
)
// read until hit the end of RESP: "\r\n" // read until hit the end of RESP: "\r\n"
func ReadRESPEnd(c net.Conn) (string, error) { func ReadRESPEnd(c net.Conn) (string, error) {
var ret string var ret string

@ -723,7 +723,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
} }
restoreQuicklistEntry(c, e) restoreQuicklistEntry(c, e)
if e.ExpireAt != 0 { if e.ExpireAt != 0 {
r, err := redigo.Int64(c.Do("expire", e.Key, ttlms)) r, err := redigo.Int64(c.Do("pexpire", e.Key, ttlms))
if err != nil && r != 1 { if err != nil && r != 1 {
log.Panicf("expire ", string(e.Key), err) log.Panicf("expire ", string(e.Key), err)
} }
@ -755,7 +755,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
} }
restoreBigRdbEntry(c, e) restoreBigRdbEntry(c, e)
if e.ExpireAt != 0 { if e.ExpireAt != 0 {
r, err := redigo.Int64(c.Do("expire", e.Key, ttlms)) r, err := redigo.Int64(c.Do("pexpire", e.Key, ttlms))
if err != nil && r != 1 { if err != nil && r != 1 {
log.Panicf("expire ", string(e.Key), err) log.Panicf("expire ", string(e.Key), err)
} }

@ -23,7 +23,7 @@ type Configuration struct {
TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"` TargetPasswordEncoding string `config:"target.password_encoding"`
TargetVersion uint `config:"target.version"` TargetVersion uint `config:"target.version"`
TargetDB int `config:"target.db"` TargetDBString string `config:"target.db"`
TargetAuthType string `config:"target.auth_type"` TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"` TargetType string `config:"target.type"`
TargetTLSEnable bool `config:"target.tls_enable"` TargetTLSEnable bool `config:"target.tls_enable"`
@ -67,6 +67,7 @@ type Configuration struct {
ShiftTime time.Duration // shift ShiftTime time.Duration // shift
TargetRedisVersion string // to_redis_version TargetRedisVersion string // to_redis_version
TargetReplace bool // to_replace TargetReplace bool // to_replace
TargetDB int // int type
} }
var Options Configuration var Options Configuration

@ -181,8 +181,11 @@ func sanitizeOptions(tp string) error {
conf.Options.Parallel = int(math.Max(float64(conf.Options.Parallel), float64(conf.Options.NCpu))) conf.Options.Parallel = int(math.Max(float64(conf.Options.Parallel), float64(conf.Options.NCpu)))
} }
if conf.Options.BigKeyThreshold > 524288000 { // 500 M
return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold) if conf.Options.BigKeyThreshold > 500 * utils.MB {
return fmt.Errorf("BigKeyThreshold[%v] should <= 500 MB", conf.Options.BigKeyThreshold)
} else if conf.Options.BigKeyThreshold == 0 {
conf.Options.BigKeyThreshold = 50 * utils.MB
} }
// source password // source password
@ -266,7 +269,10 @@ func sanitizeOptions(tp string) error {
// heartbeat, 86400 = 1 day // heartbeat, 86400 = 1 day
if conf.Options.HeartbeatInterval > 86400 { if conf.Options.HeartbeatInterval > 86400 {
return fmt.Errorf("HeartbeatInterval[%v] should in [0, 86400]", conf.Options.HeartbeatInterval) return fmt.Errorf("HeartbeatInterval[%v] should in [0, 86400]", conf.Options.HeartbeatInterval)
} else if conf.Options.HeartbeatInterval == 0 {
conf.Options.HeartbeatInterval = 10
} }
if conf.Options.HeartbeatNetworkInterface == "" { if conf.Options.HeartbeatNetworkInterface == "" {
conf.Options.HeartbeatIp = "127.0.0.1" conf.Options.HeartbeatIp = "127.0.0.1"
} else { } else {
@ -325,8 +331,14 @@ func sanitizeOptions(tp string) error {
} }
} }
if conf.Options.TargetDB >= 0 { if conf.Options.TargetDBString == "" {
// pass, >= 0 means enable conf.Options.TargetDB = -1
} else if v, err := strconv.Atoi(conf.Options.TargetDBString); err != nil {
return fmt.Errorf("parse target.db[%v] failed[%v]", conf.Options.TargetDBString, err)
} else if v < 0 {
conf.Options.TargetDB = -1
} else {
conf.Options.TargetDB = v
} }
if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 { if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 {
@ -357,7 +369,12 @@ func sanitizeOptions(tp string) error {
conf.Options.SenderCount = defaultSenderCount conf.Options.SenderCount = defaultSenderCount
} }
if tp == conf.TypeRestore || tp == conf.TypeSync {
if conf.Options.SenderDelayChannelSize == 0 {
conf.Options.SenderDelayChannelSize = 32
}
if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump {
// get target redis version and set TargetReplace. // get target redis version and set TargetReplace.
for _, address := range conf.Options.TargetAddressList { for _, address := range conf.Options.TargetAddressList {
// single connection even if the target is cluster // single connection even if the target is cluster

@ -531,7 +531,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
id := recvId.Get() // receive id id := recvId.Get() // receive id
// print debug log of receive reply // print debug log of receive reply
log.Debugf("receive reply[%v]: [%v], error: [%v]", id, reply, err) log.Debugf("receive reply-id[%v]: [%v], error:[%v]", id, reply, err)
if conf.Options.Metric == false { if conf.Options.Metric == false {
continue continue
@ -623,25 +623,28 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
ds.nbypass.Incr() ds.nbypass.Incr()
// ds.SyncStat.BypassCmdCount.Incr() // ds.SyncStat.BypassCmdCount.Incr()
metric.GetMetric(ds.id).AddBypassCmdCount(1) metric.GetMetric(ds.id).AddBypassCmdCount(1)
log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd)
continue continue
} }
} }
is_filter := false pass := false
if len(conf.Options.FilterKey) != 0 { if len(conf.Options.FilterKey) != 0 {
ds, ok := command.RedisCommands[scmd] cmdNode, ok := command.RedisCommands[scmd]
if ok && len(argv) > 0 { if ok && len(argv) > 0 {
new_argv, is_filter = command.GetMatchKeys(ds, argv, conf.Options.FilterKey) log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
new_argv, pass = command.GetMatchKeys(cmdNode, argv, conf.Options.FilterKey)
} else { } else {
is_filter = true pass = true
new_argv = argv new_argv = argv
} }
} else { } else {
is_filter = true pass = true
new_argv = argv new_argv = argv
} }
if bypass || ignorecmd || !is_filter { if bypass || ignorecmd || !pass {
ds.nbypass.Incr() ds.nbypass.Incr()
log.Debugf("dbSyncer[%v] filter command[%v]", ds.id, scmd)
continue continue
} }
} }

Loading…
Cancel
Save