merge remote

v4
vinllen 5 years ago
commit 185b8a832e
  1. 10
      ChangeLog
  2. 37
      conf/redis-shake.conf
  3. 7
      src/redis-shake/common/common.go
  4. 2
      src/redis-shake/common/utils.go
  5. 11
      src/redis-shake/configure/configure.go
  6. 6
      src/redis-shake/decode.go
  7. 4
      src/redis-shake/dump.go
  8. 41
      src/redis-shake/main/main.go
  9. 10
      src/redis-shake/restore.go
  10. 2
      src/redis-shake/sync.go

@ -1,3 +1,13 @@
2019-08-01 Alibaba Cloud.
* VERSION: 1.6.14
* BUGFIX: the `rdb.parallel` parameter limits concurrency without effect.
see #133
* BUGFIX: call `select` when target redis type is cluster in `rump` mode.
* IMPROVE: add `http_profile = -1` to exit once finish rdb syncing in
`restore` mode.
* IMPROVE: 'info xxx' command isn't supported in codis, used 'info' and
parse 'xxx'.
* IMPROVE: rename `rdb.xx` to `source.rdb.xx` or `target.rdb.xx`.
2019-07-24 Alibaba Cloud.
* VERSION: 1.6.13
* IMPROVE: support `filter.db.whitelist` and `filter.db.blacklist` to let

@ -50,6 +50,24 @@ source.auth_type = auth
# tls enable, true or false. Currently, only support standalone.
# open source redis does NOT support tls so far, but some cloud versions do.
source.tls_enable = false
# input RDB file.
# used in `decode` and `restore`.
# if the input is list split by semicolon(;), redis-shake will restore the list one by one.
# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2
# redis-shake将会挨个进行恢复。
source.rdb.input = local
# the concurrence of RDB syncing, default is len(source.address) or len(source.rdb.input).
# used in `dump`, `sync` and `restore`. 0 means default.
# This is useless when source.type isn't cluster or only input is only one RDB.
# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(source.rdb.input)。
# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会
# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕并进入增量,才会拉取第4个db节点的rdb,
# 以此类推,最后会有len(source.address)或者len(rdb.input)个增量线程同时存在。
source.rdb.parallel = 0
# for special cloud vendor: ucloud
# used in `decode` and `restore`.
# ucloud集群版的rdb文件添加了slot前缀,进行特判剥离: ucloud_cluster。
source.rdb.special_cloud =
# target redis configuration. used in `restore`, `sync` and `rump`.
# the type of target redis can be "standalone", "proxy" or "cluster".
@ -75,28 +93,11 @@ target.db = -1
# tls enable, true or false. Currently, only support standalone.
# open source redis does NOT support tls so far, but some cloud versions do.
target.tls_enable = false
# input RDB file.
# used in `decode` and `restore`.
# if the input is list split by semicolon(;), redis-shake will restore the list one by one.
# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2
# redis-shake将会挨个进行恢复。
rdb.input = local
# output RDB file prefix.
# used in `decode` and `dump`.
# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是:
# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2
rdb.output = local_dump
# the concurrence of fetching data, default is len(source.address) or len(rdb.input).
# used in `dump`, `sync` and `restore`. 0 means default.
# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。
# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会
# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕并进入增量,才会拉取第4个db节点的rdb,
# 以此类推,最后会有len(source.address)或者len(rdb.input)个增量线程同时存在。
rdb.parallel = 0
# for special cloud vendor: ucloud
# ucloud集群版的rdb文件添加了slot前缀,进行特判剥离: ucloud_cluster。
rdb.special_cloud =
target.rdb.output = local_dump
# use for expire key, set the time gap when source and target timestamp are not the same.
# 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值

