diff --git a/.gitignore b/.gitignore index 40c2ac9..0d3ceb7 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ result.db.* bin/* conf/* !conf/redis-shake.conf +!.circleci/config.yml dump.data runtime.trace diff --git a/ChangeLog b/ChangeLog index 68fe4a1..cf892b7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +2019-08-27 Alibaba Cloud. + * VERSION: 1.6.17 + * BUGFIX: transaction syncing panic when target redis is cluster. see + #145. + * IMPROVE: adjust RecvChanSize based on `sender.count` or `scan.key_number` + if target redis type is cluster. + * IMPROVE: remove some variables in conf like `heartbeat`, `ncpu`. + * IMPROVE: print the inner error message returned by the redigo driver. 2019-08-09 Alibaba Cloud. * VERSION: 1.6.16 * BUGFIX: big key in `rump` mode all expired. diff --git a/README.md b/README.md index eebe86b..d7cf427 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ The type can be one of the followings:
* **decode**: Decode dumped payload to human readable format (hex-encoding). * **restore**: Restore RDB file to target redis. -* **dump**: Dump RDB file from souce redis. +* **dump**: Dump RDB file from source redis. * **sync**: Sync data from source redis to target redis by `sync` or `psync` command. Including full synchronization and incremental synchronization. * **rump**: Sync data from source redis to target redis by `scan` command. Only support full synchronization. Plus, RedisShake also supports fetching data from given keys in the input file when `scan` command is not supported on the source side. This mode is usually used when `sync` and `psync` redis commands aren't supported. @@ -99,3 +99,4 @@ Plus, we have a WeChat group so that users can join and discuss, but the group u | :------: | :------: | | ceshihao | davidzheng23@gmail.com | | wangyiyang | wangyiyang.kk@gmail.com | +| muicoder | muicoder@gmail.com | diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 08b30dc..9a7f76e 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -19,9 +19,6 @@ system_profile = 9310 # restful port,查看metric端口, -1表示不启用,如果是`restore`模式,只有设置为-1才会在完成RDB恢复后退出,否则会一直block。 http_profile = 9320 -# runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU() -ncpu = 0 - # parallel routines number used in RDB file syncing. default is 64. # 启动多少个并发线程同步一个RDB文件。 parallel = 32 @@ -41,7 +38,8 @@ source.type = standalone # 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type. # 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441. # 4. proxy address(used in "rump" mode only). for "proxy" type. 
-# 源redis地址。对于sentinel模式,输入格式为"master名字:拉取角色为master或者slave@sentinel的地址" +# 源redis地址。对于sentinel或者开源cluster模式,输入格式为"master名字:拉取角色为master或者slave@sentinel的地址",别的cluster +# 架构,比如codis, twemproxy, aliyun proxy等需要配置所有master或者slave的db地址。 source.address = 127.0.0.1:20441 # password of db/proxy. even if type is sentinel. source.password_raw = 123456 @@ -165,31 +163,16 @@ metric = true # 是否将metric打印到log中 metric.print_log = false -# heartbeat -# send heartbeat to this url -# used in `sync`. -# 心跳的url地址,redis-shake将会发送到这个地址 -#heartbeat.url = http://127.0.0.1:8000 -heartbeat.url = -# interval by seconds -# 心跳保活周期 -heartbeat.interval = 3 -# external info which will be included in heartbeat data. -# 在心跳报文中添加额外的信息 -heartbeat.external = test external -# local network card to get ip address, e.g., "lo", "eth0", "en0" -# 获取ip的网卡 -heartbeat.network_interface = - # sender information. # sender flush buffer size of byte. # used in `sync`. # 发送缓存的字节长度,超过这个阈值将会强行刷缓存发送 sender.size = 104857600 # sender flush buffer size of oplog number. -# used in `sync`. -# 发送缓存的报文个数,超过这个阈值将会强行刷缓存发送 -sender.count = 5000 +# used in `sync`. flush sender buffer when bigger than this threshold. +# 发送缓存的报文个数,超过这个阈值将会强行刷缓存发送,对于目的端是cluster的情况,这个值 +# 的调大将会占用部分内存。 +sender.count = 4096 # delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also # stored in this delay queue. this timestamp will be used to calculate the time delay when receiving # ack from target redis. @@ -207,13 +190,11 @@ keep_alive = 0 # number of keys captured each time. default is 100. # 每次scan的个数,不配置则默认100. scan.key_number = 50 - # used in `rump`. # we support some special redis types that don't use default `scan` command like alibaba cloud and tencent cloud. # 有些版本具有特殊的格式,与普通的scan命令有所不同,我们进行了特殊的适配。目前支持腾讯云的集群版"tencent_cluster" # 和阿里云的集群版"aliyun_cluster"。 scan.special_cloud = - # used in `rump`. # we support to fetching data from given file which marks the key list. 
# 有些云版本,既不支持sync/psync,也不支持scan,我们支持从文件中进行读取所有key列表并进行抓取:一行一个key。 @@ -229,6 +210,3 @@ qps = 200000 # replace hash tag. # used in `sync`. replace_hash_tag = false - -# used in `restore` and `dump`. -extra = false diff --git a/src/pkg/rdb/loader.go b/src/pkg/rdb/loader.go index c7ec8d4..a591ac0 100644 --- a/src/pkg/rdb/loader.go +++ b/src/pkg/rdb/loader.go @@ -196,9 +196,9 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) { } else { key = l.lastEntry.Key } - // log.Infof("l %p r %p", l, l.rdbReader) - // log.Info("remainMember:", l.remainMember, " key:", string(key[:]), " type:", t) - // log.Info("r.remainMember:", l.rdbReader.remainMember) + //log.Debugf("l %p r %p", l, l.rdbReader) + //log.Debug("remainMember:", l.remainMember, " key:", string(key[:]), " type:", t) + //log.Debug("r.remainMember:", l.rdbReader.remainMember) val, err := l.readObjectValue(t, l) if err != nil { return nil, err diff --git a/src/pkg/rdb/reader.go b/src/pkg/rdb/reader.go index f9a5c5e..a4ab75a 100644 --- a/src/pkg/rdb/reader.go +++ b/src/pkg/rdb/reader.go @@ -13,7 +13,7 @@ import ( "strconv" "pkg/libs/errors" - // "libs/log" + ) var FromVersion int64 = 9 @@ -144,10 +144,12 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) { if n, err := r.ReadLength(); err != nil { return nil, err } else { + // log.Debug("zset length: ", n) for i := 0; i < int(n); i++ { if _, err := r.ReadString(); err != nil { return nil, err } + // log.Debug("zset read: ", i) if t == RdbTypeZSet2 { if _, err := r.ReadDouble(); err != nil { return nil, err diff --git a/src/redis-shake/common/cluster.go b/src/redis-shake/common/cluster.go index 8bf9df5..2abb1e7 100644 --- a/src/redis-shake/common/cluster.go +++ b/src/redis-shake/common/cluster.go @@ -7,7 +7,7 @@ import ( "pkg/libs/log" ) -const( +var ( RecvChanSize = 4096 ) diff --git a/src/redis-shake/common/command.go b/src/redis-shake/common/command.go index d7b7ab1..678e95d 100644 --- a/src/redis-shake/common/command.go +++ 
b/src/redis-shake/common/command.go @@ -10,6 +10,11 @@ import ( "strings" ) +const ( + ReplayString = "string" + ReplayInt64s = "int64s" +) + type ClusterNodeInfo struct { Id string Address string diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index b429299..6fb9b35 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -811,6 +811,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { params = append(params, "FREQ") params = append(params, e.Freq) } + + log.Debugf("restore key[%s] with params[%v]", e.Key, params) // fmt.Printf("key: %v, value: %v params: %v\n", string(e.Key), e.Value, params) // s, err := redigo.String(c.Do("restore", params...)) s, err := redigoCluster.String(c.Do("restore", params...)) diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 5a8cc85..ec5343d 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -4,65 +4,65 @@ import "time" type Configuration struct { // config file variables - Id string `config:"id"` - LogFile string `config:"log.file"` - LogLevel string `config:"log.level"` - SystemProfile int `config:"system_profile"` - HttpProfile int `config:"http_profile"` + Id string `config:"id"` + LogFile string `config:"log.file"` + LogLevel string `config:"log.level"` + SystemProfile int `config:"system_profile"` + HttpProfile int `config:"http_profile"` + Parallel int `config:"parallel"` + SourceType string `config:"source.type"` + SourceAddress string `config:"source.address"` + SourcePasswordRaw string `config:"source.password_raw"` + SourcePasswordEncoding string `config:"source.password_encoding"` + SourceAuthType string `config:"source.auth_type"` + SourceTLSEnable bool `config:"source.tls_enable"` + SourceRdbInput []string `config:"source.rdb.input"` + SourceRdbParallel int `config:"source.rdb.parallel"` + SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"` + 
TargetAddress string `config:"target.address"` + TargetPasswordRaw string `config:"target.password_raw"` + TargetPasswordEncoding string `config:"target.password_encoding"` + TargetDBString string `config:"target.db"` + TargetAuthType string `config:"target.auth_type"` + TargetType string `config:"target.type"` + TargetTLSEnable bool `config:"target.tls_enable"` + TargetRdbOutput string `config:"target.rdb.output"` + TargetVersion string `config:"target.version"` + FakeTime string `config:"fake_time"` + Rewrite bool `config:"rewrite"` + FilterDBWhitelist []string `config:"filter.db.whitelist"` + FilterDBBlacklist []string `config:"filter.db.blacklist"` + FilterKeyWhitelist []string `config:"filter.key.whitelist"` + FilterKeyBlacklist []string `config:"filter.key.blacklist"` + FilterSlot []string `config:"filter.slot"` + FilterLua bool `config:"filter.lua"` + BigKeyThreshold uint64 `config:"big_key_threshold"` + Psync bool `config:"psync"` + Metric bool `config:"metric"` + MetricPrintLog bool `config:"metric.print_log"` + SenderSize uint64 `config:"sender.size"` + SenderCount uint `config:"sender.count"` + SenderDelayChannelSize uint `config:"sender.delay_channel_size"` + KeepAlive uint `config:"keep_alive"` + PidPath string `config:"pid_path"` + ScanKeyNumber uint32 `config:"scan.key_number"` + ScanSpecialCloud string `config:"scan.special_cloud"` + ScanKeyFile string `config:"scan.key_file"` + Qps int `config:"qps"` + + /*---------------------------------------------------------*/ + // inner variables NCpu int `config:"ncpu"` - Parallel int `config:"parallel"` - SourceType string `config:"source.type"` - SourceAddress string `config:"source.address"` - SourcePasswordRaw string `config:"source.password_raw"` - SourcePasswordEncoding string `config:"source.password_encoding"` - SourceAuthType string `config:"source.auth_type"` - SourceTLSEnable bool `config:"source.tls_enable"` - SourceRdbInput []string `config:"source.rdb.input"` - SourceRdbParallel int 
`config:"source.rdb.parallel"` - SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"` - TargetAddress string `config:"target.address"` - TargetPasswordRaw string `config:"target.password_raw"` - TargetPasswordEncoding string `config:"target.password_encoding"` - TargetDBString string `config:"target.db"` - TargetAuthType string `config:"target.auth_type"` - TargetType string `config:"target.type"` - TargetTLSEnable bool `config:"target.tls_enable"` - TargetRdbOutput string `config:"target.rdb.output"` - TargetVersion string `config:"target.version"` - FakeTime string `config:"fake_time"` - Rewrite bool `config:"rewrite"` - FilterDBWhitelist []string `config:"filter.db.whitelist"` - FilterDBBlacklist []string `config:"filter.db.blacklist"` - FilterKeyWhitelist []string `config:"filter.key.whitelist"` - FilterKeyBlacklist []string `config:"filter.key.blacklist"` - FilterSlot []string `config:"filter.slot"` - FilterLua bool `config:"filter.lua"` - BigKeyThreshold uint64 `config:"big_key_threshold"` - Psync bool `config:"psync"` - Metric bool `config:"metric"` - MetricPrintLog bool `config:"metric.print_log"` HeartbeatUrl string `config:"heartbeat.url"` HeartbeatInterval uint `config:"heartbeat.interval"` HeartbeatExternal string `config:"heartbeat.external"` HeartbeatNetworkInterface string `config:"heartbeat.network_interface"` - SenderSize uint64 `config:"sender.size"` - SenderCount uint `config:"sender.count"` - SenderDelayChannelSize uint `config:"sender.delay_channel_size"` - KeepAlive uint `config:"keep_alive"` - PidPath string `config:"pid_path"` - ScanKeyNumber uint32 `config:"scan.key_number"` - ScanSpecialCloud string `config:"scan.special_cloud"` - ScanKeyFile string `config:"scan.key_file"` - Qps int `config:"qps"` - - /*---------------------------------------------------------*/ - // inner variables - ReplaceHashTag bool `config:"replace_hash_tag"` - ExtraInfo bool `config:"extra"` - SockFileName string `config:"sock.file_name"` - SockFileSize 
uint `config:"sock.file_size"` - FilterKey []string `config:"filter.key"` // compatible with older versions - FilterDB string `config:"filter.db"` // compatible with older versions + ReplaceHashTag bool `config:"replace_hash_tag"` + ExtraInfo bool `config:"extra"` + SockFileName string `config:"sock.file_name"` + SockFileSize uint `config:"sock.file_size"` + FilterKey []string `config:"filter.key"` // compatible with older versions + FilterDB string `config:"filter.db"` // compatible with older versions /*---------------------------------------------------------*/ // generated variables diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 3ee087c..ea58fc1 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -396,6 +396,10 @@ func sanitizeOptions(tp string) error { // set to default when not set conf.Options.SenderCount = defaultSenderCount } + if conf.Options.TargetType == conf.RedisTypeCluster && int(conf.Options.SenderCount) > utils.RecvChanSize { + log.Infof("RecvChanSize is modified from [%v] to [%v]", utils.RecvChanSize, int(conf.Options.SenderCount)) + utils.RecvChanSize = int(conf.Options.SenderCount) + } if conf.Options.SenderDelayChannelSize == 0 { conf.Options.SenderDelayChannelSize = 32 @@ -448,8 +452,9 @@ func sanitizeOptions(tp string) error { conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) } - if conf.Options.ScanKeyNumber > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster { - return fmt.Errorf("scan.key_number should less than [%v] when target type is cluster", utils.RecvChanSize) + if int(conf.Options.ScanKeyNumber) > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster { + log.Infof("RecvChanSize is modified from [%v] to [%v]", utils.RecvChanSize, int(conf.Options.ScanKeyNumber)) + utils.RecvChanSize = int(conf.Options.ScanKeyNumber) } //if len(conf.Options.SourceAddressList) == 1 { diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go 
index 247ec64..c6a0db4 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -13,9 +13,9 @@ import ( "redis-shake/configure" "redis-shake/metric" "redis-shake/scanner" + "redis-shake/filter" "github.com/garyburd/redigo/redis" - "redis-shake/filter" ) type CmdRump struct { @@ -483,18 +483,21 @@ func (dre *dbRumperExecutor) doFetch(db int) error { log.Debugf("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key) dre.sourceClient.Send("DUMP", key) } - dumps, err := redis.Strings(dre.sourceClient.Do("")) + + reply, err := dre.sourceClient.Do("") + dumps, err := redis.Strings(reply, err) if err != nil && err != redis.ErrNil { - return fmt.Errorf("do dump with failed[%v]", err) + return fmt.Errorf("do dump with failed[%v], reply[%v]", err, reply) } // pipeline ttl for _, key := range keys { dre.sourceClient.Send("PTTL", key) } - pttls, err := redis.Int64s(dre.sourceClient.Do("")) + reply, err = dre.sourceClient.Do("") + pttls, err := redis.Int64s(reply, err) if err != nil && err != redis.ErrNil { - return fmt.Errorf("do ttl with failed[%v]", err) + return fmt.Errorf("do ttl with failed[%v], reply[%v]", err, reply) } dre.stat.rCommands.Add(int64(len(keys)))