From 647804b6fcf26046d7ad3e707cda45453f516639 Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 8 May 2019 17:16:24 +0800 Subject: [PATCH] 1.source address supports cluster 2.target address supports several proxies to write data in a round-robin way. --- ChangeLog | 5 + conf/redis-shake.conf | 51 +++++----- src/redis-shake/common/common.go | 22 ++++- src/redis-shake/configure/configure.go | 8 +- src/redis-shake/decode.go | 6 +- src/redis-shake/dump.go | 2 +- src/redis-shake/main/main.go | 80 ++++++++++----- src/redis-shake/restore.go | 131 +++++++++++++++++-------- src/redis-shake/rump.go | 3 +- src/redis-shake/sync.go | 8 +- 10 files changed, 212 insertions(+), 104 deletions(-) diff --git a/ChangeLog b/ChangeLog index eb7a4b9..ea26ce2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +2019-04-21 Alibaba Cloud. + * VERSION: 1.6.0 + * FEATURE: source address supports cluster. + * FEATURE: target address supports several proxies to write data in + a round-robin way. 2019-04-24 Alibaba Cloud. * VERSION: 1.4.2 * IMPROVE: improve rump to support fetching data from given keys in diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 7838124..8ff4d47 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -23,19 +23,6 @@ ncpu = 0 # parallel routines number used in RDB file syncing. default is 64. parallel = 32 -# input RDB file. read from stdin, default is stdin ('/dev/stdin'). -# used in `decode` and `restore`. -# if the input is list split by semicolon(;), redis-shake will restore the list one by one. -# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 -# redis-shake将会挨个进行恢复。 -input_rdb = local - -# output RDB file prefix. -# used in `decode` and `dump`. -# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: -# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 -output_rdb = local_dump - # source redis configuration. # used in `dump`, `sync` and `rump`. # ip:port @@ -48,11 +35,6 @@ source.address = 127.0.0.1:20441 source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth -# the concurrence of fetching data, default is len(source.address) -# used in `dump` and `sync`. -# 拉取的并发度,默认是source.address中db的个数。假如db节点有5个,但source.parallel=3,那么一次只会 -# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。 -source.parallel = # target redis configuration. used in `restore` and `sync`. # used in `restore`, `sync` and `rump`. @@ -65,15 +47,34 @@ target.address = 127.0.0.1:20551 target.password_raw = 123456 # auth type, don't modify it target.auth_type = auth +# the type of target redis can be `standalone`, `proxy` or `cluster`. +# `standalone`: standalone db mode. +# `proxy`: proxy layer ahead redis. +# `cluster`: open source cluster (not supported currently) +# If the target is proxy, data will be inserted in a round-robin way. +# standalone表示单个db节点进行写入(包括主从模式),proxy表示写入的是代理层,将会以轮回的方式进行写入,1个proxy对应一条 +# 长连接通道(1个源端db的数据只会写入1个proxy),cluster表示开源的集群架构 +target.type = standalone # all the data will be written into this db. < 0 means disable. target.db = -1 -# the type of target redis: `cluster`, `opensource`. If the target is open source redis cluster, -# redis-shake uses redis-go-cluster driver to write data, otherwise, redigo driver is used to -# insert data in round robin way. -# 目的redis的类型:`opensource`和`proxy`. `opensource`表示是开源的cluster或者redis db节点; -# `proxy`表示是proxy类型,将会以round robin循环方式写入。对于开源的cluster用redis-go-cluster驱动写入,其余 -# 的则用redigo写入 -# target.type = opensource + +# input RDB file. read from stdin, default is stdin ('/dev/stdin'). +# used in `decode` and `restore`. +# if the input is list split by semicolon(;), redis-shake will restore the list one by one. +# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 +# redis-shake将会挨个进行恢复。 +rdb.input = local +# output RDB file prefix. +# used in `decode` and `dump`. +# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: +# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 +rdb.output = local_dump +# the concurrence of fetching data, default is len(source.address) or len(rdb.input). +# used in `dump`, `sync` and `restore`. +# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。 +# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会 +# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。 +rdb.parallel = # use for expire key, set the time gap when source and target timestamp are not the same. # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 88e7219..d66e9d8 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -8,7 +8,7 @@ import ( "pkg/libs/bytesize" "redis-shake/configure" - + logRotate "gopkg.in/natefinch/lumberjack.v2" ) @@ -26,9 +26,10 @@ const ( ) var ( - Version = "$" - LogRotater *logRotate.Logger - StartTime string + Version = "$" + LogRotater *logRotate.Logger + StartTime string + TargetRoundRobin int ) // read until hit the end of RESP: "\r\n" @@ -70,5 +71,16 @@ func ParseInfo(content []byte) map[string]string { } func GetTotalLink() int { - return len(conf.Options.SourceAddress) + if len(conf.Options.SourceAddress) != 0 { + return len(conf.Options.SourceAddress) + } else { + return len(conf.Options.RdbInput) + } +} + +func PickTargetRoundRobin(n int) int { + defer func() { + TargetRoundRobin = (TargetRoundRobin + 1) % n + }() + return TargetRoundRobin } \ No newline at end of file diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 321066d..afc3642 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -11,20 +11,22 @@ type Configuration struct { HttpProfile int `config:"http_profile"` NCpu int `config:"ncpu"` Parallel int `config:"parallel"` - InputRdb []string `config:"input_rdb"` - OutputRdb string `config:"output_rdb"` SourceAddress []string `config:"source.address"` SourcePasswordRaw string `config:"source.password_raw"` SourcePasswordEncoding string `config:"source.password_encoding"` SourceVersion uint `config:"source.version"` SourceAuthType string `config:"source.auth_type"` SourceParallel uint `config:"source.parallel"` - TargetAddress string `config:"target.address"` + TargetAddress []string `config:"target.address"` TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordEncoding string `config:"target.password_encoding"` TargetVersion uint `config:"target.version"` TargetDB int `config:"target.db"` TargetAuthType string `config:"target.auth_type"` + TargetType string `config:"target.type"` + RdbInput []string `config:"rdb.input"` + RdbOutput string `config:"rdb.output"` + RdbParallel int `config:"rdb.parallel"` FakeTime string `config:"fake_time"` Rewrite bool `config:"rewrite"` FilterDB string `config:"filter.db"` diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index 09aee6b..e71dbf7 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -40,11 +40,11 @@ func (cmd *CmdDecode) GetDetailedInfo() interface{} { } func (cmd *CmdDecode) Main() { - log.Infof("decode from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.OutputRdb) + log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput) - for i, input := range conf.Options.InputRdb { + for i, input := range conf.Options.RdbInput { // decode one by one - output := fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i) + output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i) cmd.decode(input, output) } diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index b3e5b27..4303174 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -36,7 +36,7 @@ func (cmd *CmdDump) Main() { for i, source := range conf.Options.SourceAddress { nd := node{ source: source, - output: fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i), + output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i), } cmd.dumpChan <- nd } diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index a329469..e744df9 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -193,20 +193,48 @@ func sanitizeOptions(tp string) error { return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold) } - if (tp == TypeRestore || tp == TypeSync) && conf.Options.TargetAddress == "" { - return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") - } - if (tp == TypeDump || tp == TypeSync) && len(conf.Options.SourceAddress) == 0 { - return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}") + if tp == TypeRestore || tp == TypeSync || tp == TypeRump { + if len(conf.Options.TargetAddress) == 0 { + return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") + } + + switch conf.Options.TargetType { + case "standalone": + if len(conf.Options.TargetAddress) != 1 { + return fmt.Errorf("the source address[%v] != 1 when target.type is standalone", + len(conf.Options.TargetAddress)) + } + case "proxy": + case "cluster": + if tp == TypeRump || tp == TypeRestore { + // TODO + return fmt.Errorf("{rump, restore} mode doesn't support cluster currently") + } + return fmt.Errorf("coming soon") + default: + return fmt.Errorf("illegal target.type[%v]", conf.Options.TargetType) + } } - if tp == TypeRump && (len(conf.Options.SourceAddress) == 0 || conf.Options.TargetAddress == "") { - return fmt.Errorf("source and target address shouldn't be empty when type in {rump}") + if (tp == TypeDump || tp == TypeSync || tp == TypeRump) && len(conf.Options.SourceAddress) == 0 { + return fmt.Errorf("source address shouldn't be empty when type in {dump, sync, rump}") } - if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.InputRdb) == 0 { + if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.RdbInput) == 0 { return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") } - if tp == TypeDump && conf.Options.OutputRdb == "" { - conf.Options.OutputRdb = "output-rdb-dump" + if tp == TypeDump && conf.Options.RdbOutput == "" { + conf.Options.RdbOutput = "output-rdb-dump" + } + + if conf.Options.RdbParallel == 0 { + if tp == TypeDump || tp == TypeSync { + conf.Options.RdbParallel = len(conf.Options.SourceAddress) + } else if tp == TypeRestore { + conf.Options.RdbParallel = len(conf.Options.RdbInput) + } + } + + if tp == TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) { + conf.Options.RdbParallel = len(conf.Options.RdbInput) } if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" { @@ -318,45 +346,51 @@ func sanitizeOptions(tp string) error { if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 { return fmt.Errorf("HttpProfile[%v] should in [0, 65535]", conf.Options.HttpProfile) - } else if conf.Options.HttpProfile == 0 { + } else if conf.Options.HttpProfile == 0 { // set to default when not set conf.Options.HttpProfile = defaultHttpPort } if conf.Options.SystemProfile < 0 || conf.Options.SystemProfile > 65535 { return fmt.Errorf("SystemProfile[%v] should in [0, 65535]", conf.Options.SystemProfile) - } else if conf.Options.SystemProfile == 0 { + } else if conf.Options.SystemProfile == 0 { // set to default when not set conf.Options.SystemProfile = defaultSystemPort } if conf.Options.SenderSize < 0 || conf.Options.SenderSize >= 1073741824 { return fmt.Errorf("SenderSize[%v] should in [0, 1073741824]", conf.Options.SenderSize) - } else if conf.Options.SenderSize == 0 { + } else if conf.Options.SenderSize == 0 { // set to default when not set conf.Options.SenderSize = defaultSenderSize } if conf.Options.SenderCount < 0 || conf.Options.SenderCount >= 100000 { return fmt.Errorf("SenderCount[%v] should in [0, 100000]", conf.Options.SenderCount) - } else if conf.Options.SenderCount == 0 { + } else if conf.Options.SenderCount == 0 { // set to default when not set conf.Options.SenderCount = defaultSenderCount } if tp == TypeRestore || tp == TypeSync { // get target redis version and set TargetReplace. - if conf.Options.TargetRedisVersion, err = utils.GetRedisVersion(conf.Options.TargetAddress, - conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw); err != nil { - return fmt.Errorf("get target redis version failed[%v]", err) - } else { - if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") || - strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") { - conf.Options.TargetReplace = true + for _, address := range conf.Options.TargetAddress { + if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType, + conf.Options.TargetPasswordRaw); err != nil { + return fmt.Errorf("get target redis version failed[%v]", err) + } else if conf.Options.TargetRedisVersion != "" && conf.Options.TargetRedisVersion != v { + return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetRedisVersion, v) } else { - conf.Options.TargetReplace = false + conf.Options.TargetRedisVersion = v } } + if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") || + strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") || + strings.HasPrefix(conf.Options.TargetRedisVersion, "5.") { + conf.Options.TargetReplace = true + } else { + conf.Options.TargetReplace = false + } } if tp == TypeRump { @@ -365,7 +399,7 @@ func sanitizeOptions(tp string) error { } if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanSpecialCloud != scanner.TencentCluster && - conf.Options.ScanSpecialCloud != scanner.AliyunCluster { + conf.Options.ScanSpecialCloud != scanner.AliyunCluster { return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud) } diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 7878fa7..ae2348a 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -17,12 +17,10 @@ import ( "redis-shake/common" "strconv" "redis-shake/base" + "sync" ) type CmdRestore struct { - rbytes, ebytes, nentry, ignore atomic2.Int64 - - forward, nbypass atomic2.Int64 } type cmdRestoreStat struct { @@ -31,31 +29,57 @@ type cmdRestoreStat struct { forward, nbypass int64 } -func (cmd *CmdRestore) Stat() *cmdRestoreStat { - return &cmdRestoreStat{ - rbytes: cmd.rbytes.Get(), - ebytes: cmd.ebytes.Get(), - nentry: cmd.nentry.Get(), - ignore: cmd.ignore.Get(), - - forward: cmd.forward.Get(), - nbypass: cmd.nbypass.Get(), - } -} - func (cmd *CmdRestore) GetDetailedInfo() interface{} { return nil } func (cmd *CmdRestore) Main() { - log.Infof("restore from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.TargetAddress) + log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddress) + type restoreNode struct { + id int + input string + } base.Status = "waitRestore" - for _, input := range conf.Options.InputRdb { - // restore one by one - cmd.restore(input) + total := utils.GetTotalLink() + restoreChan := make(chan restoreNode, total) + + for i, rdb := range conf.Options.RdbInput { + restoreChan <- restoreNode{id: i, input: rdb} } + var wg sync.WaitGroup + wg.Add(len(conf.Options.RdbInput)) + for i := 0; i < conf.Options.RdbParallel; i++ { + go func() { + for { + node, ok := <-restoreChan + if !ok { + break + } + + // round-robin pick + pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress)) + target := conf.Options.TargetAddress[pick] + + dr := &dbRestorer{ + id: node.id, + input: node.input, + target: target, + targetPassword: conf.Options.TargetPasswordRaw, + } + dr.restore() + log.Infof("routine[%v] starts restoring data from %v to %v", + dr.id, dr.input, dr.target) + + wg.Done() + } + }() + } + + wg.Wait() + close(restoreChan) + if conf.Options.HttpProfile > 0 { //fake status if set http_port. and wait forever base.Status = "incr" @@ -64,25 +88,50 @@ func (cmd *CmdRestore) Main() { } } -func (cmd *CmdRestore) restore(input string) { - readin, nsize := utils.OpenReadFile(input) +/*------------------------------------------------------*/ +// one restore link corresponding to one dbRestorer +type dbRestorer struct { + id int // id + input string // input rdb + target string + targetPassword string + + // metric + rbytes, ebytes, nentry, ignore atomic2.Int64 + forward, nbypass atomic2.Int64 +} + +func (dr *dbRestorer) Stat() *cmdRestoreStat { + return &cmdRestoreStat{ + rbytes: dr.rbytes.Get(), + ebytes: dr.ebytes.Get(), + nentry: dr.nentry.Get(), + ignore: dr.ignore.Get(), + + forward: dr.forward.Get(), + nbypass: dr.nbypass.Get(), + } +} + +func (dr *dbRestorer) restore() { + readin, nsize := utils.OpenReadFile(dr.input) defer readin.Close() base.Status = "restore" reader := bufio.NewReaderSize(readin, utils.ReaderBufferSize) - cmd.restoreRDBFile(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, + dr.restoreRDBFile(reader, dr.target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize) base.Status = "extra" - if conf.Options.ExtraInfo && (nsize == 0 || nsize != cmd.rbytes.Get()) { - cmd.restoreCommand(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, + if conf.Options.ExtraInfo && (nsize == 0 || nsize != dr.rbytes.Get()) { + dr.restoreCommand(reader, dr.target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) } } -func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { - pipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) +func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { + pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { defer close(wait) @@ -97,9 +146,9 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p var lastdb uint32 = 0 for e := range pipe { if !base.AcceptDB(e.DB) { - cmd.ignore.Incr() + dr.ignore.Incr() } else { - cmd.nentry.Incr() + dr.nentry.Incr() if conf.Options.TargetDB != -1 { if conf.Options.TargetDB != int(lastdb) { lastdb = uint32(conf.Options.TargetDB) @@ -127,12 +176,12 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p done = true case <-time.After(time.Second): } - stat := cmd.Stat() + stat := dr.Stat() var b bytes.Buffer if nsize != 0 { - fmt.Fprintf(&b, "total = %d - %12d [%3d%%]", nsize, stat.rbytes, 100*stat.rbytes/nsize) + fmt.Fprintf(&b, "routine[%v] total = %d - %12d [%3d%%]", dr.id, nsize, stat.rbytes, 100*stat.rbytes/nsize) } else { - fmt.Fprintf(&b, "total = %12d", stat.rbytes) + fmt.Fprintf(&b, "routine[%v] total = %12d", dr.id, stat.rbytes) } fmt.Fprintf(&b, " entry=%-12d", stat.nentry) if stat.ignore != 0 { @@ -140,10 +189,10 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p } log.Info(b.String()) } - log.Info("restore: rdb done") + log.Info("routine[%v] restore: rdb done", dr.id) } -func (cmd *CmdRestore) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) { +func (dr *dbRestorer) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) { c := utils.OpenNetConn(target, auth_type, passwd) defer c.Close() @@ -162,35 +211,35 @@ func (cmd *CmdRestore) restoreCommand(reader *bufio.Reader, target, auth_type, p for { resp := redis.MustDecode(reader) if scmd, args, err := redis.ParseArgs(resp); err != nil { - log.PanicError(err, "parse command arguments failed") + log.PanicError(err, "routine[%v] parse command arguments failed", dr.id) } else if scmd != "ping" { if scmd == "select" { if len(args) != 1 { - log.Panicf("select command len(args) = %d", len(args)) + log.Panicf("routine[%v] select command len(args) = %d", dr.id, len(args)) } s := string(args[0]) n, err := strconv.Atoi(s) if err != nil { - log.PanicErrorf(err, "parse db = %s failed", s) + log.PanicErrorf(err, "routine[%v] parse db = %s failed", dr.id, s) } bypass = !base.AcceptDB(uint32(n)) } if bypass { - cmd.nbypass.Incr() + dr.nbypass.Incr() continue } } - cmd.forward.Incr() + dr.forward.Incr() redis.MustEncode(writer, resp) utils.FlushWriter(writer) } }() - for lstat := cmd.Stat(); ; { + for lstat := dr.Stat(); ; { time.Sleep(time.Second) - nstat := cmd.Stat() + nstat := dr.Stat() var b bytes.Buffer - fmt.Fprintf(&b, "restore: ") + fmt.Fprintf(&b, "routine[%v] restore: ", dr.id) fmt.Fprintf(&b, " +forward=%-6d", nstat.forward-lstat.forward) fmt.Fprintf(&b, " +nbypass=%-6d", nstat.nbypass-lstat.nbypass) log.Info(b.String()) diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 0700d39..e5f7243 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -40,7 +40,8 @@ func (cr *CmdRump) Main() { for i, address := range conf.Options.SourceAddress { cr.sourceConn[i] = utils.OpenRedisConn(address, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) } - cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, + // TODO, current only support write data into 1 db or proxy + cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress[0], conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) // init two channels diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 27907ff..94943e6 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -80,11 +80,15 @@ func (cmd *CmdSync) Main() { syncChan := make(chan syncNode, total) cmd.dbSyncers = make([]*dbSyncer, total) for i, source := range conf.Options.SourceAddress { + // round-robin pick + pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress)) + target := conf.Options.TargetAddress[pick] + nd := syncNode{ id: i, source: source, sourcePassword: conf.Options.SourcePasswordRaw, - target: conf.Options.TargetAddress, // todo, target address load balance + target: target, targetPassword: conf.Options.TargetPasswordRaw, } syncChan <- nd @@ -125,7 +129,7 @@ func (cmd *CmdSync) Main() { } /*------------------------------------------------------*/ -// one sync tunnel corresponding to one dbSyncer +// one sync link corresponding to one dbSyncer func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string, httpPort int) *dbSyncer { ds := &dbSyncer{ id: id,