Merge pull request #146 from alibaba/bugfix-1.6.17

Bugfix 1.6.17
Vinllen Chen 5 years ago committed by GitHub
commit 3ab573ad3a
12 changed files:

 1. .gitignore (1 change)
 2. ChangeLog (8 changes)
 3. README.md (3 changes)
 4. conf/redis-shake.conf (34 changes)
 5. src/pkg/rdb/loader.go (6 changes)
 6. src/pkg/rdb/reader.go (4 changes)
 7. src/redis-shake/common/cluster.go (2 changes)
 8. src/redis-shake/common/command.go (5 changes)
 9. src/redis-shake/common/utils.go (2 changes)
10. src/redis-shake/configure/configure.go (108 changes)
11. src/redis-shake/main/main.go (9 changes)
12. src/redis-shake/rump.go (13 changes)

.gitignore vendored (1 change)

@@ -15,6 +15,7 @@ result.db.*
bin/*
conf/*
!conf/redis-shake.conf
!.circleci/config.yml
dump.data
runtime.trace

ChangeLog (8 changes)

@@ -1,3 +1,11 @@
2019-08-27 Alibaba Cloud.
* VERSION: 1.6.17
* BUGFIX: transaction syncing panics when the target redis is a cluster. see #145.
* IMPROVE: adjust RecvChanSize based on `sender.count` or `scan.key_number` if the target redis type is cluster.
* IMPROVE: remove some variables in conf like `heartbeat`, `ncpu`.
* IMPROVE: print the inner error message returned by the redigo driver.
2019-08-09 Alibaba Cloud.
* VERSION: 1.6.16
* BUGFIX: big keys in `rump` mode were all expired.

README.md (3 changes)

@@ -17,7 +17,7 @@ The type can be one of the following:<br>
* **decode**: Decode dumped payload to human readable format (hex-encoding).
* **restore**: Restore RDB file to target redis.
* **dump**: Dump RDB file from souce redis.
* **dump**: Dump RDB file from source redis.
* **sync**: Sync data from source redis to target redis by `sync` or `psync` command. Including full synchronization and incremental synchronization.
* **rump**: Sync data from source redis to target redis by `scan` command. Only supports full synchronization. Plus, RedisShake also supports fetching data from the given keys in the input file when the `scan` command is not supported on the source side. This mode is usually used when the `sync` and `psync` redis commands aren't supported.
@@ -99,3 +99,4 @@ Plus, we have a WeChat group so that users can join and discuss, but the group u
| :------: | :------: |
| ceshihao | davidzheng23@gmail.com |
| wangyiyang | wangyiyang.kk@gmail.com |
| muicoder | muicoder@gmail.com |

conf/redis-shake.conf (34 changes)

@@ -19,9 +19,6 @@ system_profile = 9310
# restful port for viewing metrics. -1 means disabled. in `restore` mode the process exits after the RDB restore completes only when this is set to -1; otherwise it blocks forever.
http_profile = 9320
# runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU()
ncpu = 0
# number of parallel routines used in RDB file syncing. default is 64.
parallel = 32
@@ -41,7 +38,8 @@ source.type = standalone
# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441.
# 4. proxy address(used in "rump" mode only). for "proxy" type.
# source redis address. for sentinel mode, the format is "master name:role to pull (master or slave)@sentinel address".
# source redis address. for sentinel or open-source cluster mode, the format is "master name:master or slave@sentinel address"; for other cluster
# architectures, such as codis, twemproxy, or aliyun proxy, configure the db addresses of all masters or slaves.
source.address = 127.0.0.1:20441
# password of db/proxy. even if type is sentinel.
source.password_raw = 123456
@@ -165,31 +163,16 @@ metric = true
# whether to print metrics to the log
metric.print_log = false
# heartbeat
# send heartbeat to this url; redis-shake will post heartbeats to this address.
# used in `sync`.
#heartbeat.url = http://127.0.0.1:8000
heartbeat.url =
# heartbeat interval in seconds
heartbeat.interval = 3
# external info which will be included in the heartbeat data.
heartbeat.external = test external
# local network card used to get the ip address, e.g., "lo", "eth0", "en0"
heartbeat.network_interface =
# sender information.
# sender flush buffer size in bytes; the buffer is flushed and sent once this threshold is exceeded.
# used in `sync`.
sender.size = 104857600
# sender flush buffer size in number of oplogs; the buffer is flushed and sent once this threshold is exceeded.
# used in `sync`.
sender.count = 5000
# used in `sync`. flush sender buffer when bigger than this threshold.
# when the target is a cluster, raising this value consumes more memory.
sender.count = 4096
# delay channel size. once an oplog is sent to the target redis, its id and timestamp are also
# stored in this delay queue. the timestamp is used to calculate the time delay when the ack
# is received from the target redis.
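
How `sender.size`, `sender.count`, and the delay channel fit together can be sketched as follows; the names below are hypothetical and this is only the shape of the logic, not redis-shake's shipped sender. Oplogs buffer until either threshold trips, and each flush records an (id, timestamp) node whose age yields the delay once the target's ack arrives.

package sender

import "time"

type delayNode struct {
	id uint64    // oplog id, echoed back when the target acks
	t  time.Time // send time, used to compute the delay
}

type bufferedSender struct {
	buf         [][]byte
	bufBytes    uint64
	senderSize  uint64 // conf: sender.size
	senderCount int    // conf: sender.count
	delayChan   chan delayNode
}

func (s *bufferedSender) write(id uint64, oplog []byte) {
	s.buf = append(s.buf, oplog)
	s.bufBytes += uint64(len(oplog))
	// flush when either threshold is exceeded
	if s.bufBytes >= s.senderSize || len(s.buf) >= s.senderCount {
		s.flush(id)
	}
}

func (s *bufferedSender) flush(id uint64) {
	// ... write s.buf to the target redis connection here ...
	s.delayChan <- delayNode{id: id, t: time.Now()}
	s.buf, s.bufBytes = s.buf[:0], 0
}

// on ack: delay = time.Since(node.t) for the node matching the acked id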
@@ -207,13 +190,11 @@ keep_alive = 0
# number of keys captured each time. default is 100 if not configured.
scan.key_number = 50
# used in `rump`.
# we support some special redis types that don't use the default `scan` command, like alibaba cloud and tencent cloud.
# currently Tencent Cloud cluster edition ("tencent_cluster") and Alibaba Cloud cluster edition ("aliyun_cluster") are supported.
scan.special_cloud =
# used in `rump`.
# we support fetching data from a given file that lists the keys to fetch, one key per line.
# some cloud versions support neither sync/psync nor scan; in that case redis-shake reads the whole key list from this file and fetches those keys.
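
A sketch of the key-file path described above, with hypothetical names: read one key per line and pipeline DUMP for each batch, for sources that support neither sync/psync nor scan.

package keyfile

import (
	"bufio"
	"os"

	"github.com/garyburd/redigo/redis"
)

// fetchFromKeyFile streams keys from path and DUMPs them in batches of
// scan.key_number (hypothetical helper, not the shipped rump executor).
func fetchFromKeyFile(c redis.Conn, path string, batch int) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	keys := make([]string, 0, batch)
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		if key := scanner.Text(); key != "" {
			keys = append(keys, key)
		}
		if len(keys) == batch {
			if err := dumpBatch(c, keys); err != nil {
				return err
			}
			keys = keys[:0]
		}
	}
	if len(keys) > 0 {
		if err := dumpBatch(c, keys); err != nil {
			return err
		}
	}
	return scanner.Err()
}

func dumpBatch(c redis.Conn, keys []string) error {
	for _, key := range keys {
		c.Send("DUMP", key) // queue without flushing
	}
	_, err := c.Do("") // flush the pipeline and collect replies
	return err
}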
@@ -229,6 +210,3 @@ qps = 200000
# replace hash tag.
# used in `sync`.
replace_hash_tag = false
# used in `restore` and `dump`.
extra = false

src/pkg/rdb/loader.go (6 changes)

@@ -196,9 +196,9 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
} else {
key = l.lastEntry.Key
}
// log.Infof("l %p r %p", l, l.rdbReader)
// log.Info("remainMember:", l.remainMember, " key:", string(key[:]), " type:", t)
// log.Info("r.remainMember:", l.rdbReader.remainMember)
//log.Debugf("l %p r %p", l, l.rdbReader)
//log.Debug("remainMember:", l.remainMember, " key:", string(key[:]), " type:", t)
//log.Debug("r.remainMember:", l.rdbReader.remainMember)
val, err := l.readObjectValue(t, l)
if err != nil {
return nil, err

src/pkg/rdb/reader.go (4 changes)

@@ -13,7 +13,7 @@ import (
"strconv"
"pkg/libs/errors"
// "libs/log"
)
var FromVersion int64 = 9
@@ -144,10 +144,12 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
if n, err := r.ReadLength(); err != nil {
return nil, err
} else {
// log.Debug("zset length: ", n)
for i := 0; i < int(n); i++ {
if _, err := r.ReadString(); err != nil {
return nil, err
}
// log.Debug("zset read: ", i)
if t == RdbTypeZSet2 {
if _, err := r.ReadDouble(); err != nil {
return nil, err
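
For orientation, the loop above is walking the zset wire format. A hedged standalone restatement, assuming this file's reader helpers (ReadLength, ReadString, ReadDouble, and a ReadFloat for the pre-ZSET2 string-encoded score):

// skipZSet illustrates the layout: a zset is serialized as a length followed
// by (member, score) pairs. RdbTypeZSet2 (RDB version 8+) stores the score as
// a binary 8-byte double, while the older zset type stores it as a
// length-prefixed string float.
func skipZSet(r *rdbReader, t byte) error {
	n, err := r.ReadLength()
	if err != nil {
		return err
	}
	for i := 0; i < int(n); i++ {
		if _, err := r.ReadString(); err != nil { // member
			return err
		}
		if t == RdbTypeZSet2 {
			_, err = r.ReadDouble() // binary double score
		} else {
			_, err = r.ReadFloat() // string-encoded score
		}
		if err != nil {
			return err
		}
	}
	return nil
}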

src/redis-shake/common/cluster.go (2 changes)

@@ -7,7 +7,7 @@ import (
"pkg/libs/log"
)
const(
var (
RecvChanSize = 4096
)
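
Context for this one-line change: a Go `const` cannot be reassigned, so RecvChanSize becomes a `var` that sanitizeOptions (see the configure.go hunks below) can raise at startup. A minimal sketch, with a hypothetical helper name:

package utils

// RecvChanSize sizes the receive channel used when the target is a cluster.
// As a const it could not be reassigned; as a var it may be raised to
// sender.count or scan.key_number before syncing starts.
var RecvChanSize = 4096

// raiseRecvChanSize shows the only mutation needed: grow, never shrink.
func raiseRecvChanSize(n int) {
	if n > RecvChanSize {
		RecvChanSize = n
	}
}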

src/redis-shake/common/command.go (5 changes)

@@ -10,6 +10,11 @@ import (
"strings"
)
const (
ReplayString = "string"
ReplayInt64s = "int64s"
)
type ClusterNodeInfo struct {
Id string
Address string

src/redis-shake/common/utils.go (2 changes)

@@ -811,6 +811,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
params = append(params, "FREQ")
params = append(params, e.Freq)
}
log.Debugf("restore key[%s] with params[%v]", e.Key, params)
// fmt.Printf("key: %v, value: %v params: %v\n", string(e.Key), e.Value, params)
// s, err := redigo.String(c.Do("restore", params...))
s, err := redigoCluster.String(c.Do("restore", params...))
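
For reference, the params logged and passed to `restore` above follow redis's RESTORE syntax: RESTORE key ttl serialized-value [REPLACE] [IDLETIME seconds] [FREQ frequency]. A self-contained sketch of the call with hypothetical arguments, not the shipped function:

package example

import (
	"fmt"

	redigo "github.com/garyburd/redigo/redis"
)

// restoreEntry mirrors the params slice built in the hunk above.
func restoreEntry(c redigo.Conn, key string, ttlMS int64, dump []byte, freq int64) error {
	params := []interface{}{key, ttlMS, dump}
	params = append(params, "REPLACE") // overwrite an existing key (rewrite = true)
	if freq >= 0 {
		params = append(params, "FREQ", freq) // LFU access frequency, as above
	}
	s, err := redigo.String(c.Do("restore", params...))
	if err != nil {
		return fmt.Errorf("restore key[%s] failed[%v]", key, err)
	}
	_ = s // "OK" on success
	return nil
}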

src/redis-shake/configure/configure.go (108 changes)

@@ -4,65 +4,65 @@ import "time"
type Configuration struct {
// config file variables
Id string `config:"id"`
LogFile string `config:"log.file"`
LogLevel string `config:"log.level"`
SystemProfile int `config:"system_profile"`
HttpProfile int `config:"http_profile"`
Id string `config:"id"`
LogFile string `config:"log.file"`
LogLevel string `config:"log.level"`
SystemProfile int `config:"system_profile"`
HttpProfile int `config:"http_profile"`
Parallel int `config:"parallel"`
SourceType string `config:"source.type"`
SourceAddress string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceAuthType string `config:"source.auth_type"`
SourceTLSEnable bool `config:"source.tls_enable"`
SourceRdbInput []string `config:"source.rdb.input"`
SourceRdbParallel int `config:"source.rdb.parallel"`
SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"`
TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
TargetDBString string `config:"target.db"`
TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"`
TargetTLSEnable bool `config:"target.tls_enable"`
TargetRdbOutput string `config:"target.rdb.output"`
TargetVersion string `config:"target.version"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDBWhitelist []string `config:"filter.db.whitelist"`
FilterDBBlacklist []string `config:"filter.db.blacklist"`
FilterKeyWhitelist []string `config:"filter.key.whitelist"`
FilterKeyBlacklist []string `config:"filter.key.blacklist"`
FilterSlot []string `config:"filter.slot"`
FilterLua bool `config:"filter.lua"`
BigKeyThreshold uint64 `config:"big_key_threshold"`
Psync bool `config:"psync"`
Metric bool `config:"metric"`
MetricPrintLog bool `config:"metric.print_log"`
SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"`
ScanKeyNumber uint32 `config:"scan.key_number"`
ScanSpecialCloud string `config:"scan.special_cloud"`
ScanKeyFile string `config:"scan.key_file"`
Qps int `config:"qps"`
/*---------------------------------------------------------*/
// inner variables
NCpu int `config:"ncpu"`
Parallel int `config:"parallel"`
SourceType string `config:"source.type"`
SourceAddress string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceAuthType string `config:"source.auth_type"`
SourceTLSEnable bool `config:"source.tls_enable"`
SourceRdbInput []string `config:"source.rdb.input"`
SourceRdbParallel int `config:"source.rdb.parallel"`
SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"`
TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
TargetDBString string `config:"target.db"`
TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"`
TargetTLSEnable bool `config:"target.tls_enable"`
TargetRdbOutput string `config:"target.rdb.output"`
TargetVersion string `config:"target.version"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDBWhitelist []string `config:"filter.db.whitelist"`
FilterDBBlacklist []string `config:"filter.db.blacklist"`
FilterKeyWhitelist []string `config:"filter.key.whitelist"`
FilterKeyBlacklist []string `config:"filter.key.blacklist"`
FilterSlot []string `config:"filter.slot"`
FilterLua bool `config:"filter.lua"`
BigKeyThreshold uint64 `config:"big_key_threshold"`
Psync bool `config:"psync"`
Metric bool `config:"metric"`
MetricPrintLog bool `config:"metric.print_log"`
HeartbeatUrl string `config:"heartbeat.url"`
HeartbeatInterval uint `config:"heartbeat.interval"`
HeartbeatExternal string `config:"heartbeat.external"`
HeartbeatNetworkInterface string `config:"heartbeat.network_interface"`
SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"`
ScanKeyNumber uint32 `config:"scan.key_number"`
ScanSpecialCloud string `config:"scan.special_cloud"`
ScanKeyFile string `config:"scan.key_file"`
Qps int `config:"qps"`
/*---------------------------------------------------------*/
// inner variables
ReplaceHashTag bool `config:"replace_hash_tag"`
ExtraInfo bool `config:"extra"`
SockFileName string `config:"sock.file_name"`
SockFileSize uint `config:"sock.file_size"`
FilterKey []string `config:"filter.key"` // compatible with older versions
FilterDB string `config:"filter.db"` // compatible with older versions
ReplaceHashTag bool `config:"replace_hash_tag"`
ExtraInfo bool `config:"extra"`
SockFileName string `config:"sock.file_name"`
SockFileSize uint `config:"sock.file_size"`
FilterKey []string `config:"filter.key"` // compatible with older versions
FilterDB string `config:"filter.db"` // compatible with older versions
/*---------------------------------------------------------*/
// generated variables
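
The `config:"..."` tags are what tie the conf keys above (e.g. `sender.count`) to struct fields. The loader itself is not part of this diff; as a rough illustration of how such tags are typically resolved, with hypothetical names:

package example

import "reflect"

// lookupField resolves a conf key such as "sender.count" to the struct field
// tagged with it (SenderCount above). A sketch of the tag-driven mapping only;
// a real loader also parses raw string values into the field types.
func lookupField(cfg interface{}, key string) (reflect.Value, bool) {
	v := reflect.ValueOf(cfg).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		if t.Field(i).Tag.Get("config") == key {
			return v.Field(i), true
		}
	}
	return reflect.Value{}, false
}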

@@ -396,6 +396,10 @@ func sanitizeOptions(tp string) error {
// set to default when not set
conf.Options.SenderCount = defaultSenderCount
}
if conf.Options.TargetType == conf.RedisTypeCluster && int(conf.Options.SenderCount) > utils.RecvChanSize {
log.Infof("RecvChanSize is modified from [%v] to [%v]", utils.RecvChanSize, int(conf.Options.SenderCount))
utils.RecvChanSize = int(conf.Options.SenderCount)
}
if conf.Options.SenderDelayChannelSize == 0 {
conf.Options.SenderDelayChannelSize = 32
@@ -448,8 +452,9 @@
conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
}
if conf.Options.ScanKeyNumber > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster {
return fmt.Errorf("scan.key_number should less than [%v] when target type is cluster", utils.RecvChanSize)
if int(conf.Options.ScanKeyNumber) > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster {
log.Infof("RecvChanSize is modified from [%v] to [%v]", utils.RecvChanSize, int(conf.Options.ScanKeyNumber))
utils.RecvChanSize = int(conf.Options.ScanKeyNumber)
}
//if len(conf.Options.SourceAddressList) == 1 {
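
Taken together with the earlier sanitizeOptions hunk, the behavior change is that a large scan.key_number is no longer rejected: the receive channel grows to fit the batch. A hypothetical consolidation of the two checks, using the identifiers shown above:

// adjustRecvChanSize condenses both adjustments: for cluster targets the
// receive channel must hold at least one full pipeline batch, so it grows to
// the larger of sender.count and scan.key_number.
func adjustRecvChanSize() {
	if conf.Options.TargetType != conf.RedisTypeCluster {
		return
	}
	for _, n := range []int{int(conf.Options.SenderCount), int(conf.Options.ScanKeyNumber)} {
		if n > utils.RecvChanSize {
			log.Infof("RecvChanSize is modified from [%v] to [%v]", utils.RecvChanSize, n)
			utils.RecvChanSize = n
		}
	}
}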

src/redis-shake/main/main.go (9 changes)

@@ -13,9 +13,9 @@ import (
"redis-shake/configure"
"redis-shake/metric"
"redis-shake/scanner"
"redis-shake/filter"
"github.com/garyburd/redigo/redis"
"redis-shake/filter"
)
type CmdRump struct {
src/redis-shake/rump.go (13 changes)

@@ -483,18 +483,21 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
log.Debugf("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
dre.sourceClient.Send("DUMP", key)
}
dumps, err := redis.Strings(dre.sourceClient.Do(""))
reply, err := dre.sourceClient.Do("")
dumps, err := redis.Strings(reply, err)
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do dump with failed[%v]", err)
return fmt.Errorf("do dump with failed[%v], reply[%v]", err, reply)
}
// pipeline ttl
for _, key := range keys {
dre.sourceClient.Send("PTTL", key)
}
pttls, err := redis.Int64s(dre.sourceClient.Do(""))
reply, err = dre.sourceClient.Do("")
pttls, err := redis.Int64s(reply, err)
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do ttl with failed[%v]", err)
return fmt.Errorf("do ttl with failed[%v], reply[%v]", err, reply)
}
dre.stat.rCommands.Add(int64(len(keys)))
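
The fix above captures the raw `reply` before the type conversion so a failure can report what the driver actually returned, which is the "print inner error message" item in the ChangeLog. In redigo, Send queues a command, Do("") flushes the pipeline and returns all pending replies, and helpers such as redis.Strings accept a (reply, err) pair. A condensed sketch with hypothetical names:

package example

import (
	"fmt"

	"github.com/garyburd/redigo/redis"
)

// dumpWithTTL condenses the pipelined DUMP/PTTL pattern above.
func dumpWithTTL(c redis.Conn, keys []string) ([]string, []int64, error) {
	for _, key := range keys {
		c.Send("DUMP", key)
	}
	reply, err := c.Do("") // flush pipeline, collect pending replies
	dumps, err := redis.Strings(reply, err)
	if err != nil && err != redis.ErrNil {
		// the raw reply surfaces the driver's inner message, e.g. a MOVED error
		return nil, nil, fmt.Errorf("do dump failed[%v], reply[%v]", err, reply)
	}

	for _, key := range keys {
		c.Send("PTTL", key)
	}
	reply, err = c.Do("")
	pttls, err := redis.Int64s(reply, err)
	if err != nil && err != redis.ErrNil {
		return nil, nil, fmt.Errorf("do pttl failed[%v], reply[%v]", err, reply)
	}
	return dumps, pttls, nil
}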
