From cef2bbff346f474fee3a49fac6827db1806eab13 Mon Sep 17 00:00:00 2001 From: vinllen Date: Tue, 30 Apr 2019 10:20:06 +0800 Subject: [PATCH] modify source cluster of dump --- README.md | 11 ++++ conf/redis-shake.conf | 33 +++++++--- src/redis-shake/common/utils.go | 1 + src/redis-shake/configure/configure.go | 83 +++++++++++++------------ src/redis-shake/dump.go | 86 ++++++++++++++++++-------- src/redis-shake/main/main.go | 11 +++- src/redis-shake/rump.go | 5 +- src/redis-shake/sync.go | 19 ++++-- src/vendor/vendor.json | 3 +- 9 files changed, 169 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index e56cd38..ae91ba2 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,9 @@ The type can be one of the followings:
Please check out the `conf/redis-shake.conf` to see the detailed parameters description.
+# Configuration +Redis-shake has several parameters in the configuration(`conf/redis-shake.conf`) that maybe confusing, if this is your first time using, just configure the `source.address` and `target.address` parameters. + # Verification --- User can use [RedisFullCheck](https://github.com/alibaba/RedisFullCheck) to verify correctness.
@@ -30,6 +33,14 @@ Redis-shake offers metrics through restful api and log file.
* restful api: `curl 127.0.0.1:9320/metric`. * log: the metric info will be printed in the log periodically if enable. +m + +# Redis Type +--- +Both the source and target type can be single redis, master-slave architecture redis, redis cluster and codis. Although the architecture patterns of different vendors are different for the cluster architecture, we still support different cloud vendors like alibaba-cloud, tencent-cloud and so on.
+If the target is open source redis cluster, redis-shake uses [redis-go-cluster](https://github.com/chasex/redis-go-cluster) driver to write data, otherwise, [redigo](https://github.com/garyburd/redigo) driver is used to insert data in round robin way.
+If the source is redis cluster, redis-shake launches multiple goroutines for parallel pull. User can control the concurrency.
+The "move slot" operations must be disabled on the source side.
# Code branch rules --- diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index a8f913c..c2a2686 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -28,33 +28,50 @@ parallel = 32 # 如果是decode或者restore,这个参数表示读取的rdb文件 input_rdb = local -# output RDB file. default is stdout ('/dev/stdout'). +# output RDB file prefix. # used in `decode` and `dump`. -# 如果是decode或者dump,这个参数表示输出的rdb +# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是: +# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2 output_rdb = local_dump # source redis configuration. # used in `dump`, `sync` and `rump`. # ip:port -# 源redis地址 +# the source address can be redis db or cluster that has several db nodes split by semicolon(;). +# if the type is `rump`, source type can also be the proxy address. +# 源redis地址(ip:port),源端支持开源cluster以及db节点暴露且有权限的proxy集群模式,不同db以分号(;) +# 分割,例如:ip1:port1;ip2;port2;ip3;port3。 source.address = 127.0.0.1:20441 # password. source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth +# the concurrence of fetching data, default is len(source.address) +# used in `dump` and `sync`. +# 拉取的并发度,默认是source.address中db的个数。假如db节点有5个,但source.parallel=3,那么一次只会 +# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。 +source.parallel = # target redis configuration. used in `restore` and `sync`. # used in `restore`, `sync` and `rump`. # ip:port -# 目的redis地址 +# the source address can be cluster that has several db nodes split by semicolon(;). +# 目的redis地址(ip:port),目的端支持开源的cluster架构和proxy模式,不同db以分号(;)分割,例如: +# ip1:port1;ip2;port2;ip3;port3。 target.address = 127.0.0.1:20551 # password. target.password_raw = 123456 # auth type, don't modify it target.auth_type = auth -# all the data will come into this db. < 0 means disable. -# used in `restore` and `sync`. +# all the data will be written into this db. < 0 means disable. target.db = -1 +# the type of target redis: `cluster`, `opensource`. If the target is open source redis cluster, +# redis-shake uses redis-go-cluster driver to write data, otherwise, redigo driver is used to +# insert data in round robin way. +# 目的redis的类型:`opensource`和`proxy`. `opensource`表示是开源的cluster或者redis db节点; +# `proxy`表示是proxy类型,将会以round robin循环方式写入。对于开源的cluster用redis-go-cluster驱动写入,其余 +# 的则用redigo写入 +# target.type = opensource # use for expire key, set the time gap when source and target timestamp are not the same. # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 @@ -126,7 +143,9 @@ sender.size = 104857600 # used in `sync`. # 发送缓存的报文个数,超过这个阈值将会强行刷缓存发送 sender.count = 5000 -# delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also stored in this delay queue. this timestamp will be used to calculate the time delay when receiving ack from target redis. +# delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also +# stored in this delay queue. this timestamp will be used to calculate the time delay when receiving +# ack from target redis. # used in `sync`. # 用于metric统计时延的队列 sender.delay_channel_size = 65535 diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 6c13371..5023cbd 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -22,6 +22,7 @@ import ( "pkg/rdb" "pkg/redis" "redis-shake/configure" + redigo "github.com/garyburd/redigo/redis" ) diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 78033bd..6035480 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -4,47 +4,48 @@ import "time" type Configuration struct { // config file variables - Id string `config:"id"` - LogFile string `config:"log.file"` - LogLevel string `config:"log.level"` - SystemProfile int `config:"system_profile"` - HttpProfile int `config:"http_profile"` - NCpu int `config:"ncpu"` - Parallel int `config:"parallel"` - InputRdb string `config:"input_rdb"` - OutputRdb string `config:"output_rdb"` - SourceAddress string `config:"source.address"` - SourcePasswordRaw string `config:"source.password_raw"` - SourcePasswordEncoding string `config:"source.password_encoding"` - SourceVersion uint `config:"source.version"` - SourceAuthType string `config:"source.auth_type"` - TargetAddress string `config:"target.address"` - TargetPasswordRaw string `config:"target.password_raw"` - TargetPasswordEncoding string `config:"target.password_encoding"` - TargetVersion uint `config:"target.version"` - TargetDB int `config:"target.db"` - TargetAuthType string `config:"target.auth_type"` - FakeTime string `config:"fake_time"` - Rewrite bool `config:"rewrite"` - FilterDB string `config:"filter.db"` - FilterKey []string `config:"filter.key"` - FilterSlot []string `config:"filter.slot"` - BigKeyThreshold uint64 `config:"big_key_threshold"` - Psync bool `config:"psync"` - Metric bool `config:"metric"` - MetricPrintLog bool `config:"metric.print_log"` - HeartbeatUrl string `config:"heartbeat.url"` - HeartbeatInterval uint `config:"heartbeat.interval"` - HeartbeatExternal string `config:"heartbeat.external"` - HeartbeatNetworkInterface string `config:"heartbeat.network_interface"` - SenderSize uint64 `config:"sender.size"` - SenderCount uint `config:"sender.count"` - SenderDelayChannelSize uint `config:"sender.delay_channel_size"` - KeepAlive uint `config:"keep_alive"` - PidPath string `config:"pid_path"` - ScanKeyNumber uint32 `config:"scan.key_number"` - ScanSpecialCloud string `config:"scan.special_cloud"` - ScanKeyFile string `config:"scan.key_file"` + Id string `config:"id"` + LogFile string `config:"log.file"` + LogLevel string `config:"log.level"` + SystemProfile int `config:"system_profile"` + HttpProfile int `config:"http_profile"` + NCpu int `config:"ncpu"` + Parallel int `config:"parallel"` + InputRdb string `config:"input_rdb"` + OutputRdb string `config:"output_rdb"` + SourceAddress []string `config:"source.address"` + SourcePasswordRaw string `config:"source.password_raw"` + SourcePasswordEncoding string `config:"source.password_encoding"` + SourceVersion uint `config:"source.version"` + SourceAuthType string `config:"source.auth_type"` + SourceParallel uint `config:"source.parallel"` + TargetAddress string `config:"target.address"` + TargetPasswordRaw string `config:"target.password_raw"` + TargetPasswordEncoding string `config:"target.password_encoding"` + TargetVersion uint `config:"target.version"` + TargetDB int `config:"target.db"` + TargetAuthType string `config:"target.auth_type"` + FakeTime string `config:"fake_time"` + Rewrite bool `config:"rewrite"` + FilterDB string `config:"filter.db"` + FilterKey []string `config:"filter.key"` + FilterSlot []string `config:"filter.slot"` + BigKeyThreshold uint64 `config:"big_key_threshold"` + Psync bool `config:"psync"` + Metric bool `config:"metric"` + MetricPrintLog bool `config:"metric.print_log"` + HeartbeatUrl string `config:"heartbeat.url"` + HeartbeatInterval uint `config:"heartbeat.interval"` + HeartbeatExternal string `config:"heartbeat.external"` + HeartbeatNetworkInterface string `config:"heartbeat.network_interface"` + SenderSize uint64 `config:"sender.size"` + SenderCount uint `config:"sender.count"` + SenderDelayChannelSize uint `config:"sender.delay_channel_size"` + KeepAlive uint `config:"keep_alive"` + PidPath string `config:"pid_path"` + ScanKeyNumber uint32 `config:"scan.key_number"` + ScanSpecialCloud string `config:"scan.special_cloud"` + ScanKeyFile string `config:"scan.key_file"` // inner variables ReplaceHashTag bool `config:"replace_hash_tag"` diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index f73b450..6517183 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -5,10 +5,10 @@ package run import ( "bufio" - "io" "net" - "os" "time" + "fmt" + "sync" "pkg/libs/atomic2" "pkg/libs/log" @@ -17,6 +17,13 @@ import ( ) type CmdDump struct { + dumpChan chan node + wg sync.WaitGroup +} + +type node struct { + source string + output string } func (cmd *CmdDump) GetDetailedInfo() []interface{} { @@ -24,42 +31,71 @@ func (cmd *CmdDump) GetDetailedInfo() []interface{} { } func (cmd *CmdDump) Main() { - from, output := conf.Options.SourceAddress, conf.Options.OutputRdb - if len(from) == 0 { - log.Panic("invalid argument: from") + cmd.dumpChan = make(chan node, len(conf.Options.SourceAddress)) + + for i, source := range conf.Options.SourceAddress { + nd := node{ + source: source, + output: fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i), + } + cmd.dumpChan <- nd } - if len(output) == 0 { - output = "/dev/stdout" + + var ( + reader *bufio.Reader + writer *bufio.Writer + nsize int64 + ) + cmd.wg.Add(len(conf.Options.SourceAddress)) + for i := 0; i < int(conf.Options.SourceParallel); i++ { + go func(idx int) { + log.Infof("start routine[%v]", idx) + for { + select { + case nd, ok := <-cmd.dumpChan: + if !ok { + log.Infof("close routine[%v]", idx) + return + } + reader, writer, nsize = cmd.dump(nd.source, nd.output) + cmd.wg.Done() + } + } + }(i) } - log.Infof("dump from '%s' to '%s'\n", from, output) + cmd.wg.Wait() - var dumpto io.WriteCloser - if output != "/dev/stdout" { - dumpto = utils.OpenWriteFile(output) - defer dumpto.Close() - } else { - dumpto = os.Stdout + close(cmd.dumpChan) + + if len(conf.Options.SourceAddress) != 1 || !conf.Options.ExtraInfo { + return } - master, nsize := cmd.SendCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + // inner usage + cmd.dumpCommand(reader, writer, nsize) +} + +func (cmd *CmdDump) dump(source, output string) (*bufio.Reader, *bufio.Writer, int64) { + log.Infof("dump from '%s' to '%s'\n", source, output) + + dumpto := utils.OpenWriteFile(output) + defer dumpto.Close() + + master, nsize := cmd.sendCmd(source, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) defer master.Close() - log.Infof("rdb file = %d\n", nsize) + log.Infof("source db[%v] dump rdb file-size[%d]\n", source, nsize) reader := bufio.NewReaderSize(master, utils.ReaderBufferSize) writer := bufio.NewWriterSize(dumpto, utils.WriterBufferSize) - cmd.DumpRDBFile(reader, writer, nsize) - - if !conf.Options.ExtraInfo { - return - } + cmd.dumpRDBFile(reader, writer, nsize) - cmd.DumpCommand(reader, writer, nsize) + return reader, writer, nsize } -func (cmd *CmdDump) SendCmd(master, auth_type, passwd string) (net.Conn, int64) { +func (cmd *CmdDump) sendCmd(master, auth_type, passwd string) (net.Conn, int64) { c, wait := utils.OpenSyncConn(master, auth_type, passwd) var nsize int64 for nsize == 0 { @@ -75,7 +111,7 @@ func (cmd *CmdDump) SendCmd(master, auth_type, passwd string) (net.Conn, int64) return c, nsize } -func (cmd *CmdDump) DumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { +func (cmd *CmdDump) dumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { var nread atomic2.Int64 wait := make(chan struct{}) go func() { @@ -102,7 +138,7 @@ func (cmd *CmdDump) DumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsiz log.Info("dump: rdb done") } -func (cmd *CmdDump) DumpCommand(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { +func (cmd *CmdDump) dumpCommand(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { var nread atomic2.Int64 go func() { p := make([]byte, utils.ReaderBufferSize) diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index 14c7609..5e9c2e5 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -196,12 +196,15 @@ func sanitizeOptions(tp string) error { if (tp == TypeRestore || tp == TypeSync) && conf.Options.TargetAddress == "" { return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") } - if (tp == TypeDump || tp == TypeSync) && conf.Options.SourceAddress == "" { + if (tp == TypeDump || tp == TypeSync) && len(conf.Options.SourceAddress) == 0 { return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}") } - if tp == TypeRump && (conf.Options.SourceAddress == "" || conf.Options.TargetAddress == "") { + if tp == TypeRump && (len(conf.Options.SourceAddress) == 0 || conf.Options.TargetAddress == "") { return fmt.Errorf("source and target address shouldn't be empty when type in {rump}") } + if tp == TypeDump && conf.Options.OutputRdb == "" { + conf.Options.OutputRdb = "output-rdb-dump" + } if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" { return fmt.Errorf("only one of source password_raw or password_encoding should be given") @@ -210,6 +213,10 @@ func sanitizeOptions(tp string) error { conf.Options.SourcePasswordRaw = string(sourcePassword) } + if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddress)) { + conf.Options.SourceParallel = uint(len(conf.Options.SourceAddress)) + } + if conf.Options.TargetPasswordRaw != "" && conf.Options.TargetPasswordEncoding != "" { return fmt.Errorf("only one of target password_raw or password_encoding should be given") } else if conf.Options.TargetPasswordEncoding != "" { diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 241bc70..5106899 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -33,8 +33,9 @@ func (cr *CmdRump) GetDetailedInfo() []interface{} { func (cr *CmdRump) Main() { // build connection - cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw) + // todo + //cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // conf.Options.SourcePasswordRaw) cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index d585577..dfe94fc 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -93,6 +93,7 @@ func (cmd *CmdSync) GetDetailedInfo() []interface{} { } func (cmd *CmdSync) Main() { + // todo from, target := conf.Options.SourceAddress, conf.Options.TargetAddress log.Infof("sync from '%s' to '%s' with http-port[%d]\n", from, target, conf.Options.HttpProfile) @@ -108,9 +109,11 @@ func (cmd *CmdSync) Main() { var input io.ReadCloser var nsize int64 if conf.Options.Psync { - input, nsize = cmd.SendPSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + // todo + // input, nsize = cmd.SendPSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) } else { - input, nsize = cmd.SendSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) + // todo + // input, nsize = cmd.SendSyncCmd(from, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) } defer input.Close() @@ -377,7 +380,9 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd return } - srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // todo + // srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + srcConn := utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) ticker := time.NewTicker(10 * time.Second) for range ticker.C { @@ -388,10 +393,14 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd // Reconnect while network error happen if err == io.EOF { - srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // todo + // srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) } else if _, ok := err.(net.Error); ok { - srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + // todo + // srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress, conf.Options.SourceAuthType, + srcConn = utils.OpenRedisConnWithTimeout(conf.Options.SourceAddress[0], conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) } } else { diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json index 3d15e8e..92a5ba2 100644 --- a/src/vendor/vendor.json +++ b/src/vendor/vendor.json @@ -62,5 +62,6 @@ "revision": "a96e63847dc3c67d17befa69c303767e2f84e54f", "revisionTime": "2017-05-31T16:03:50Z" } - ] + ], + "rootPath": "/Users/vinllen-ali/code/redis-shake-inner/redis-shake/src" }