@ -92,11 +92,12 @@ func ParseInfo(content []byte) map[string]string {
}
func GetTotalLink() int {
if len(conf.Options.SourceAddressList) != 0 {
if conf.Options.Type == conf.TypeSync || conf.Options.Type == conf.TypeRump || conf.Options.Type == conf.TypeDump {
return len(conf.Options.SourceAddressList)
} else {
return len(conf.Options.RdbInput)
} else if conf.Options.Type == conf.TypeDecode || conf.Options.Type == conf.TypeRestore {
return len(conf.Options.SourceRdbInput)
}
return 0
}
func PickTargetRoundRobin(n int) int {

@ -688,7 +688,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
* for ucloud, special judge.
* 046110.key -> key
*/
if conf.Options.RdbSpecialCloud == UCloudCluster {
if conf.Options.SourceRdbSpecialCloud == UCloudCluster {
e.Key = e.Key[7:]
}

@ -17,8 +17,10 @@ type Configuration struct {
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceVersion uint `config:"source.version"`
SourceAuthType string `config:"source.auth_type"`
SourceParallel uint `config:"source.parallel"`
SourceTLSEnable bool `config:"source.tls_enable"`
SourceRdbInput []string `config:"source.rdb.input"`
SourceRdbParallel int `config:"source.rdb.parallel"`
SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"`
TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
@ -27,10 +29,7 @@ type Configuration struct {
TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"`
TargetTLSEnable bool `config:"target.tls_enable"`
RdbInput []string `config:"rdb.input"`
RdbOutput string `config:"rdb.output"`
RdbParallel int `config:"rdb.parallel"`
RdbSpecialCloud string `config:"rdb.special_cloud"`
TargetRdbOutput string `config:"target.rdb.output"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDBWhitelist []string `config:"filter.db.whitelist"`
@ -57,6 +56,7 @@ type Configuration struct {
ScanKeyFile string `config:"scan.key_file"`
Qps int `config:"qps"`
/*---------------------------------------------------------*/
// inner variables
ReplaceHashTag bool `config:"replace_hash_tag"`
ExtraInfo bool `config:"extra"`
@ -75,6 +75,7 @@ type Configuration struct {
TargetReplace bool // to_replace
TargetDB int // int type
Version string // version
Type string // input mode -type=xxx
}
var Options Configuration

@ -40,11 +40,11 @@ func (cmd *CmdDecode) GetDetailedInfo() interface{} {
}
func (cmd *CmdDecode) Main() {
log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput)
log.Infof("decode from '%s' to '%s'\n", conf.Options.SourceRdbInput, conf.Options.TargetRdbOutput)
for i, input := range conf.Options.RdbInput {
for i, input := range conf.Options.SourceRdbInput {
// decode one by one. By now, we don't support decoding concurrence.
output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i)
output := fmt.Sprintf("%s.%d", conf.Options.TargetRdbOutput, i)
cmd.decode(input, output)
}

@ -37,7 +37,7 @@ func (cmd *CmdDump) Main() {
nd := node{
id: i,
source: source,
output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i),
output: fmt.Sprintf("%s.%d", conf.Options.TargetRdbOutput, i),
}
cmd.dumpChan <- nd
}
@ -49,7 +49,7 @@ func (cmd *CmdDump) Main() {
wg sync.WaitGroup
)
wg.Add(len(conf.Options.SourceAddressList))
for i := 0; i < int(conf.Options.SourceParallel); i++ {
for i := 0; i < int(conf.Options.SourceRdbParallel); i++ {
go func(idx int) {
log.Infof("start routine[%v]", idx)
for {

@ -66,6 +66,7 @@ func main() {
}
conf.Options.Version = utils.Version
conf.Options.Type = *tp
var file *os.File
if file, err = os.Open(*configuration); err != nil {
@ -219,31 +220,33 @@ func sanitizeOptions(tp string) error {
return fmt.Errorf("mode[%v] parse address failed[%v]", tp, err)
}
if (tp == conf.TypeRestore || tp == conf.TypeDecode) && len(conf.Options.RdbInput) == 0 {
return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}")
}
if tp == conf.TypeDump && conf.Options.RdbOutput == "" {
conf.Options.RdbOutput = "output-rdb-dump"
}
if conf.Options.RdbParallel == 0 {
if tp == conf.TypeDump || tp == conf.TypeSync {
conf.Options.RdbParallel = len(conf.Options.SourceAddressList)
} else if tp == conf.TypeRestore {
conf.Options.RdbParallel = len(conf.Options.RdbInput)
if tp == conf.TypeRestore || tp == conf.TypeDecode {
if len(conf.Options.SourceRdbInput) == 0 {
return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}")
}
// check file exist
for _, rdb := range conf.Options.SourceRdbInput {
if _, err := os.Stat(rdb); os.IsNotExist(err) {
return fmt.Errorf("input rdb file[%v] not exists", rdb)
}
}
}
if tp == conf.TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) {
conf.Options.RdbParallel = len(conf.Options.RdbInput)
if tp == conf.TypeDump && conf.Options.TargetRdbOutput == "" {
conf.Options.TargetRdbOutput = "output-rdb-dump"
}
if conf.Options.RdbSpecialCloud != "" && conf.Options.RdbSpecialCloud != utils.UCloudCluster {
return fmt.Errorf("rdb special cloud type[%s] is not supported", conf.Options.RdbSpecialCloud)
if tp == conf.TypeDump || tp == conf.TypeSync {
if conf.Options.SourceRdbParallel <= 0 || conf.Options.SourceRdbParallel > len(conf.Options.SourceAddressList) {
conf.Options.SourceRdbParallel = len(conf.Options.SourceAddressList)
}
} else if tp == conf.TypeRestore || tp == conf.TypeDecode {
if conf.Options.SourceRdbParallel <= 0 || conf.Options.SourceRdbParallel > len(conf.Options.SourceRdbInput) {
conf.Options.SourceRdbParallel = len(conf.Options.SourceRdbInput)
}
}
if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddressList)) {
conf.Options.SourceParallel = uint(len(conf.Options.SourceAddressList))
if conf.Options.SourceRdbSpecialCloud != "" && conf.Options.SourceRdbSpecialCloud != utils.UCloudCluster {
return fmt.Errorf("rdb special cloud type[%s] is not supported", conf.Options.SourceRdbSpecialCloud)
}
if conf.Options.LogFile != "" {

@ -36,7 +36,7 @@ func (cmd *CmdRestore) GetDetailedInfo() interface{} {
}
func (cmd *CmdRestore) Main() {
log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddressList)
log.Infof("restore from '%s' to '%s'\n", conf.Options.SourceRdbInput, conf.Options.TargetAddressList)
type restoreNode struct {
id int
@ -46,13 +46,13 @@ func (cmd *CmdRestore) Main() {
total := utils.GetTotalLink()
restoreChan := make(chan restoreNode, total)
for i, rdb := range conf.Options.RdbInput {
for i, rdb := range conf.Options.SourceRdbInput {
restoreChan <- restoreNode{id: i, input: rdb}
}
var wg sync.WaitGroup
wg.Add(len(conf.Options.RdbInput))
for i := 0; i < conf.Options.RdbParallel; i++ {
wg.Add(len(conf.Options.SourceRdbInput))
for i := 0; i < conf.Options.SourceRdbParallel; i++ {
go func() {
for {
node, ok := <-restoreChan
@ -87,7 +87,7 @@ func (cmd *CmdRestore) Main() {
wg.Wait()
close(restoreChan)
log.Infof("restore from '%s' to '%s' done", conf.Options.RdbInput, conf.Options.TargetAddressList)
log.Infof("restore from '%s' to '%s' done", conf.Options.SourceRdbInput, conf.Options.TargetAddressList)
if conf.Options.HttpProfile != -1 {
//fake status if set http_port. and wait forever
base.Status = "incr"

@ -105,7 +105,7 @@ func (cmd *CmdSync) Main() {
var wg sync.WaitGroup
wg.Add(len(conf.Options.SourceAddressList))
for i := 0; i < int(conf.Options.SourceParallel); i++ {
for i := 0; i < int(conf.Options.SourceRdbParallel); i++ {
go func() {
for {
nd, ok := <-syncChan

Loading…
Cancel
Save