From dcff2d358f9883d8b934009b637fe6205978ad09 Mon Sep 17 00:00:00 2001 From: vinllen Date: Mon, 29 Apr 2019 14:10:19 +0800 Subject: [PATCH 03/11] polish README and comments --- README.md | 8 +++++--- src/redis-shake/base/runner.go | 2 ++ src/redis-shake/common/utils.go | 2 +- src/redis-shake/decode.go | 3 ++- src/redis-shake/restore.go | 2 +- src/redis-shake/sync.go | 24 +++++++++++++++++------- 6 files changed, 28 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 43f7b80..e56cd38 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ The type can be one of the followings:
* **restore**: Restore RDB file to target redis. * **dump**: Dump RDB file from souce redis. * **sync**: Sync data from source redis to target redis by `sync` or `psync` command. Including full synchronization and incremental synchronization. -* **rump**: Sync data from source redis to target redis by `scan` command. Only support full synchronization. +* **rump**: Sync data from source redis to target redis by `scan` command. Only support full synchronization. Plus, RedisShake also supports fetching data from given keys in the input file when `scan` command is not supported on the source side. Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
@@ -55,8 +55,10 @@ Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0. Run `./bin/redis-shake.darwin64` or `redis-shake.linux64` which is built in OSX and Linux respectively.
Or you can build redis-shake yourself according to the following steps: * git clone https://github.com/alibaba/RedisShake.git -* cd RedisShake/src/vendor -* GOPATH=\`pwd\`/../..; govendor sync #please note: must install govendor first and then pull all dependencies: `go get -u github.com/kardianos/govendor` +* cd RedisShake +* export GOPATH=\`pwd\`/../.. +* cd src/sync +* govendor sync #please note: must install govendor first and then pull all dependencies: `go get -u github.com/kardianos/govendor` * cd ../../ && ./build.sh * ./bin/redis-shake -type=$(type_must_be_sync_dump_restore_or_decode) -conf=conf/redis-shake.conf #please note: user must modify collector.conf first to match needs. diff --git a/src/redis-shake/base/runner.go b/src/redis-shake/base/runner.go index df6173e..fc7efbf 100644 --- a/src/redis-shake/base/runner.go +++ b/src/redis-shake/base/runner.go @@ -5,6 +5,8 @@ var( AcceptDB = func(db uint32) bool { return db >= 0 && db < 1024 } + + RDBPipeSize = 1024 ) type Runner interface{ diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 7629f28..6c13371 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -752,7 +752,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { log.Panicf("target key name is busy:", string(e.Key)) } } else { - log.PanicError(err, "restore command error key:", string(e.Key), "err:", err.Error()) + log.PanicError(err, "restore command error key:", string(e.Key), " err:", err.Error()) } } else { if s != "OK" { diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index a452caa..c8cc52b 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -18,6 +18,7 @@ import ( "pkg/rdb" "redis-shake/common" "redis-shake/configure" + "redis-shake/base" ) type CmdDecode struct { @@ -71,7 +72,7 @@ func (cmd *CmdDecode) Main() { reader := bufio.NewReaderSize(readin, utils.ReaderBufferSize) writer := bufio.NewWriterSize(saveto, 
utils.WriterBufferSize) - ipipe := utils.NewRDBLoader(reader, &cmd.rbytes, int(conf.Options.Parallel) * 32) + ipipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) opipe := make(chan string, cap(ipipe)) go func() { diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 3734a40..26c9de5 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -89,7 +89,7 @@ func (cmd *CmdRestore) Main() { } func (cmd *CmdRestore) RestoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { - pipe := utils.NewRDBLoader(reader, &cmd.rbytes, conf.Options.Parallel * 32) + pipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { defer close(wait) diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 3a6ea46..d585577 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -94,12 +94,6 @@ func (cmd *CmdSync) GetDetailedInfo() []interface{} { func (cmd *CmdSync) Main() { from, target := conf.Options.SourceAddress, conf.Options.TargetAddress - if len(from) == 0 { - log.Panic("invalid argument: from") - } - if len(target) == 0 { - log.Panic("invalid argument: target") - } log.Infof("sync from '%s' to '%s' with http-port[%d]\n", from, target, conf.Options.HttpProfile) cmd.wait_full = make(chan struct{}) @@ -179,10 +173,13 @@ func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, utils.SendPSyncListeningPort(c, conf.Options.HttpProfile) log.Infof("psync send listening port[%v] OK!", conf.Options.HttpProfile) + // reader buffer bind to client br := bufio.NewReaderSize(c, utils.ReaderBufferSize) + // writer buffer bind to client bw := bufio.NewWriterSize(c, utils.WriterBufferSize) log.Infof("try to send 'psync' command") + // send psync command and decode the result runid, offset, wait := utils.SendPSyncFullsync(br, bw) cmd.targetOffset.Set(offset) log.Infof("psync runid = %s offset = %d, fullsync", runid, offset) @@ 
-200,21 +197,33 @@ func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, } } + // write -> pipew -> piper -> read piper, pipew := pipe.NewSize(utils.ReaderBufferSize) go func() { defer pipew.Close() p := make([]byte, 8192) + // read rdb in for loop for rdbsize := int(nsize); rdbsize != 0; { + // br -> pipew rdbsize -= utils.Iocopy(br, pipew, p, rdbsize) } + for { + /* + * read from br(source redis) and write into pipew. + * Generally speaking, this function is forever run. + */ n, err := cmd.PSyncPipeCopy(c, br, bw, offset, pipew) if err != nil { log.PanicErrorf(err, "psync runid = %s, offset = %d, pipe is broken", runid, offset) } + // the 'c' is closed every loop + offset += n cmd.targetOffset.Set(offset) + + // reopen 'c' every time for { // cmd.SyncStat.SetStatus("reopen") base.Status = "reopen" @@ -242,6 +251,7 @@ func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, } func (cmd *CmdSync) PSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer, offset int64, copyto io.Writer) (int64, error) { + // TODO, two times call c.Close() ? 
maybe a bug defer c.Close() var nread atomic2.Int64 go func() { @@ -275,7 +285,7 @@ func (cmd *CmdSync) PSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer } func (cmd *CmdSync) SyncRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { - pipe := utils.NewRDBLoader(reader, &cmd.rbytes, conf.Options.Parallel * 32) + pipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { defer close(wait) From f2e0849d8e71762de67c66a3f700b9ad90b7b38d Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 30 Apr 2019 10:20:06 +0800 Subject: [PATCH 04/11] modify source cluster of dump --- README.md | 11 ++++ conf/redis-shake.conf | 33 +++++++--- src/redis-shake/common/utils.go | 1 + src/redis-shake/configure/configure.go | 83 +++++++++++++------------ src/redis-shake/dump.go | 86 ++++++++++++++++++-------- src/redis-shake/main/main.go | 11 +++- src/redis-shake/rump.go | 5 +- src/redis-shake/sync.go | 19 ++++-- src/vendor/vendor.json | 3 +- 9 files changed, 169 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index e56cd38..ae91ba2 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,9 @@ The type can be one of the followings:
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
+# Configuration +Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. + # Verification --- User can use [RedisFullCheck](https://github.com/alibaba/RedisFullCheck) to verify correctness.
@@ -30,6 +33,14 @@ Redis-shake offers metrics through restful api and log file.
* restful api: `curl 127.0.0.1:9320/metric`. * log: the metric info will be printed in the log periodically if enable. +m + +# Redis Type +--- +Both the source and target type can be single redis, master-slave architecture redis, redis cluster and codis. Although the architecture patterns of different vendors are different for the cluster architecture, we still support different cloud vendors like alibaba-cloud, tencent-cloud and so on.
+If the target is an open source redis cluster, redis-shake uses the [redis-go-cluster](https://github.com/chasex/redis-go-cluster) driver to write data; otherwise, the [redigo](https://github.com/garyburd/redigo) driver is used to insert data in a round-robin way.
+If the source is a redis cluster, redis-shake launches multiple goroutines to pull data in parallel. The user can control the concurrency.
+The "move slot" operations must be disabled on the source side.
# Code branch rules --- diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index a8f913c..c2a2686 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -28,33 +28,50 @@ parallel = 32 # 如果是decode或者restore,这个参数表示读取的rdb文件 input_rdb = local -# output RDB file. default is stdout ('/dev/stdout'). +# output RDB file prefix. # used in `decode` and `dump`. -# 如果是decode或者dump,这个参数表示输出的rdb +# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: +# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 output_rdb = local_dump # source redis configuration. # used in `dump`, `sync` and `rump`. # ip:port -# 源redis地址 +# the source address can be redis db or cluster that has several db nodes split by semicolon(;). +# if the type is `rump`, source type can also be the proxy address. +# 源redis地址(ip:port),源端支持开源cluster以及db节点暴露且有权限的proxy集群模式,不同db以分号(;) +# 分割,例如:ip1:port1;ip2;port2;ip3;port3。 source.address = 127.0.0.1:20441 # password. source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth +# the concurrence of fetching data, default is len(source.address) +# used in `dump` and `sync`. +# 拉取的并发度,默认是source.address中db的个数。假如db节点有5个,但source.parallel=3,那么一次只会 +# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。 +source.parallel = # target redis configuration. used in `restore` and `sync`. # used in `restore`, `sync` and `rump`. # ip:port -# 目的redis地址 +# the source address can be cluster that has several db nodes split by semicolon(;). +# 目的redis地址(ip:port),目的端支持开源的cluster架构和proxy模式,不同db以分号(;)分割,例如: +# ip1:port1;ip2;port2;ip3;port3。 target.address = 127.0.0.1:20551 # password. target.password_raw = 123456 # auth type, don't modify it target.auth_type = auth -# all the data will come into this db. < 0 means disable. -# used in `restore` and `sync`. +# all the data will be written into this db. < 0 means disable. target.db = -1 +# the type of target redis: `cluster`, `opensource`. 
If the target is open source redis cluster, +# redis-shake uses redis-go-cluster driver to write data, otherwise, redigo driver is used to +# insert data in round robin way. +# 目的redis的类型:`opensource`和`proxy`. `opensource`表示是开源的cluster或者redis db节点; +# `proxy`表示是proxy类型,将会以round robin循环方式写入。对于开源的cluster用redis-go-cluster驱动写入,其余 +# 的则用redigo写入 +# target.type = opensource # use for expire key, set the time gap when source and target timestamp are not the same. # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 @@ -126,7 +143,9 @@ sender.size = 104857600 # used in `sync`. # 发送缓存的报文个数,超过这个阈值将会强行刷缓存发送 sender.count = 5000 -# delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also stored in this delay queue. this timestamp will be used to calculate the time delay when receiving ack from target redis. +# delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also +# stored in this delay queue. this timestamp will be used to calculate the time delay when receiving +# ack from target redis. # used in `sync`. 
# 用于metric统计时延的队列 sender.delay_channel_size = 65535 diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 6c13371..5023cbd 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -22,6 +22,7 @@ import ( "pkg/rdb" "pkg/redis" "redis-shake/configure" + redigo "github.com/garyburd/redigo/redis" ) diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 78033bd..6035480 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -4,47 +4,48 @@ import "time" type Configuration struct { // config file variables - Id string `config:"id"` - LogFile string `config:"log.file"` - LogLevel string `config:"log.level"` - SystemProfile int `config:"system_profile"` - HttpProfile int `config:"http_profile"` - NCpu int `config:"ncpu"` - Parallel int `config:"parallel"` - InputRdb string `config:"input_rdb"` - OutputRdb string `config:"output_rdb"` - SourceAddress string `config:"source.address"` - SourcePasswordRaw string `config:"source.password_raw"` - SourcePasswordEncoding string `config:"source.password_encoding"` - SourceVersion uint `config:"source.version"` - SourceAuthType string `config:"source.auth_type"` - TargetAddress string `config:"target.address"` - TargetPasswordRaw string `config:"target.password_raw"` - TargetPasswordEncoding string `config:"target.password_encoding"` - TargetVersion uint `config:"target.version"` - TargetDB int `config:"target.db"` - TargetAuthType string `config:"target.auth_type"` - FakeTime string `config:"fake_time"` - Rewrite bool `config:"rewrite"` - FilterDB string `config:"filter.db"` - FilterKey []string `config:"filter.key"` - FilterSlot []string `config:"filter.slot"` - BigKeyThreshold uint64 `config:"big_key_threshold"` - Psync bool `config:"psync"` - Metric bool `config:"metric"` - MetricPrintLog bool `config:"metric.print_log"` - HeartbeatUrl string `config:"heartbeat.url"` - 
HeartbeatInterval uint `config:"heartbeat.interval"` - HeartbeatExternal string `config:"heartbeat.external"` - HeartbeatNetworkInterface string `config:"heartbeat.network_interface"` - SenderSize uint64 `config:"sender.size"` - SenderCount uint `config:"sender.count"` - SenderDelayChannelSize uint `config:"sender.delay_channel_size"` - KeepAlive uint `config:"keep_alive"` - PidPath string `config:"pid_path"` - ScanKeyNumber uint32 `config:"scan.key_number"` - ScanSpecialCloud string `config:"scan.special_cloud"` - ScanKeyFile string `config:"scan.key_file"` + Id string `config:"id"` + LogFile string `config:"log.file"` + LogLevel string `config:"log.level"` + SystemProfile int `config:"system_profile"` + HttpProfile int `config:"http_profile"` + NCpu int `config:"ncpu"` + Parallel int `config:"parallel"` + InputRdb string `config:"input_rdb"` + OutputRdb string `config:"output_rdb"` + SourceAddress []string `config:"source.address"` + SourcePasswordRaw string `config:"source.password_raw"` + SourcePasswordEncoding string `config:"source.password_encoding"` + SourceVersion uint `config:"source.version"` + SourceAuthType string `config:"source.auth_type"` + SourceParallel uint `config:"source.parallel"` + TargetAddress string `config:"target.address"` + TargetPasswordRaw string `config:"target.password_raw"` + TargetPasswordEncoding string `config:"target.password_encoding"` + TargetVersion uint `config:"target.version"` + TargetDB int `config:"target.db"` + TargetAuthType string `config:"target.auth_type"` + FakeTime string `config:"fake_time"` + Rewrite bool `config:"rewrite"` + FilterDB string `config:"filter.db"` + FilterKey []string `config:"filter.key"` + FilterSlot []string `config:"filter.slot"` + BigKeyThreshold uint64 `config:"big_key_threshold"` + Psync bool `config:"psync"` + Metric bool `config:"metric"` + MetricPrintLog bool `config:"metric.print_log"` + HeartbeatUrl string `config:"heartbeat.url"` + HeartbeatInterval uint `config:"heartbeat.interval"` 
+ HeartbeatExternal string `config:"heartbeat.external"` + HeartbeatNetworkInterface string `config:"heartbeat.network_interface"` + SenderSize uint64 `config:"sender.size"` + SenderCount uint `config:"sender.count"` + SenderDelayChannelSize uint `config:"sender.delay_channel_size"` + KeepAlive uint `config:"keep_alive"` + PidPath string `config:"pid_path"` + ScanKeyNumber uint32 `config:"scan.key_number"` + ScanSpecialCloud string `config:"scan.special_cloud"` + ScanKeyFile string `config:"scan.key_file"` // inner variables ReplaceHashTag bool `config:"replace_hash_tag"` diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index f73b450..6517183 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -5,10 +5,10 @@ package run import ( "bufio" - "io" "net" - "os" "time" + "fmt" + "sync" "pkg/libs/atomic2" "pkg/libs/log" @@ -17,6 +17,13 @@ import ( ) type CmdDump struct { + dumpChan chan node + wg sync.WaitGroup +} + +type node struct { + source string + output string } func (cmd *CmdDump) GetDetailedInfo() []interface{} { @@ -24,42 +31,71 @@ func (cmd *CmdDump) GetDetailedInfo() []interface{} { } func (cmd *CmdDump) Main() { - from, output := conf.Options.SourceAddress, conf.Options.OutputRdb - if len(from) == 0 { - log.Panic("invalid argument: from") + cmd.dumpChan = make(chan node, len(conf.Options.SourceAddress)) + + for i, source := range conf.Options.SourceAddress { + nd := node{ + source: source, + output: fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i), + } + cmd.dumpChan <- nd } - if len(output) == 0 { - output = "/dev/stdout" + + var ( + reader *bufio.Reader + writer *bufio.Writer + nsize int64 + ) + cmd.wg.Add(len(conf.Options.SourceAddress)) + for i := 0; i < int(conf.Options.SourceParallel); i++ { + go func(idx int) { + log.Infof("start routine[%v]", idx) + for { + select { + case nd, ok := <-cmd.dumpChan: + if !ok { + log.Infof("close routine[%v]", idx) + return + } + reader, writer, nsize = cmd.dump(nd.source, nd.output) + 
cmd.wg.Done() + } + } + }(i) } - log.Infof("dump from '%s' to '%s'\n", from, output) + cmd.wg.Wait() - var dumpto io.WriteCloser - if output != "/dev/stdout" { - dumpto = utils.OpenWriteFile(output) - defer dumpto.Close() - } else { - dumpto = os.Stdout + close(cmd.dumpChan) + + if len(conf.Options.SourceAddress) != 1 || !conf.Options.ExtraInfo { + return } - master, nsize := cmd.SendCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + // inner usage + cmd.dumpCommand(reader, writer, nsize) +} + +func (cmd *CmdDump) dump(source, output string) (*bufio.Reader, *bufio.Writer, int64) { + log.Infof("dump from '%s' to '%s'\n", source, output) + + dumpto := utils.OpenWriteFile(output) + defer dumpto.Close() + + master, nsize := cmd.sendCmd(source, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) defer master.Close() - log.Infof("rdb file = %d\n", nsize) + log.Infof("source db[%v] dump rdb file-size[%d]\n", source, nsize) reader := bufio.NewReaderSize(master, utils.ReaderBufferSize) writer := bufio.NewWriterSize(dumpto, utils.WriterBufferSize) - cmd.DumpRDBFile(reader, writer, nsize) - - if !conf.Options.ExtraInfo { - return - } + cmd.dumpRDBFile(reader, writer, nsize) - cmd.DumpCommand(reader, writer, nsize) + return reader, writer, nsize } -func (cmd *CmdDump) SendCmd(master, auth_type, passwd string) (net.Conn, int64) { +func (cmd *CmdDump) sendCmd(master, auth_type, passwd string) (net.Conn, int64) { c, wait := utils.OpenSyncConn(master, auth_type, passwd) var nsize int64 for nsize == 0 { @@ -75,7 +111,7 @@ func (cmd *CmdDump) SendCmd(master, auth_type, passwd string) (net.Conn, int64) return c, nsize } -func (cmd *CmdDump) DumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { +func (cmd *CmdDump) dumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { var nread atomic2.Int64 wait := make(chan struct{}) go func() { @@ -102,7 +138,7 @@ func (cmd *CmdDump) DumpRDBFile(reader *bufio.Reader, writer 
*bufio.Writer, nsiz log.Info("dump: rdb done") } -func (cmd *CmdDump) DumpCommand(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { +func (cmd *CmdDump) dumpCommand(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { var nread atomic2.Int64 go func() { p := make([]byte, utils.ReaderBufferSize) diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 14c7609..5e9c2e5 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -196,12 +196,15 @@ func sanitizeOptions(tp string) error { if (tp == TypeRestore || tp == TypeSync) && conf.Options.TargetAddress == "" { return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") } - if (tp == TypeDump || tp == TypeSync) && conf.Options.SourceAddress == "" { + if (tp == TypeDump || tp == TypeSync) && len(conf.Options.SourceAddress) == 0 { return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}") } - if tp == TypeRump && (conf.Options.SourceAddress == "" || conf.Options.TargetAddress == "") { + if tp == TypeRump && (len(conf.Options.SourceAddress) == 0 || conf.Options.TargetAddress == "") { return fmt.Errorf("source and target address shouldn't be empty when type in {rump}") } + if tp == TypeDump && conf.Options.OutputRdb == "" { + conf.Options.OutputRdb = "output-rdb-dump" + } if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" { return fmt.Errorf("only one of source password_raw or password_encoding should be given") @@ -210,6 +213,10 @@ func sanitizeOptions(tp string) error { conf.Options.SourcePasswordRaw = string(sourcePassword) } + if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddress)) { + conf.Options.SourceParallel = uint(len(conf.Options.SourceAddress)) + } + if conf.Options.TargetPasswordRaw != "" && conf.Options.TargetPasswordEncoding != "" { return fmt.Errorf("only one of target password_raw or password_encoding should be 
given") } else if conf.Options.TargetPasswordEncoding != "" { diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 241bc70..5106899 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -33,8 +33,9 @@ func (cr *CmdRump) GetDetailedInfo() []interface{} { func (cr *CmdRump) Main() { // build connection - cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw) + // todo + //cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // conf.Options.SourcePasswordRaw) cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index d585577..dfe94fc 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -93,6 +93,7 @@ func (cmd *CmdSync) GetDetailedInfo() []interface{} { } func (cmd *CmdSync) Main() { + // todo from, target := conf.Options.SourceAddress, conf.Options.TargetAddress log.Infof("sync from '%s' to '%s' with http-port[%d]\n", from, target, conf.Options.HttpProfile) @@ -108,9 +109,11 @@ func (cmd *CmdSync) Main() { var input io.ReadCloser var nsize int64 if conf.Options.Psync { - input, nsize = cmd.SendPSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + // todo + // input, nsize = cmd.SendPSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) } else { - input, nsize = cmd.SendSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + // todo + // input, nsize = cmd.SendSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) } defer input.Close() @@ -377,7 +380,9 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd return } - srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // todo + // srcConn := 
utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) ticker := time.NewTicker(10 * time.Second) for range ticker.C { @@ -388,10 +393,14 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd // Reconnect while network error happen if err == io.EOF { - srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // todo + // srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) } else if _, ok := err.(net.Error); ok { - srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // todo + // srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) } } else { diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json index 3d15e8e..92a5ba2 100644 --- a/src/vendor/vendor.json +++ b/src/vendor/vendor.json @@ -62,5 +62,6 @@ "revision": "a96e63847dc3c67d17befa69c303767e2f84e54f", "revisionTime": "2017-05-31T16:03:50Z" } - ] + ], + "rootPath": "/Users/vinllen-ali/code/redis-shake-inner/redis-shake/src" } From e7b3356b06bcc86b58b66cda45394a24f1e57cf5 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 30 Apr 2019 11:16:52 +0800 Subject: [PATCH 05/11] modify source cluster of rump --- src/redis-shake/main/main.go | 5 +++ src/redis-shake/rump.go | 67 
++++++++++++++++++++---------- src/redis-shake/scanner/scanner.go | 30 ++++++++----- 3 files changed, 68 insertions(+), 34 deletions(-) diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 5e9c2e5..afb3bc9 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -370,6 +370,11 @@ func sanitizeOptions(tp string) error { return fmt.Errorf("scan.special_cloud[%v] and scan.key_file[%v] cann't be given at the same time", conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) } + + if (conf.Options.ScanSpecialCloud != "" || conf.Options.ScanKeyFile != "") && len(conf.Options.SourceAddress) > 1 { + return fmt.Errorf("source address should <= 1 when scan.special_cloud[%v] or scan.key_file[%v] given", + conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) + } } return nil diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 5106899..01e1786 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -9,16 +9,18 @@ import ( "github.com/garyburd/redigo/redis" "redis-shake/scanner" + "sync" ) type CmdRump struct { - sourceConn redis.Conn + sourceConn []redis.Conn targetConn redis.Conn keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2 resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3 - scanner scanner.Scanner + scanners []scanner.Scanner // one scanner match one db/proxy + fetcherWg sync.WaitGroup } type KeyNode struct { @@ -33,34 +35,53 @@ func (cr *CmdRump) GetDetailedInfo() []interface{} { func (cr *CmdRump) Main() { // build connection - // todo - //cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType, - // conf.Options.SourcePasswordRaw) + + cr.sourceConn = make([]redis.Conn, len(conf.Options.SourceAddress)) + for i, address := range conf.Options.SourceAddress { + cr.sourceConn[i] = utils.OpenRedisConn(address, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + } 
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) // init two channels - cr.keyChan = make(chan *KeyNode, conf.Options.ScanKeyNumber) - cr.resultChan = make(chan *KeyNode, conf.Options.ScanKeyNumber) - - cr.scanner = scanner.NewScanner(cr.sourceConn) + chanSize := int(conf.Options.ScanKeyNumber) * len(conf.Options.SourceAddress) + cr.keyChan = make(chan *KeyNode, chanSize) + cr.resultChan = make(chan *KeyNode, chanSize) + + cr.scanners = scanner.NewScanner(cr.sourceConn) + if cr.scanners == nil || len(cr.scanners) == 0 { + log.Panic("create scanner failed") + return + } /* - * we start 3 routines to run: + * we start 4 routines to run: * 1. fetch keys from the source redis - * 2. write keys into the target redis - * 3. read result from the target redis + * 2. wait fetcher all exit + * 3. write keys into the target redis + * 4. read result from the target redis */ // routine1 - go cr.fetcher() + cr.fetcherWg.Add(len(cr.scanners)) + for i := range cr.scanners { + go cr.fetcher(i) + } + // routine2 - go cr.writer() + go func() { + cr.fetcherWg.Wait() + close(cr.keyChan) + }() + // routine3 + go cr.writer() + + // routine4 cr.receiver() } -func (cr *CmdRump) fetcher() { - length, err := cr.scanner.NodeCount() +func (cr *CmdRump) fetcher(idx int) { + length, err := cr.scanners[idx].NodeCount() if err != nil || length <= 0 { log.Panicf("fetch db node failed: length[%v], error[%v]", length, err) } @@ -71,7 +92,7 @@ func (cr *CmdRump) fetcher() { for i := 0; i < length; i++ { // fetch data from on node for { - keys, err := cr.scanner.ScanKey(i) + keys, err := cr.scanners[idx].ScanKey(i) if err != nil { log.Panic(err) } @@ -82,18 +103,18 @@ func (cr *CmdRump) fetcher() { // pipeline dump for _, key := range keys { log.Debug("scan key: ", key) - cr.sourceConn.Send("DUMP", key) + cr.sourceConn[idx].Send("DUMP", key) } - dumps, err := redis.Strings(cr.sourceConn.Do("")) + dumps, err := 
redis.Strings(cr.sourceConn[idx].Do("")) if err != nil && err != redis.ErrNil { log.Panicf("do dump with failed[%v]", err) } // pipeline ttl for _, key := range keys { - cr.sourceConn.Send("PTTL", key) + cr.sourceConn[idx].Send("PTTL", key) } - pttls, err := redis.Int64s(cr.sourceConn.Do("")) + pttls, err := redis.Int64s(cr.sourceConn[idx].Do("")) if err != nil && err != redis.ErrNil { log.Panicf("do ttl with failed[%v]", err) } @@ -104,13 +125,13 @@ func (cr *CmdRump) fetcher() { } // Last iteration of scan. - if cr.scanner.EndNode() { + if cr.scanners[idx].EndNode() { break } } } - close(cr.keyChan) + cr.fetcherWg.Done() } func (cr *CmdRump) writer() { diff --git a/src/redis-shake/scanner/scanner.go b/src/redis-shake/scanner/scanner.go index be29d2a..c33d498 100644 --- a/src/redis-shake/scanner/scanner.go +++ b/src/redis-shake/scanner/scanner.go @@ -32,27 +32,35 @@ type Scanner interface { Close() } -func NewScanner(client redis.Conn) Scanner { +func NewScanner(client []redis.Conn) []Scanner { if conf.Options.ScanSpecialCloud != "" { - return &SpecialCloudScanner{ - client: client, - cursor: 0, + return []Scanner { + &SpecialCloudScanner{ + client: client[0], + cursor: 0, + }, } } else if conf.Options.ScanKeyFile != "" { if f, err := os.Open(conf.Options.ScanKeyFile); err != nil { log.Errorf("open scan-key-file[%v] error[%v]", conf.Options.ScanKeyFile, err) return nil } else { - return &KeyFileScanner{ - f: f, - bufScan: bufio.NewScanner(f), - cnt: -1, + return []Scanner { + &KeyFileScanner{ + f: f, + bufScan: bufio.NewScanner(f), + cnt: -1, + }, } } } else { - return &NormalScanner{ - client: client, - cursor: 0, + ret := make([]Scanner, 0, len(client)) + for _, c := range client { + ret = append(ret, &NormalScanner{ + client: c, + cursor: 0, + }) } + return ret } } From fd638c87816eef9161e7799b85f37f4b00ffef5b Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 30 Apr 2019 15:14:28 +0800 Subject: [PATCH 06/11] modify source cluster of decode --- 
conf/redis-shake.conf | 4 +- src/redis-shake/configure/configure.go | 2 +- src/redis-shake/decode.go | 38 +++++++------------ src/redis-shake/main/main.go | 3 ++ src/redis-shake/restore.go | 51 +++++++++++--------------- 5 files changed, 42 insertions(+), 56 deletions(-) diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index c2a2686..7838124 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -25,7 +25,9 @@ parallel = 32 # input RDB file. read from stdin, default is stdin ('/dev/stdin'). # used in `decode` and `restore`. -# 如果是decode或者restore,这个参数表示读取的rdb文件 +# if the input is list split by semicolon(;), redis-shake will restore the list one by one. +# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 +# redis-shake将会挨个进行恢复。 input_rdb = local # output RDB file prefix. diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 6035480..321066d 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -11,7 +11,7 @@ type Configuration struct { HttpProfile int `config:"http_profile"` NCpu int `config:"ncpu"` Parallel int `config:"parallel"` - InputRdb string `config:"input_rdb"` + InputRdb []string `config:"input_rdb"` OutputRdb string `config:"output_rdb"` SourceAddress []string `config:"source.address"` SourcePasswordRaw string `config:"source.password_raw"` diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index c8cc52b..dc2fa56 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -9,8 +9,6 @@ import ( "encoding/base64" "encoding/json" "fmt" - "io" - "os" "time" "pkg/libs/atomic2" @@ -42,32 +40,23 @@ func (cmd *CmdDecode) GetDetailedInfo() []interface{} { } func (cmd *CmdDecode) Main() { - input, output := conf.Options.InputRdb, conf.Options.OutputRdb - if len(input) == 0 { - input = "/dev/stdin" - } - if len(output) == 0 { - output = "/dev/stdout" + log.Infof("decode from '%s' to '%s'\n", conf.Options.InputRdb, 
conf.Options.OutputRdb) + + for i, input := range conf.Options.InputRdb { + // decode one by one + output := fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i) + cmd.decode(input, output) } - log.Infof("decode from '%s' to '%s'\n", input, output) + log.Info("decode: done") +} - var readin io.ReadCloser - var nsize int64 - if input != "/dev/stdin" { - readin, nsize = utils.OpenReadFile(input) - defer readin.Close() - } else { - readin, nsize = os.Stdin, 0 - } +func (cmd *CmdDecode) decode(input, output string) { + readin, nsize := utils.OpenReadFile(input) + defer readin.Close() - var saveto io.WriteCloser - if output != "/dev/stdout" { - saveto = utils.OpenWriteFile(output) - defer saveto.Close() - } else { - saveto = os.Stdout - } + saveto := utils.OpenWriteFile(output) + defer saveto.Close() reader := bufio.NewReaderSize(readin, utils.ReaderBufferSize) writer := bufio.NewWriterSize(saveto, utils.WriterBufferSize) @@ -121,7 +110,6 @@ func (cmd *CmdDecode) Main() { fmt.Fprintf(&b, " entry=%-12d", stat.nentry) log.Info(b.String()) } - log.Info("decode: done") } func (cmd *CmdDecode) decoderMain(ipipe <-chan *rdb.BinEntry, opipe chan<- string) { diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index afb3bc9..a329469 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -202,6 +202,9 @@ func sanitizeOptions(tp string) error { if tp == TypeRump && (len(conf.Options.SourceAddress) == 0 || conf.Options.TargetAddress == "") { return fmt.Errorf("source and target address shouldn't be empty when type in {rump}") } + if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.InputRdb) == 0 { + return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") + } if tp == TypeDump && conf.Options.OutputRdb == "" { conf.Options.OutputRdb = "output-rdb-dump" } diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 26c9de5..8399260 100644 --- a/src/redis-shake/restore.go +++ 
b/src/redis-shake/restore.go @@ -7,9 +7,7 @@ import ( "bufio" "bytes" "fmt" - "io" "io/ioutil" - "os" "time" "pkg/libs/atomic2" @@ -50,45 +48,40 @@ func (cmd *CmdRestore) GetDetailedInfo() []interface{} { } func (cmd *CmdRestore) Main() { - input, target := conf.Options.InputRdb, conf.Options.TargetAddress - if len(target) == 0 { - log.Panic("invalid argument: target") - } - if len(input) == 0 { - input = "/dev/stdin" - } - - log.Infof("restore from '%s' to '%s'\n", input, target) + log.Infof("restore from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.TargetAddress) base.Status = "waitRestore" - var readin io.ReadCloser - var nsize int64 - if input != "/dev/stdin" { - readin, nsize = utils.OpenReadFile(input) - defer readin.Close() - } else { - readin, nsize = os.Stdin, 0 + for _, input := range conf.Options.InputRdb { + // restore one by one + cmd.restore(input) } + if conf.Options.HttpProfile > 0 { + //fake status if set http_port. and wait forever + base.Status = "incr" + log.Infof("Enabled http stats, set status (incr), and wait forever.") + select{} + } +} + +func (cmd *CmdRestore) restore(input string) { + readin, nsize := utils.OpenReadFile(input) + defer readin.Close() base.Status = "restore" + reader := bufio.NewReaderSize(readin, utils.ReaderBufferSize) - cmd.RestoreRDBFile(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize) + cmd.restoreRDBFile(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, + nsize) base.Status = "extra" if conf.Options.ExtraInfo && (nsize == 0 || nsize != cmd.rbytes.Get()) { - cmd.RestoreCommand(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) - } - - if conf.Options.HttpProfile > 0 { - //fake status if set http_port. 
and wait forever - base.Status = "incr" - log.Infof("Enabled http stats, set status (incr), and wait forever.") - select{} + cmd.restoreCommand(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, + conf.Options.TargetPasswordRaw) } } -func (cmd *CmdRestore) RestoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { +func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { pipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { @@ -150,7 +143,7 @@ func (cmd *CmdRestore) RestoreRDBFile(reader *bufio.Reader, target, auth_type, p log.Info("restore: rdb done") } -func (cmd *CmdRestore) RestoreCommand(reader *bufio.Reader, target, auth_type, passwd string) { +func (cmd *CmdRestore) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) { c := utils.OpenNetConn(target, auth_type, passwd) defer c.Close() From 2198d0320745852ed1de18cea4a1882d15788ed2 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 7 May 2019 22:37:05 +0800 Subject: [PATCH 07/11] support fetching from cluster --- README.md | 2 + src/redis-shake/base/runner.go | 2 +- src/redis-shake/common/common.go | 7 +- src/redis-shake/decode.go | 2 +- src/redis-shake/dump.go | 2 +- src/redis-shake/metric/metric.go | 19 +- src/redis-shake/metric/variables.go | 75 +++--- src/redis-shake/restore.go | 2 +- src/redis-shake/rump.go | 2 +- src/redis-shake/sync.go | 393 +++++++++++++++++----------- 10 files changed, 309 insertions(+), 197 deletions(-) diff --git a/README.md b/README.md index ae91ba2..894dd36 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ RedisShake is mainly used to synchronize data from one redis database to another Thanks to the Douyu's WSD team for the support.
* [中文文档](https://yq.aliyun.com/articles/691794) +* [English tutorial](https://github.com/alibaba/RedisShake/wiki/tutorial-about-how-to-set-up) +* [中文使用文档](https://github.com/alibaba/RedisShake/wiki/%E7%AC%AC%E4%B8%80%E6%AC%A1%E4%BD%BF%E7%94%A8%EF%BC%8C%E5%A6%82%E4%BD%95%E8%BF%9B%E8%A1%8C%E9%85%8D%E7%BD%AE%EF%BC%9F) # Redis-Shake --- diff --git a/src/redis-shake/base/runner.go b/src/redis-shake/base/runner.go index fc7efbf..70742d1 100644 --- a/src/redis-shake/base/runner.go +++ b/src/redis-shake/base/runner.go @@ -12,5 +12,5 @@ var( type Runner interface{ Main() - GetDetailedInfo() []interface{} + GetDetailedInfo() interface{} } \ No newline at end of file diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 24a1288..88e7219 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -7,7 +7,8 @@ import ( "bytes" "pkg/libs/bytesize" - + "redis-shake/configure" + logRotate "gopkg.in/natefinch/lumberjack.v2" ) @@ -66,4 +67,8 @@ func ParseInfo(content []byte) map[string]string { result[string(items[0])] = string(items[1]) } return result +} + +func GetTotalLink() int { + return len(conf.Options.SourceAddress) } \ No newline at end of file diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index dc2fa56..09aee6b 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -35,7 +35,7 @@ func (cmd *CmdDecode) Stat() *cmdDecodeStat { } } -func (cmd *CmdDecode) GetDetailedInfo() []interface{} { +func (cmd *CmdDecode) GetDetailedInfo() interface{} { return nil } diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index 6517183..b3e5b27 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -26,7 +26,7 @@ type node struct { output string } -func (cmd *CmdDump) GetDetailedInfo() []interface{} { +func (cmd *CmdDump) GetDetailedInfo() interface{} { return nil } diff --git a/src/redis-shake/metric/metric.go b/src/redis-shake/metric/metric.go index 
5b43f80..d085741 100644 --- a/src/redis-shake/metric/metric.go +++ b/src/redis-shake/metric/metric.go @@ -9,6 +9,7 @@ import ( "redis-shake/base" "redis-shake/configure" "pkg/libs/log" + "sync" ) const ( @@ -16,7 +17,7 @@ const ( ) var ( - MetricVar *Metric + MetricMap = new(sync.Map) runner base.Runner ) @@ -92,9 +93,21 @@ type Metric struct { func CreateMetric(r base.Runner) { runner = r - MetricVar = &Metric{} +} + +func AddMetric(id int) { + if _, ok := MetricMap.Load(id); ok { + return + } + + singleMetric := new(Metric) + MetricMap.Store(id, singleMetric) + go singleMetric.run() +} - go MetricVar.run() +func GetMetric(id int) *Metric { + metric, _ := MetricMap.Load(id) + return metric.(*Metric) } func (m *Metric) resetEverySecond(items []Op) { diff --git a/src/redis-shake/metric/variables.go b/src/redis-shake/metric/variables.go index 619e7d3..3f9d915 100644 --- a/src/redis-shake/metric/variables.go +++ b/src/redis-shake/metric/variables.go @@ -1,6 +1,6 @@ package metric -import( +import ( "fmt" "redis-shake/base" "redis-shake/common" @@ -28,39 +28,48 @@ type MetricRest struct { ProcessingCmdCount interface{} // length of delay channel TargetDBOffset interface{} // target redis offset SourceDBOffset interface{} // source redis offset + SourceAddress interface{} + TargetAddress interface{} } -func NewMetricRest() *MetricRest { - detailedInfo := runner.GetDetailedInfo() - if len(detailedInfo) < 4 { - return &MetricRest{} - } - senderBufCount := detailedInfo[0] - processingCmdCount := detailedInfo[1] - targetDbOffset := detailedInfo[2] - sourceDbOffset := detailedInfo[3] +func NewMetricRest() []MetricRest { + detailMapList := runner.GetDetailedInfo().([]map[string]interface{}) - return &MetricRest{ - StartTime: utils.StartTime, - PullCmdCount: MetricVar.GetPullCmdCount(), - PullCmdCountTotal: MetricVar.GetPullCmdCountTotal(), - BypassCmdCount: MetricVar.GetBypassCmdCount(), - BypassCmdCountTotal: MetricVar.GetBypassCmdCountTotal(), - PushCmdCount: 
MetricVar.GetPushCmdCount(), - PushCmdCountTotal: MetricVar.GetPushCmdCountTotal(), - SuccessCmdCount: MetricVar.GetSuccessCmdCount(), - SuccessCmdCountTotal: MetricVar.GetSuccessCmdCountTotal(), - FailCmdCount: MetricVar.GetFailCmdCount(), - FailCmdCountTotal: MetricVar.GetFailCmdCountTotal(), - Delay: fmt.Sprintf("%s ms", MetricVar.GetDelay()), - AvgDelay: fmt.Sprintf("%s ms", MetricVar.GetAvgDelay()), - NetworkSpeed: MetricVar.GetNetworkFlow(), - NetworkFlowTotal: MetricVar.GetNetworkFlowTotal(), - FullSyncProgress: MetricVar.GetFullSyncProgress(), - Status: base.Status, - SenderBufCount: senderBufCount, - ProcessingCmdCount: processingCmdCount, - TargetDBOffset: targetDbOffset, - SourceDBOffset: sourceDbOffset, + total := utils.GetTotalLink() + ret := make([]MetricRest, total) + for i := 0; i < total; i++ { + val, ok := MetricMap.Load(i) + if !ok { + continue + } + singleMetric := val.(*Metric) + detailMap := detailMapList[i] + ret[i] = MetricRest{ + StartTime: utils.StartTime, + PullCmdCount: singleMetric.GetPullCmdCount(), + PullCmdCountTotal: singleMetric.GetPullCmdCountTotal(), + BypassCmdCount: singleMetric.GetBypassCmdCount(), + BypassCmdCountTotal: singleMetric.GetBypassCmdCountTotal(), + PushCmdCount: singleMetric.GetPushCmdCount(), + PushCmdCountTotal: singleMetric.GetPushCmdCountTotal(), + SuccessCmdCount: singleMetric.GetSuccessCmdCount(), + SuccessCmdCountTotal: singleMetric.GetSuccessCmdCountTotal(), + FailCmdCount: singleMetric.GetFailCmdCount(), + FailCmdCountTotal: singleMetric.GetFailCmdCountTotal(), + Delay: fmt.Sprintf("%s ms", singleMetric.GetDelay()), + AvgDelay: fmt.Sprintf("%s ms", singleMetric.GetAvgDelay()), + NetworkSpeed: singleMetric.GetNetworkFlow(), + NetworkFlowTotal: singleMetric.GetNetworkFlowTotal(), + FullSyncProgress: singleMetric.GetFullSyncProgress(), + Status: base.Status, + SenderBufCount: detailMap["SenderBufCount"], + ProcessingCmdCount: detailMap["ProcessingCmdCount"], + TargetDBOffset: detailMap["TargetDBOffset"], + 
SourceDBOffset: detailMap["SourceDBOffset"], + SourceAddress: detailMap["SourceAddress"], + TargetAddress: detailMap["TargetAddress"], + } } -} \ No newline at end of file + + return ret +} diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 8399260..7878fa7 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -43,7 +43,7 @@ func (cmd *CmdRestore) Stat() *cmdRestoreStat { } } -func (cmd *CmdRestore) GetDetailedInfo() []interface{} { +func (cmd *CmdRestore) GetDetailedInfo() interface{} { return nil } diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 01e1786..0700d39 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -29,7 +29,7 @@ type KeyNode struct { pttl int64 } -func (cr *CmdRump) GetDetailedInfo() []interface{} { +func (cr *CmdRump) GetDetailedInfo() interface{} { return nil } diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index dfe94fc..27907ff 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -13,6 +13,8 @@ import ( "strconv" "strings" "time" + "unsafe" + "sync" "pkg/libs/atomic2" "pkg/libs/io/pipe" @@ -24,7 +26,6 @@ import ( "redis-shake/base" "redis-shake/heartbeat" "redis-shake/metric" - "unsafe" ) type delayNode struct { @@ -32,30 +33,7 @@ type delayNode struct { id int64 // id } -type CmdSync struct { - rbytes, wbytes, nentry, ignore atomic2.Int64 - - forward, nbypass atomic2.Int64 - - targetOffset atomic2.Int64 - sourceOffset int64 - - /* - * this channel is used to calculate delay between redis-shake and target redis. - * Once oplog sent, the corresponding delayNode push back into this queue. Next time - * receive reply from target redis, the front node poped and then delay calculated. 
- */ - delayChannel chan *delayNode - - // sending queue - sendBuf chan cmdDetail - - wait_full chan struct{} - - status string -} - -type cmdSyncStat struct { +type syncerStat struct { rbytes, wbytes, nentry, ignore int64 forward, nbypass int64 @@ -74,31 +52,148 @@ func (c *cmdDetail) String() string { return str } -func (cmd *CmdSync) Stat() *cmdSyncStat { - return &cmdSyncStat{ - rbytes: cmd.rbytes.Get(), - wbytes: cmd.wbytes.Get(), - nentry: cmd.nentry.Get(), - ignore: cmd.ignore.Get(), +// main struct +type CmdSync struct { + dbSyncers []*dbSyncer +} - forward: cmd.forward.Get(), - nbypass: cmd.nbypass.Get(), +// return send buffer length, delay channel length, target db offset +func (cmd *CmdSync) GetDetailedInfo() interface{} { + ret := make([]map[string]interface{}, len(cmd.dbSyncers)) + for i, syncer := range cmd.dbSyncers { + ret[i] = syncer.GetExtraInfo() } + return ret } -// return send buffer length, delay channel length, target db offset -func (cmd *CmdSync) GetDetailedInfo() []interface{} { +func (cmd *CmdSync) Main() { + type syncNode struct { + id int + source string + sourcePassword string + target string + targetPassword string + } + + // source redis number + total := utils.GetTotalLink() + syncChan := make(chan syncNode, total) + cmd.dbSyncers = make([]*dbSyncer, total) + for i, source := range conf.Options.SourceAddress { + nd := syncNode{ + id: i, + source: source, + sourcePassword: conf.Options.SourcePasswordRaw, + target: conf.Options.TargetAddress, // todo, target address load balance + targetPassword: conf.Options.TargetPasswordRaw, + } + syncChan <- nd + } + + var wg sync.WaitGroup + wg.Add(len(conf.Options.SourceAddress)) + + for i := 0; i < int(conf.Options.SourceParallel); i++ { + go func() { + for { + nd, ok := <-syncChan + if !ok { + break + } - return []interface{}{len(cmd.sendBuf), len(cmd.delayChannel), cmd.targetOffset.Get(), cmd.sourceOffset} + ds := NewDbSyncer(nd.id, nd.source, nd.sourcePassword, nd.target, nd.targetPassword, 
+ conf.Options.HttpProfile + i) + cmd.dbSyncers[nd.id] = ds + log.Infof("routine[%v] starts syncing data from %v to %v with http[%v]", + ds.id, ds.source, ds.target, ds.httpProfilePort) + // run in routine + go ds.sync() + + // wait full sync done + <-ds.waitFull + + wg.Done() + } + }() + } + + wg.Wait() + close(syncChan) + + // never quit because increment syncing is still running + select{} } -func (cmd *CmdSync) Main() { - // todo - from, target := conf.Options.SourceAddress, conf.Options.TargetAddress +/*------------------------------------------------------*/ +// one sync tunnel corresponding to one dbSyncer +func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string, httpPort int) *dbSyncer { + ds := &dbSyncer{ + id: id, + source: source, + sourcePassword: sourcePassword, + target: target, + targetPassword: targetPassword, + httpProfilePort: httpPort, + waitFull: make(chan struct{}), + } + + // add metric + metric.AddMetric(id) + + return ds +} + +type dbSyncer struct { + id int // current id in all syncer + + source string // source address + sourcePassword string // source password + target string // target address + targetPassword string // target password + + httpProfilePort int // http profile port + + // metric info + rbytes, wbytes, nentry, ignore atomic2.Int64 + forward, nbypass atomic2.Int64 + targetOffset atomic2.Int64 + sourceOffset int64 - log.Infof("sync from '%s' to '%s' with http-port[%d]\n", from, target, conf.Options.HttpProfile) - cmd.wait_full = make(chan struct{}) + /* + * this channel is used to calculate delay between redis-shake and target redis. + * Once oplog sent, the corresponding delayNode push back into this queue. Next time + * receive reply from target redis, the front node poped and then delay calculated. 
+ */ + delayChannel chan *delayNode + + sendBuf chan cmdDetail // sending queue + waitFull chan struct{} // wait full sync done +} + +func (ds *dbSyncer) GetExtraInfo() map[string]interface{} { + return map[string]interface{}{ + "SourceAddress": ds.source, + "TargetAddress": ds.target, + "SenderBufCount": len(ds.sendBuf), + "ProcessingCmdCount": len(ds.delayChannel), + "TargetDBOffset": ds.targetOffset.Get(), + "SourceDBOffset": ds.sourceOffset, + } +} + +func (ds *dbSyncer) Stat() *syncerStat { + return &syncerStat{ + rbytes: ds.rbytes.Get(), + wbytes: ds.wbytes.Get(), + nentry: ds.nentry.Get(), + ignore: ds.ignore.Get(), + + forward: ds.forward.Get(), + nbypass: ds.nbypass.Get(), + } +} +func (ds *dbSyncer) sync() { var sockfile *os.File if len(conf.Options.SockFileName) != 0 { sockfile = utils.OpenReadWriteFile(conf.Options.SockFileName) @@ -109,15 +204,13 @@ func (cmd *CmdSync) Main() { var input io.ReadCloser var nsize int64 if conf.Options.Psync { - // todo - // input, nsize = cmd.SendPSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + input, nsize = ds.sendPSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword) } else { - // todo - // input, nsize = cmd.SendSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + input, nsize = ds.sendSyncCmd(ds.source, conf.Options.SourceAuthType, ds.sourcePassword) } defer input.Close() - log.Infof("rdb file size = %d\n", nsize) + log.Infof("dbSyncer[%v] rdb file size = %d\n", ds.id, nsize) if sockfile != nil { r, w := pipe.NewFilePipe(int(conf.Options.SockFileSize), sockfile) @@ -145,47 +238,47 @@ func (cmd *CmdSync) Main() { // sync rdb base.Status = "full" - cmd.SyncRDBFile(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize) + ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize) // sync increment base.Status = "incr" - close(cmd.wait_full) - cmd.SyncCommand(reader, target, 
conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) + close(ds.waitFull) + ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword) } -func (cmd *CmdSync) SendSyncCmd(master, auth_type, passwd string) (net.Conn, int64) { +func (ds *dbSyncer) sendSyncCmd(master, auth_type, passwd string) (net.Conn, int64) { c, wait := utils.OpenSyncConn(master, auth_type, passwd) for { select { case nsize := <-wait: if nsize == 0 { - log.Info("+") + log.Infof("dbSyncer[%v] +", ds.id) } else { return c, nsize } case <-time.After(time.Second): - log.Info("-") + log.Infof("dbSyncer[%v] -", ds.id) } } } -func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, int64) { +func (ds *dbSyncer) sendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, int64) { c := utils.OpenNetConn(master, auth_type, passwd) - log.Infof("psync connect '%v' with auth type[%v] OK!", master, auth_type) + log.Infof("dbSyncer[%v] psync connect '%v' with auth type[%v] OK!", ds.id, master, auth_type) utils.SendPSyncListeningPort(c, conf.Options.HttpProfile) - log.Infof("psync send listening port[%v] OK!", conf.Options.HttpProfile) + log.Infof("dbSyncer[%v] psync send listening port[%v] OK!", ds.id, conf.Options.HttpProfile) // reader buffer bind to client br := bufio.NewReaderSize(c, utils.ReaderBufferSize) // writer buffer bind to client bw := bufio.NewWriterSize(c, utils.WriterBufferSize) - log.Infof("try to send 'psync' command") + log.Infof("dbSyncer[%v] try to send 'psync' command", ds.id) // send psync command and decode the result runid, offset, wait := utils.SendPSyncFullsync(br, bw) - cmd.targetOffset.Set(offset) - log.Infof("psync runid = %s offset = %d, fullsync", runid, offset) + ds.targetOffset.Set(offset) + log.Infof("dbSyncer[%v] psync runid = %s offset = %d, fullsync", ds.id, runid, offset) // get rdb file size var nsize int64 @@ -193,10 +286,10 @@ func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, select { 
case nsize = <-wait: if nsize == 0 { - log.Info("+") + log.Infof("dbSyncer[%v] +", ds.id) } case <-time.After(time.Second): - log.Info("-") + log.Infof("dbSyncer[%v] -", ds.id) } } @@ -217,30 +310,32 @@ func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, * read from br(source redis) and write into pipew. * Generally speaking, this function is forever run. */ - n, err := cmd.PSyncPipeCopy(c, br, bw, offset, pipew) + n, err := ds.pSyncPipeCopy(c, br, bw, offset, pipew) if err != nil { - log.PanicErrorf(err, "psync runid = %s, offset = %d, pipe is broken", runid, offset) + log.PanicErrorf(err, "dbSyncer[%v] psync runid = %s, offset = %d, pipe is broken", + ds.id, runid, offset) } // the 'c' is closed every loop offset += n - cmd.targetOffset.Set(offset) + ds.targetOffset.Set(offset) // reopen 'c' every time for { - // cmd.SyncStat.SetStatus("reopen") + // ds.SyncStat.SetStatus("reopen") base.Status = "reopen" time.Sleep(time.Second) c = utils.OpenNetConnSoft(master, auth_type, passwd) if c != nil { // log.PurePrintf("%s\n", NewLogItem("SourceConnReopenSuccess", "INFO", LogDetail{Info: strconv.FormatInt(offset, 10)})) - log.Infof("Event:SourceConnReopenSuccess\tId: %s\toffset = %d", conf.Options.Id, offset) - // cmd.SyncStat.SetStatus("incr") + log.Infof("dbSyncer[%v] Event:SourceConnReopenSuccess\tId: %s\toffset = %d", + ds.id, conf.Options.Id, offset) + // ds.SyncStat.SetStatus("incr") base.Status = "incr" break } else { // log.PurePrintf("%s\n", NewLogItem("SourceConnReopenFail", "WARN", NewErrorLogDetail("", ""))) - log.Errorf("Event:SourceConnReopenFail\tId: %s", conf.Options.Id) + log.Errorf("dbSyncer[%v] Event:SourceConnReopenFail\tId: %s", ds.id, conf.Options.Id) } } utils.AuthPassword(c, auth_type, passwd) @@ -253,7 +348,7 @@ func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, return piper, nsize } -func (cmd *CmdSync) PSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer, offset int64, copyto 
io.Writer) (int64, error) { +func (ds *dbSyncer) pSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer, offset int64, copyto io.Writer) (int64, error) { // TODO, two times call c.Close() ? maybe a bug defer c.Close() var nread atomic2.Int64 @@ -262,7 +357,7 @@ func (cmd *CmdSync) PSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer for { time.Sleep(time.Second * 1) select { - case <-cmd.wait_full: + case <-ds.waitFull: if err := utils.SendPSyncAck(bw, offset+nread.Get()); err != nil { return } @@ -287,8 +382,8 @@ func (cmd *CmdSync) PSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer } } -func (cmd *CmdSync) SyncRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { - pipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) +func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { + pipe := utils.NewRDBLoader(reader, &ds.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { defer close(wait) @@ -303,9 +398,9 @@ func (cmd *CmdSync) SyncRDBFile(reader *bufio.Reader, target, auth_type, passwd var lastdb uint32 = 0 for e := range pipe { if !base.AcceptDB(e.DB) { - cmd.ignore.Incr() + ds.ignore.Incr() } else { - cmd.nentry.Incr() + ds.nentry.Incr() if conf.Options.TargetDB != -1 { if conf.Options.TargetDB != int(lastdb) { lastdb = uint32(conf.Options.TargetDB) @@ -345,7 +440,7 @@ func (cmd *CmdSync) SyncRDBFile(reader *bufio.Reader, target, auth_type, passwd } }() - var stat *cmdSyncStat + var stat *syncerStat for done := false; !done; { select { @@ -353,67 +448,63 @@ func (cmd *CmdSync) SyncRDBFile(reader *bufio.Reader, target, auth_type, passwd done = true case <-time.After(time.Second): } - stat = cmd.Stat() + stat = ds.Stat() var b bytes.Buffer - fmt.Fprintf(&b, "total=%d - %12d [%3d%%]", nsize, stat.rbytes, 100*stat.rbytes/nsize) - fmt.Fprintf(&b, " entry=%-12d", stat.nentry) + fmt.Fprintf(&b, "dbSyncer[%v] total=%d - %12d [%3d%%] entry=%-12d", + 
ds.id, nsize, stat.rbytes, 100*stat.rbytes/nsize, stat.nentry) if stat.ignore != 0 { fmt.Fprintf(&b, " ignore=%-12d", stat.ignore) } log.Info(b.String()) - metric.MetricVar.SetFullSyncProgress(uint64(100 * stat.rbytes / nsize)) + metric.GetMetric(ds.id).SetFullSyncProgress(uint64(100 * stat.rbytes / nsize)) } - log.Info("sync rdb done") + log.Infof("dbSyncer[%v] sync rdb done", ds.id) } -func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd string) { +func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target, auth_type, passwd string) { c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) defer c.Close() - cmd.sendBuf = make(chan cmdDetail, conf.Options.SenderCount) - cmd.delayChannel = make(chan *delayNode, conf.Options.SenderDelayChannelSize) + ds.sendBuf = make(chan cmdDetail, conf.Options.SenderCount) + ds.delayChannel = make(chan *delayNode, conf.Options.SenderDelayChannelSize) var sendId, recvId, sendMarkId atomic2.Int64 // sendMarkId is also used as mark the sendId in sender routine go func() { if conf.Options.Psync == false { - log.Warn("GetFakeSlaveOffset not enable when psync == false") + log.Warnf("dbSyncer[%v] GetFakeSlaveOffset not enable when psync == false", ds.id) return } - // todo - // srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, - srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) + srcConn := utils.OpenRedisConnWithTimeout(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, + time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) ticker := time.NewTicker(10 * time.Second) for range ticker.C { offset, err := utils.GetFakeSlaveOffset(srcConn) if err != nil { // log.PurePrintf("%s\n", NewLogItem("GetFakeSlaveOffsetFail", "WARN", 
NewErrorLogDetail("", err.Error()))) - log.Warnf("Event:GetFakeSlaveOffsetFail\tId:%s\tWarn:%s", conf.Options.Id, err.Error()) + log.Warnf("dbSyncer[%v] Event:GetFakeSlaveOffsetFail\tId:%s\tWarn:%s", + ds.id, conf.Options.Id, err.Error()) // Reconnect while network error happen if err == io.EOF { - // todo - // srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, - srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) + srcConn = utils.OpenRedisConnWithTimeout(ds.source, conf.Options.SourceAuthType, + ds.sourcePassword, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) } else if _, ok := err.(net.Error); ok { - // todo - // srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, - srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) + srcConn = utils.OpenRedisConnWithTimeout(ds.source, conf.Options.SourceAuthType, + ds.sourcePassword, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) } } else { - // cmd.SyncStat.SetOffset(offset) - if cmd.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil { - log.Errorf("Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s", conf.Options.Id, err.Error()) + // ds.SyncStat.SetOffset(offset) + if ds.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil { + log.Errorf("dbSyncer[%v] Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s", + ds.id, conf.Options.Id, err.Error()) } } - // cmd.SyncStat.SendBufCount = int64(len(sendBuf)) - // cmd.SyncStat.ProcessingCmdCount = int64(len(cmd.delayChannel)) - //log.Infof("%s", cmd.SyncStat.Roll()) - // cmd.SyncStat.Roll() - // log.PurePrintf("%s\n", NewLogItem("Metric", "INFO", 
cmd.SyncStat.Snapshot())) + // ds.SyncStat.SendBufCount = int64(len(sendBuf)) + // ds.SyncStat.ProcessingCmdCount = int64(len(ds.delayChannel)) + //log.Infof("%s", ds.SyncStat.Roll()) + // ds.SyncStat.Roll() + // log.PurePrintf("%s\n", NewLogItem("Metric", "INFO", ds.SyncStat.Snapshot())) } }() @@ -433,25 +524,22 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd } if err == nil { - // cmd.SyncStat.SuccessCmdCount.Incr() - metric.MetricVar.AddSuccessCmdCount(1) + metric.GetMetric(ds.id).AddSuccessCmdCount(1) } else { - // cmd.SyncStat.FailCmdCount.Incr() - metric.MetricVar.AddFailCmdCount(1) + metric.GetMetric(ds.id).AddFailCmdCount(1) if utils.CheckHandleNetError(err) { - // log.PurePrintf("%s\n", NewLogItem("NetErrorWhileReceive", "ERROR", NewErrorLogDetail("", err.Error()))) - log.Panicf("Event:NetErrorWhileReceive\tId:%s\tError:%s", conf.Options.Id, err.Error()) + log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s", + ds.id, conf.Options.Id, err.Error()) } else { - // log.PurePrintf("%s\n", NewLogItem("ErrorReply", "ERROR", NewErrorLogDetail("", err.Error()))) - log.Panicf("Event:ErrorReply\tId:%s\tCommand: [unknown]\tError: %s", - conf.Options.Id, err.Error()) + log.Panicf("dbSyncer[%v] Event:ErrorReply\tId:%s\tCommand: [unknown]\tError: %s", + ds.id, conf.Options.Id, err.Error()) } } if node == nil { // non-blocking read from delay channel select { - case node = <-cmd.delayChannel: + case node = <-ds.delayChannel: default: // it's ok, channel is empty } @@ -459,11 +547,11 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd if node != nil { if node.id == id { - // cmd.SyncStat.Delay.Add(time.Now().Sub(node.t).Nanoseconds()) - metric.MetricVar.AddDelay(uint64(time.Now().Sub(node.t).Nanoseconds()) / 1000000) // ms + metric.GetMetric(ds.id).AddDelay(uint64(time.Now().Sub(node.t).Nanoseconds()) / 1000000) // ms node = nil } else if node.id < id { - log.Panicf("receive id invalid: 
node-id[%v] < receive-id[%v]", node.id, id) + log.Panicf("dbSyncer[%v] receive id invalid: node-id[%v] < receive-id[%v]", + ds.id, node.id, id) } } } @@ -480,8 +568,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd decoder := redis.NewDecoder(reader) - // log.PurePrintf("%s\n", NewLogItem("IncrSyncStart", "INFO", LogDetail{})) - log.Infof("Event:IncrSyncStart\tId:%s\t", conf.Options.Id) + log.Infof("dbSyncer[%v] Event:IncrSyncStart\tId:%s\t", ds.id, conf.Options.Id) for { ignorecmd := false @@ -489,10 +576,9 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd resp := redis.MustDecodeOpt(decoder) if scmd, argv, err = redis.ParseArgs(resp); err != nil { - log.PanicError(err, "parse command arguments failed") + log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id) } else { - // cmd.SyncStat.PullCmdCount.Incr() - metric.MetricVar.AddPullCmdCount(1) + metric.GetMetric(ds.id).AddPullCmdCount(1) // print debug log of send command if conf.Options.LogLevel == utils.LogLevelAll { @@ -501,18 +587,18 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd strArgv[i] = *(*string)(unsafe.Pointer(&ele)) } sendMarkId.Incr() - log.Debugf("send command[%v]: [%s %v]", sendMarkId.Get(), scmd, strArgv) + log.Debugf("dbSyncer[%v] send command[%v]: [%s %v]", ds.id, sendMarkId.Get(), scmd, strArgv) } if scmd != "ping" { if strings.EqualFold(scmd, "select") { if len(argv) != 1 { - log.Panicf("select command len(args) = %d", len(argv)) + log.Panicf("dbSyncer[%v] select command len(args) = %d", ds.id, len(argv)) } s := string(argv[0]) n, err := strconv.Atoi(s) if err != nil { - log.PanicErrorf(err, "parse db = %s failed", s) + log.PanicErrorf(err, "dbSyncer[%v] parse db = %s failed", ds.id, s) } bypass = !base.AcceptDB(uint32(n)) isselect = true @@ -520,18 +606,18 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd ignorecmd = true } if bypass || 
ignorecmd { - cmd.nbypass.Incr() - // cmd.SyncStat.BypassCmdCount.Incr() - metric.MetricVar.AddBypassCmdCount(1) + ds.nbypass.Incr() + // ds.SyncStat.BypassCmdCount.Incr() + metric.GetMetric(ds.id).AddBypassCmdCount(1) continue } } is_filter := false if len(conf.Options.FilterKey) != 0 { - cmd, ok := command.RedisCommands[scmd] + ds, ok := command.RedisCommands[scmd] if ok && len(argv) > 0 { - new_argv, is_filter = command.GetMatchKeys(cmd, argv, conf.Options.FilterKey) + new_argv, is_filter = command.GetMatchKeys(ds, argv, conf.Options.FilterKey) } else { is_filter = true new_argv = argv @@ -541,7 +627,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd new_argv = argv } if bypass || ignorecmd || !is_filter { - cmd.nbypass.Incr() + ds.nbypass.Incr() continue } } @@ -551,15 +637,14 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd lastdb = int32(conf.Options.TargetDB) //sendBuf <- cmdDetail{Cmd: scmd, Args: argv, Timestamp: time.Now()} /* send select command. */ - cmd.sendBuf <- cmdDetail{Cmd: "SELECT", Args: [][]byte{[]byte(strconv.FormatInt(int64(lastdb), 10))}} + ds.sendBuf <- cmdDetail{Cmd: "SELECT", Args: [][]byte{[]byte(strconv.FormatInt(int64(lastdb), 10))}} } else { - cmd.nbypass.Incr() - // cmd.SyncStat.BypassCmdCount.Incr() - metric.MetricVar.AddBypassCmdCount(1) + ds.nbypass.Incr() + metric.GetMetric(ds.id).AddBypassCmdCount(1) } continue } - cmd.sendBuf <- cmdDetail{Cmd: scmd, Args: new_argv} + ds.sendBuf <- cmdDetail{Cmd: scmd, Args: new_argv} } }() @@ -567,7 +652,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd var noFlushCount uint var cachedSize uint64 - for item := range cmd.sendBuf { + for item := range ds.sendBuf { length := len(item.Cmd) data := make([]interface{}, len(item.Args)) for i := range item.Args { @@ -576,41 +661,39 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd } err := c.Send(item.Cmd, data...) 
if err != nil { - // log.PurePrintf("%s\n", NewLogItem("SendToTargetFail", "ERROR", NewErrorLogDetail("", err.Error()))) - log.Panicf("Event:SendToTargetFail\tId:%s\tError:%s\t", conf.Options.Id, err.Error()) + log.Panicf("dbSyncer[%v] Event:SendToTargetFail\tId:%s\tError:%s\t", + ds.id, conf.Options.Id, err.Error()) } noFlushCount += 1 - cmd.forward.Incr() - // cmd.SyncStat.PushCmdCount.Incr() - metric.MetricVar.AddPushCmdCount(1) - // cmd.SyncStat.NetworkFlow.Add(int64(length)) // 发送流量 - metric.MetricVar.AddNetworkFlow(uint64(length)) + ds.forward.Incr() + metric.GetMetric(ds.id).AddPushCmdCount(1) + metric.GetMetric(ds.id).AddNetworkFlow(uint64(length)) sendId.Incr() if conf.Options.Metric { // delay channel - cmd.addDelayChan(sendId.Get()) + ds.addDelayChan(sendId.Get()) } if noFlushCount > conf.Options.SenderCount || cachedSize > conf.Options.SenderSize || - len(cmd.sendBuf) == 0 { // 5000 cmd in a batch + len(ds.sendBuf) == 0 { // 5000 ds in a batch err := c.Flush() noFlushCount = 0 cachedSize = 0 if utils.CheckHandleNetError(err) { - // log.PurePrintf("%s\n", NewLogItem("NetErrorWhileFlush", "ERROR", NewErrorLogDetail("", err.Error()))) - log.Panicf("Event:NetErrorWhileFlush\tId:%s\tError:%s\t", conf.Options.Id, err.Error()) + log.Panicf("dbSyncer[%v] Event:NetErrorWhileFlush\tId:%s\tError:%s\t", + ds.id, conf.Options.Id, err.Error()) } } } }() - for lstat := cmd.Stat(); ; { + for lstat := ds.Stat(); ; { time.Sleep(time.Second) - nstat := cmd.Stat() + nstat := ds.Stat() var b bytes.Buffer - fmt.Fprintf(&b, "sync: ") + fmt.Fprintf(&b, "dbSyncer[%v] sync: ", ds.id) fmt.Fprintf(&b, " +forward=%-6d", nstat.forward-lstat.forward) fmt.Fprintf(&b, " +nbypass=%-6d", nstat.nbypass-lstat.nbypass) fmt.Fprintf(&b, " +nbytes=%d", nstat.wbytes-lstat.wbytes) @@ -619,7 +702,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd } } -func (cmd *CmdSync) addDelayChan(id int64) { +func (ds *dbSyncer) addDelayChan(id int64) { // send /* * available 
>=4096: 1:1 sampling @@ -627,17 +710,17 @@ func (cmd *CmdSync) addDelayChan(id int64) { * available >=128: 1:100 sampling * else: 1:1000 sampling */ - used := cap(cmd.delayChannel) - len(cmd.delayChannel) + used := cap(ds.delayChannel) - len(ds.delayChannel) if used >= 4096 || used >= 1024 && id % 10 == 0 || used >= 128 && id % 100 == 0 || id % 1000 == 0 { // non-blocking add select { - case cmd.delayChannel <- &delayNode{t: time.Now(), id: id}: + case ds.delayChannel <- &delayNode{t: time.Now(), id: id}: default: // do nothing but print when channel is full - log.Warn("delayChannel is full") + log.Warnf("dbSyncer[%v] delayChannel is full", ds.id) } } } From 08c5dc1f136d68cc76e6e45487a4b90b29dfb442 Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 8 May 2019 17:16:24 +0800 Subject: [PATCH 08/11] 1.source address supports cluster 2.target address supports several proxies to write data in a round-robin way. --- ChangeLog | 5 + conf/redis-shake.conf | 51 +++++----- src/redis-shake/common/common.go | 22 ++++- src/redis-shake/configure/configure.go | 8 +- src/redis-shake/decode.go | 6 +- src/redis-shake/dump.go | 2 +- src/redis-shake/main/main.go | 80 ++++++++++----- src/redis-shake/restore.go | 131 +++++++++++++++++-------- src/redis-shake/rump.go | 3 +- src/redis-shake/sync.go | 8 +- 10 files changed, 212 insertions(+), 104 deletions(-) diff --git a/ChangeLog b/ChangeLog index eb7a4b9..ea26ce2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +2019-04-21 Alibaba Cloud. + * VERSION: 1.6.0 + * FEATURE: source address supports cluster. + * FEATURE: target address supports several proxies to write data in + a round-robin way. 2019-04-24 Alibaba Cloud. * VERSION: 1.4.2 * IMPROVE: improve rump to support fetching data from given keys in diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 7838124..8ff4d47 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -23,19 +23,6 @@ ncpu = 0 # parallel routines number used in RDB file syncing. 
default is 64. parallel = 32 -# input RDB file. read from stdin, default is stdin ('/dev/stdin'). -# used in `decode` and `restore`. -# if the input is list split by semicolon(;), redis-shake will restore the list one by one. -# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 -# redis-shake将会挨个进行恢复。 -input_rdb = local - -# output RDB file prefix. -# used in `decode` and `dump`. -# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: -# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 -output_rdb = local_dump - # source redis configuration. # used in `dump`, `sync` and `rump`. # ip:port @@ -48,11 +35,6 @@ source.address = 127.0.0.1:20441 source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth -# the concurrence of fetching data, default is len(source.address) -# used in `dump` and `sync`. -# 拉取的并发度,默认是source.address中db的个数。假如db节点有5个,但source.parallel=3,那么一次只会 -# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。 -source.parallel = # target redis configuration. used in `restore` and `sync`. # used in `restore`, `sync` and `rump`. @@ -65,15 +47,34 @@ target.address = 127.0.0.1:20551 target.password_raw = 123456 # auth type, don't modify it target.auth_type = auth +# the type of target redis can be `standalone`, `proxy` or `cluster`. +# `standalone`: standalone db mode. +# `proxy`: proxy layer ahead redis. +# `cluster`: open source cluster (not supported currently) +# If the target is proxy, data will be inserted in a round-robin way. +# standalone表示单个db节点进行写入(包括主从模式),proxy表示写入的是代理层,将会以轮回的方式进行写入,1个proxy对应一条 +# 长连接通道(1个源端db的数据只会写入1个proxy),cluster表示开源的集群架构 +target.type = standalone # all the data will be written into this db. < 0 means disable. target.db = -1 -# the type of target redis: `cluster`, `opensource`. If the target is open source redis cluster, -# redis-shake uses redis-go-cluster driver to write data, otherwise, redigo driver is used to -# insert data in round robin way. -# 目的redis的类型:`opensource`和`proxy`. 
`opensource`表示是开源的cluster或者redis db节点; -# `proxy`表示是proxy类型,将会以round robin循环方式写入。对于开源的cluster用redis-go-cluster驱动写入,其余 -# 的则用redigo写入 -# target.type = opensource + +# input RDB file. read from stdin, default is stdin ('/dev/stdin'). +# used in `decode` and `restore`. +# if the input is list split by semicolon(;), redis-shake will restore the list one by one. +# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2 +# redis-shake将会挨个进行恢复。 +rdb.input = local +# output RDB file prefix. +# used in `decode` and `dump`. +# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: +# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 +rdb.output = local_dump +# the concurrence of fetching data, default is len(source.address) or len(rdb.input). +# used in `dump`, `sync` and `restore`. +# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。 +# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会 +# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。 +rdb.parallel = # use for expire key, set the time gap when source and target timestamp are not the same. 
# 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 88e7219..d66e9d8 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -8,7 +8,7 @@ import ( "pkg/libs/bytesize" "redis-shake/configure" - + logRotate "gopkg.in/natefinch/lumberjack.v2" ) @@ -26,9 +26,10 @@ const ( ) var ( - Version = "$" - LogRotater *logRotate.Logger - StartTime string + Version = "$" + LogRotater *logRotate.Logger + StartTime string + TargetRoundRobin int ) // read until hit the end of RESP: "\r\n" @@ -70,5 +71,16 @@ func ParseInfo(content []byte) map[string]string { } func GetTotalLink() int { - return len(conf.Options.SourceAddress) + if len(conf.Options.SourceAddress) != 0 { + return len(conf.Options.SourceAddress) + } else { + return len(conf.Options.RdbInput) + } +} + +func PickTargetRoundRobin(n int) int { + defer func() { + TargetRoundRobin = (TargetRoundRobin + 1) % n + }() + return TargetRoundRobin } \ No newline at end of file diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 321066d..afc3642 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -11,20 +11,22 @@ type Configuration struct { HttpProfile int `config:"http_profile"` NCpu int `config:"ncpu"` Parallel int `config:"parallel"` - InputRdb []string `config:"input_rdb"` - OutputRdb string `config:"output_rdb"` SourceAddress []string `config:"source.address"` SourcePasswordRaw string `config:"source.password_raw"` SourcePasswordEncoding string `config:"source.password_encoding"` SourceVersion uint `config:"source.version"` SourceAuthType string `config:"source.auth_type"` SourceParallel uint `config:"source.parallel"` - TargetAddress string `config:"target.address"` + TargetAddress []string `config:"target.address"` TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordEncoding string 
`config:"target.password_encoding"` TargetVersion uint `config:"target.version"` TargetDB int `config:"target.db"` TargetAuthType string `config:"target.auth_type"` + TargetType string `config:"target.type"` + RdbInput []string `config:"rdb.input"` + RdbOutput string `config:"rdb.output"` + RdbParallel int `config:"rdb.parallel"` FakeTime string `config:"fake_time"` Rewrite bool `config:"rewrite"` FilterDB string `config:"filter.db"` diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index 09aee6b..e71dbf7 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -40,11 +40,11 @@ func (cmd *CmdDecode) GetDetailedInfo() interface{} { } func (cmd *CmdDecode) Main() { - log.Infof("decode from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.OutputRdb) + log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput) - for i, input := range conf.Options.InputRdb { + for i, input := range conf.Options.RdbInput { // decode one by one - output := fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i) + output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i) cmd.decode(input, output) } diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index b3e5b27..4303174 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -36,7 +36,7 @@ func (cmd *CmdDump) Main() { for i, source := range conf.Options.SourceAddress { nd := node{ source: source, - output: fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i), + output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i), } cmd.dumpChan <- nd } diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index a329469..e744df9 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -193,20 +193,48 @@ func sanitizeOptions(tp string) error { return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold) } - if (tp == TypeRestore || tp == TypeSync) && conf.Options.TargetAddress == "" { - return 
fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") - } - if (tp == TypeDump || tp == TypeSync) && len(conf.Options.SourceAddress) == 0 { - return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}") + if tp == TypeRestore || tp == TypeSync || tp == TypeRump { + if len(conf.Options.TargetAddress) == 0 { + return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") + } + + switch conf.Options.TargetType { + case "standalone": + if len(conf.Options.TargetAddress) != 1 { + return fmt.Errorf("the source address[%v] != 1 when target.type is standalone", + len(conf.Options.TargetAddress)) + } + case "proxy": + case "cluster": + if tp == TypeRump || tp == TypeRestore { + // TODO + return fmt.Errorf("{rump, restore} mode doesn't support cluster currently") + } + return fmt.Errorf("coming soon") + default: + return fmt.Errorf("illegal target.type[%v]", conf.Options.TargetType) + } } - if tp == TypeRump && (len(conf.Options.SourceAddress) == 0 || conf.Options.TargetAddress == "") { - return fmt.Errorf("source and target address shouldn't be empty when type in {rump}") + if (tp == TypeDump || tp == TypeSync || tp == TypeRump) && len(conf.Options.SourceAddress) == 0 { + return fmt.Errorf("source address shouldn't be empty when type in {dump, sync, rump}") } - if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.InputRdb) == 0 { + if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.RdbInput) == 0 { return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") } - if tp == TypeDump && conf.Options.OutputRdb == "" { - conf.Options.OutputRdb = "output-rdb-dump" + if tp == TypeDump && conf.Options.RdbOutput == "" { + conf.Options.RdbOutput = "output-rdb-dump" + } + + if conf.Options.RdbParallel == 0 { + if tp == TypeDump || tp == TypeSync { + conf.Options.RdbParallel = len(conf.Options.SourceAddress) + } else if tp == TypeRestore { + conf.Options.RdbParallel = 
len(conf.Options.RdbInput) + } + } + + if tp == TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) { + conf.Options.RdbParallel = len(conf.Options.RdbInput) } if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" { @@ -318,45 +346,51 @@ func sanitizeOptions(tp string) error { if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 { return fmt.Errorf("HttpProfile[%v] should in [0, 65535]", conf.Options.HttpProfile) - } else if conf.Options.HttpProfile == 0 { + } else if conf.Options.HttpProfile == 0 { // set to default when not set conf.Options.HttpProfile = defaultHttpPort } if conf.Options.SystemProfile < 0 || conf.Options.SystemProfile > 65535 { return fmt.Errorf("SystemProfile[%v] should in [0, 65535]", conf.Options.SystemProfile) - } else if conf.Options.SystemProfile == 0 { + } else if conf.Options.SystemProfile == 0 { // set to default when not set conf.Options.SystemProfile = defaultSystemPort } if conf.Options.SenderSize < 0 || conf.Options.SenderSize >= 1073741824 { return fmt.Errorf("SenderSize[%v] should in [0, 1073741824]", conf.Options.SenderSize) - } else if conf.Options.SenderSize == 0 { + } else if conf.Options.SenderSize == 0 { // set to default when not set conf.Options.SenderSize = defaultSenderSize } if conf.Options.SenderCount < 0 || conf.Options.SenderCount >= 100000 { return fmt.Errorf("SenderCount[%v] should in [0, 100000]", conf.Options.SenderCount) - } else if conf.Options.SenderCount == 0 { + } else if conf.Options.SenderCount == 0 { // set to default when not set conf.Options.SenderCount = defaultSenderCount } if tp == TypeRestore || tp == TypeSync { // get target redis version and set TargetReplace. 
- if conf.Options.TargetRedisVersion, err = utils.GetRedisVersion(conf.Options.TargetAddress, - conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw); err != nil { - return fmt.Errorf("get target redis version failed[%v]", err) - } else { - if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") || - strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") { - conf.Options.TargetReplace = true + for _, address := range conf.Options.TargetAddress { + if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType, + conf.Options.TargetPasswordRaw); err != nil { + return fmt.Errorf("get target redis version failed[%v]", err) + } else if conf.Options.TargetRedisVersion != "" && conf.Options.TargetRedisVersion != v { + return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetRedisVersion, v) } else { - conf.Options.TargetReplace = false + conf.Options.TargetRedisVersion = v } } + if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") || + strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") || + strings.HasPrefix(conf.Options.TargetRedisVersion, "5.") { + conf.Options.TargetReplace = true + } else { + conf.Options.TargetReplace = false + } } if tp == TypeRump { @@ -365,7 +399,7 @@ func sanitizeOptions(tp string) error { } if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanSpecialCloud != scanner.TencentCluster && - conf.Options.ScanSpecialCloud != scanner.AliyunCluster { + conf.Options.ScanSpecialCloud != scanner.AliyunCluster { return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud) } diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 7878fa7..ae2348a 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -17,12 +17,10 @@ import ( "redis-shake/common" "strconv" "redis-shake/base" + "sync" ) type CmdRestore struct { - rbytes, ebytes, nentry, ignore atomic2.Int64 - - forward, nbypass atomic2.Int64 } type cmdRestoreStat struct { 
@@ -31,31 +29,57 @@ type cmdRestoreStat struct { forward, nbypass int64 } -func (cmd *CmdRestore) Stat() *cmdRestoreStat { - return &cmdRestoreStat{ - rbytes: cmd.rbytes.Get(), - ebytes: cmd.ebytes.Get(), - nentry: cmd.nentry.Get(), - ignore: cmd.ignore.Get(), - - forward: cmd.forward.Get(), - nbypass: cmd.nbypass.Get(), - } -} - func (cmd *CmdRestore) GetDetailedInfo() interface{} { return nil } func (cmd *CmdRestore) Main() { - log.Infof("restore from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.TargetAddress) + log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddress) + type restoreNode struct { + id int + input string + } base.Status = "waitRestore" - for _, input := range conf.Options.InputRdb { - // restore one by one - cmd.restore(input) + total := utils.GetTotalLink() + restoreChan := make(chan restoreNode, total) + + for i, rdb := range conf.Options.RdbInput { + restoreChan <- restoreNode{id: i, input: rdb} } + var wg sync.WaitGroup + wg.Add(len(conf.Options.RdbInput)) + for i := 0; i < conf.Options.RdbParallel; i++ { + go func() { + for { + node, ok := <-restoreChan + if !ok { + break + } + + // round-robin pick + pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress)) + target := conf.Options.TargetAddress[pick] + + dr := &dbRestorer{ + id: node.id, + input: node.input, + target: target, + targetPassword: conf.Options.TargetPasswordRaw, + } + dr.restore() + log.Infof("routine[%v] starts restoring data from %v to %v", + dr.id, dr.input, dr.target) + + wg.Done() + } + }() + } + + wg.Wait() + close(restoreChan) + if conf.Options.HttpProfile > 0 { //fake status if set http_port. 
and wait forever base.Status = "incr" @@ -64,25 +88,50 @@ func (cmd *CmdRestore) Main() { } } -func (cmd *CmdRestore) restore(input string) { - readin, nsize := utils.OpenReadFile(input) +/*------------------------------------------------------*/ +// one restore link corresponding to one dbRestorer +type dbRestorer struct { + id int // id + input string // input rdb + target string + targetPassword string + + // metric + rbytes, ebytes, nentry, ignore atomic2.Int64 + forward, nbypass atomic2.Int64 +} + +func (dr *dbRestorer) Stat() *cmdRestoreStat { + return &cmdRestoreStat{ + rbytes: dr.rbytes.Get(), + ebytes: dr.ebytes.Get(), + nentry: dr.nentry.Get(), + ignore: dr.ignore.Get(), + + forward: dr.forward.Get(), + nbypass: dr.nbypass.Get(), + } +} + +func (dr *dbRestorer) restore() { + readin, nsize := utils.OpenReadFile(dr.input) defer readin.Close() base.Status = "restore" reader := bufio.NewReaderSize(readin, utils.ReaderBufferSize) - cmd.restoreRDBFile(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, + dr.restoreRDBFile(reader, dr.target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize) base.Status = "extra" - if conf.Options.ExtraInfo && (nsize == 0 || nsize != cmd.rbytes.Get()) { - cmd.restoreCommand(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, + if conf.Options.ExtraInfo && (nsize == 0 || nsize != dr.rbytes.Get()) { + dr.restoreCommand(reader, dr.target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) } } -func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { - pipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) +func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { + pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { defer close(wait) @@ -97,9 +146,9 @@ func (cmd *CmdRestore) 
restoreRDBFile(reader *bufio.Reader, target, auth_type, p var lastdb uint32 = 0 for e := range pipe { if !base.AcceptDB(e.DB) { - cmd.ignore.Incr() + dr.ignore.Incr() } else { - cmd.nentry.Incr() + dr.nentry.Incr() if conf.Options.TargetDB != -1 { if conf.Options.TargetDB != int(lastdb) { lastdb = uint32(conf.Options.TargetDB) @@ -127,12 +176,12 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p done = true case <-time.After(time.Second): } - stat := cmd.Stat() + stat := dr.Stat() var b bytes.Buffer if nsize != 0 { - fmt.Fprintf(&b, "total = %d - %12d [%3d%%]", nsize, stat.rbytes, 100*stat.rbytes/nsize) + fmt.Fprintf(&b, "routine[%v] total = %d - %12d [%3d%%]", dr.id, nsize, stat.rbytes, 100*stat.rbytes/nsize) } else { - fmt.Fprintf(&b, "total = %12d", stat.rbytes) + fmt.Fprintf(&b, "routine[%v] total = %12d", dr.id, stat.rbytes) } fmt.Fprintf(&b, " entry=%-12d", stat.nentry) if stat.ignore != 0 { @@ -140,10 +189,10 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p } log.Info(b.String()) } - log.Info("restore: rdb done") + log.Info("routine[%v] restore: rdb done", dr.id) } -func (cmd *CmdRestore) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) { +func (dr *dbRestorer) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) { c := utils.OpenNetConn(target, auth_type, passwd) defer c.Close() @@ -162,35 +211,35 @@ func (cmd *CmdRestore) restoreCommand(reader *bufio.Reader, target, auth_type, p for { resp := redis.MustDecode(reader) if scmd, args, err := redis.ParseArgs(resp); err != nil { - log.PanicError(err, "parse command arguments failed") + log.PanicError(err, "routine[%v] parse command arguments failed", dr.id) } else if scmd != "ping" { if scmd == "select" { if len(args) != 1 { - log.Panicf("select command len(args) = %d", len(args)) + log.Panicf("routine[%v] select command len(args) = %d", dr.id, len(args)) } s := string(args[0]) n, err := strconv.Atoi(s) 
if err != nil { - log.PanicErrorf(err, "parse db = %s failed", s) + log.PanicErrorf(err, "routine[%v] parse db = %s failed", dr.id, s) } bypass = !base.AcceptDB(uint32(n)) } if bypass { - cmd.nbypass.Incr() + dr.nbypass.Incr() continue } } - cmd.forward.Incr() + dr.forward.Incr() redis.MustEncode(writer, resp) utils.FlushWriter(writer) } }() - for lstat := cmd.Stat(); ; { + for lstat := dr.Stat(); ; { time.Sleep(time.Second) - nstat := cmd.Stat() + nstat := dr.Stat() var b bytes.Buffer - fmt.Fprintf(&b, "restore: ") + fmt.Fprintf(&b, "routine[%v] restore: ", dr.id) fmt.Fprintf(&b, " +forward=%-6d", nstat.forward-lstat.forward) fmt.Fprintf(&b, " +nbypass=%-6d", nstat.nbypass-lstat.nbypass) log.Info(b.String()) diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 0700d39..e5f7243 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -40,7 +40,8 @@ func (cr *CmdRump) Main() { for i, address := range conf.Options.SourceAddress { cr.sourceConn[i] = utils.OpenRedisConn(address, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) } - cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, + // TODO, current only support write data into 1 db or proxy + cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress[0], conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) // init two channels diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 27907ff..94943e6 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -80,11 +80,15 @@ func (cmd *CmdSync) Main() { syncChan := make(chan syncNode, total) cmd.dbSyncers = make([]*dbSyncer, total) for i, source := range conf.Options.SourceAddress { + // round-robin pick + pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress)) + target := conf.Options.TargetAddress[pick] + nd := syncNode{ id: i, source: source, sourcePassword: conf.Options.SourcePasswordRaw, - target: conf.Options.TargetAddress, // todo, 
target address load balance + target: target, targetPassword: conf.Options.TargetPasswordRaw, } syncChan <- nd @@ -125,7 +129,7 @@ func (cmd *CmdSync) Main() { } /*------------------------------------------------------*/ -// one sync tunnel corresponding to one dbSyncer +// one sync link corresponding to one dbSyncer func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string, httpPort int) *dbSyncer { ds := &dbSyncer{ id: id, From 7affc7dea77b109063164cae9b20d6e3228e069e Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 8 May 2019 19:12:57 +0800 Subject: [PATCH 09/11] update .gitignore --- .gitignore | 1 - README.md | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index c2f7e30..40c2ac9 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,6 @@ logs tags result.db.* bin/* -!/bin/redis-shake.* conf/* !conf/redis-shake.conf diff --git a/README.md b/README.md index 894dd36..d77f3c8 100644 --- a/README.md +++ b/README.md @@ -65,11 +65,11 @@ Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0. # Usage --- -Run `./bin/redis-shake.darwin64` or `redis-shake.linux64` which is built in OSX and Linux respectively.
-Or you can build redis-shake yourself according to the following steps: +You can use the binary in the release package.
+You can also build redis-shake yourself according to the following steps, the `go` and `govendor` must be installed before compile: * git clone https://github.com/alibaba/RedisShake.git * cd RedisShake -* export GOPATH=\`pwd\`/../.. +* export GOPATH=\`pwd\` * cd src/sync * govendor sync #please note: must install govendor first and then pull all dependencies: `go get -u github.com/kardianos/govendor` * cd ../../ && ./build.sh From f4a8f979a9f4b0d9760fd7bab56fe8ede3c1697f Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 8 May 2019 19:22:15 +0800 Subject: [PATCH 10/11] remove binary --- .gitignore | 1 + README.md | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 40c2ac9..c2f7e30 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ logs tags result.db.* bin/* +!/bin/redis-shake.* conf/* !conf/redis-shake.conf diff --git a/README.md b/README.md index d77f3c8..bfe3611 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,9 @@ RedisShake is mainly used to synchronize data from one redis database to another Thanks to the Douyu's WSD team for the support.
* [中文文档](https://yq.aliyun.com/articles/691794) -* [English tutorial](https://github.com/alibaba/RedisShake/wiki/tutorial-about-how-to-set-up) -* [中文使用文档](https://github.com/alibaba/RedisShake/wiki/%E7%AC%AC%E4%B8%80%E6%AC%A1%E4%BD%BF%E7%94%A8%EF%BC%8C%E5%A6%82%E4%BD%95%E8%BF%9B%E8%A1%8C%E9%85%8D%E7%BD%AE%EF%BC%9F) +* [English tutorial](https://github.com/alibaba/RedisShake/wiki/tutorial-about-how-to-set-up) +* [中文使用文档](https://github.com/alibaba/RedisShake/wiki/%E7%AC%AC%E4%B8%80%E6%AC%A1%E4%BD%BF%E7%94%A8%EF%BC%8C%E5%A6%82%E4%BD%95%E8%BF%9B%E8%A1%8C%E9%85%8D%E7%BD%AE%EF%BC%9F) +* [Release package](https://github.com/alibaba/RedisShake/releases) # Redis-Shake ---