From 1ad17384ea8d4e38ee14f0667207e90965611986 Mon Sep 17 00:00:00 2001
From: Vinllen Chen
Date: Tue, 14 May 2019 11:53:08 +0800
Subject: [PATCH 1/2] Improve 1.6.1 (#66)

* support redis sentinel for both source and target (#58)

* 1. add dbDumper in dump mode
2. polish code

* polish conf

* judge address logical adjust

* polish ChangeLog
---
 ChangeLog                              |   4 +
 conf/redis-shake.conf                  |  47 +++++----
 src/redis-shake/common/configure.go    | 127 +++++++++++++++++++++++++
 src/redis-shake/common/utils.go        |  64 ++++++++++++-
 src/redis-shake/configure/configure.go |  32 +++++--
 src/redis-shake/decode.go              |   2 +-
 src/redis-shake/dump.go                | 101 ++++++++++++--------
 src/redis-shake/main/main.go           |  74 +++++---------
 src/redis-shake/restore.go             |  28 +++---
 src/redis-shake/rump.go                |  12 +--
 src/redis-shake/sync.go                |  40 ++++----
 src/vendor/vendor.json                 |   6 ++
 12 files changed, 380 insertions(+), 157 deletions(-)
 create mode 100644 src/redis-shake/common/configure.go

diff --git a/ChangeLog b/ChangeLog
index fe65182..ea5e4a5 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+2019-05-14 Alibaba Cloud.
+    * VERSION: 1.6.1
+    * IMPROVE: support fetching db address from sentinel; the failover
+      mechanism is not yet supported.
 2019-05-08 Alibaba Cloud.
     * VERSION: 1.6.0
     * FEATURE: source address supports cluster.
diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf
index d494a04..1a8f4aa 100644
--- a/conf/redis-shake.conf
+++ b/conf/redis-shake.conf
@@ -25,40 +25,49 @@ parallel = 32
 
 # source redis configuration.
 # used in `dump`, `sync` and `rump`.
+# source redis type, e.g. "standalone" (default), "sentinel" or "cluster".
+# 1. "standalone": standalone db mode.
+# 2. "sentinel": the redis address is read from sentinel.
+# 3. "cluster": the source redis has several db.
+# 4. "proxy": the proxy address, currently, only used in "rump" mode.
+# Source redis type: supports the four modes standalone, sentinel, cluster and proxy. Note: currently proxy is only used in rump mode.
+source.type = standalone
 # ip:port
-# the source address can be redis db or cluster that has several db nodes split by semicolon(;).
-# if the type is `rump`, source type can also be the proxy address.
-# 源redis地址(ip:port),源端支持开源cluster以及db节点暴露且有权限的proxy集群模式,不同db以分号(;)
-# 分割,例如:ip1:port1;ip2;port2;ip3;port3。
+# the source address can be the following:
+# 1. single db address. for "standalone" type.
+# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:master@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
+# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441.
+# 4. proxy address(used in "rump" mode only). for "proxy" type.
+# Source redis address. For sentinel mode, the input format is "${master name}:${role to pull from, master or slave}@${sentinel address}".
 source.address = 127.0.0.1:20441
 # password.
 source.password_raw = 123456
 # auth type, don't modify it
 source.auth_type = auth
 
-# target redis configuration. used in `restore` and `sync`.
-# used in `restore`, `sync` and `rump`.
+# target redis configuration. used in `restore`, `sync` and `rump`.
+# the type of target redis can be "standalone", "proxy" or "cluster".
+# 1. "standalone": standalone db mode.
+# 2. "sentinel": the redis address is read from sentinel.
+# 3. "cluster": open source cluster (not supported currently).
+# 4. "proxy": proxy layer ahead redis. Data will be inserted in a round-robin way if more than 1 proxy given.
+# Target redis type: supports the four modes standalone, sentinel, cluster and proxy.
+target.type = standalone
 # ip:port
-# the source address can be cluster that has several db nodes split by semicolon(;).
-# 目的redis地址(ip:port),目的端支持开源的cluster架构和proxy模式,不同db以分号(;)分割,例如:
-# ip1:port1;ip2;port2;ip3;port3。
+# the target address can be the following:
+# 1. single db address. for "standalone" type.
+# 2. sentinel_master_name@sentinel single/cluster address, e.g., mymaster@127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
+# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type.
+# 4. proxy address(used in "rump" mode only). for "proxy" type.
 target.address = 127.0.0.1:20551
 # password.
-target.password_raw = 123456
+target.password_raw = 
 # auth type, don't modify it
 target.auth_type = auth
-# the type of target redis can be `standalone`, `proxy` or `cluster`.
-# `standalone`: standalone db mode.
-# `proxy`: proxy layer ahead redis.
-# `cluster`: open source cluster (not supported currently)
-# If the target is proxy, data will be inserted in a round-robin way.
-# standalone表示单个db节点进行写入(包括主从模式),proxy表示写入的是代理层,将会以轮回的方式进行写入,1个proxy对应一条
-# 长连接通道(1个源端db的数据只会写入1个proxy),cluster表示开源的集群架构
-target.type = standalone
 
 # all the data will be written into this db. < 0 means disable.
 target.db = -1
 
-# input RDB file. read from stdin, default is stdin ('/dev/stdin').
+# input RDB file.
 # used in `decode` and `restore`.
 # if the input is list split by semicolon(;), redis-shake will restore the list one by one.
 # 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2
diff --git a/src/redis-shake/common/configure.go b/src/redis-shake/common/configure.go
new file mode 100644
index 0000000..a6994b1
--- /dev/null
+++ b/src/redis-shake/common/configure.go
@@ -0,0 +1,127 @@
+package utils
+
+import (
+    "strings"
+    "fmt"
+
+    "redis-shake/configure"
+)
+
+const (
+    AddressSplitter        = "@"
+    AddressHeaderSplitter  = ":"
+    AddressClusterSplitter = ";"
+)
+
+// parse source address and target address
+func ParseAddress(tp string) error {
+    // check source
+    if tp == conf.TypeDump || tp == conf.TypeSync || tp == conf.TypeRump {
+        if err := parseAddress(tp, conf.Options.SourceAddress, conf.Options.SourceType, true); err != nil {
+            return err
+        }
+
+        if len(conf.Options.SourceAddressList) == 0 {
+            return fmt.Errorf("source address shouldn't be empty when type in {dump, sync, rump}")
+        }
+    }
+
+    // check target
+    if tp == conf.TypeRestore || tp == conf.TypeSync || tp == conf.TypeRump {
+        if err := parseAddress(tp, conf.Options.TargetAddress, conf.Options.TargetType, false); err != nil {
+            return err
+        }
+
+        if len(conf.Options.TargetAddressList) == 0 {
+            return fmt.Errorf("target address shouldn't be empty when type in {restore, sync, rump}")
+        }
+    }
+
+    return nil
+}
+
+func parseAddress(tp, address, redisType string, isSource bool) error {
+    addressLen := len(splitCluster(address))
+    if addressLen == 0 {
+        return fmt.Errorf("address length[%v] illegal", addressLen)
+    }
+
+    switch redisType {
+    case "":
+        fallthrough
+    case conf.RedisTypeStandalone:
+        if addressLen != 1 {
+            return fmt.Errorf("address[%v] length[%v] must == 1 when type is 'standalone'", address, addressLen)
+        }
+        setAddressList(isSource, address)
+    case conf.RedisTypeSentinel:
+        arr := strings.Split(address, AddressSplitter)
+        if len(arr) != 2 {
+            return fmt.Errorf("address[%v] length[%v] != 2", address, len(arr))
+        }
+
+        var masterName string
+        var fromMaster bool
+        if strings.Contains(arr[0], AddressHeaderSplitter) {
+            arrHeader := strings.Split(arr[0], AddressHeaderSplitter)
+            if isSource {
+                masterName = arrHeader[0]
+                fromMaster = arrHeader[1] == conf.StandAloneRoleMaster
+            } else {
+                masterName = arrHeader[0]
+                fromMaster = true
+            }
+        } else {
+            masterName = arr[0]
+            fromMaster = true
+        }
+
+        clusterList := strings.Split(arr[1], AddressClusterSplitter)
+
+        if isSource {
+            // get real source
+            if source, err := GetReadableRedisAddressThroughSentinel(clusterList, masterName, fromMaster); err != nil {
+                return err
+            } else {
+                conf.Options.SourceAddressList = []string{source}
+            }
+        } else {
+            // get real target
+            if target, err := GetWritableRedisAddressThroughSentinel(clusterList, masterName); err != nil {
+                return err
+            } else {
+                conf.Options.TargetAddressList = []string{target}
+            }
+        }
+    case conf.RedisTypeCluster:
+        if isSource == false {
+            return fmt.Errorf("target type can't be cluster currently")
+        }
+        setAddressList(isSource, address)
+    case conf.RedisTypeProxy:
+        if addressLen != 1 {
+            return fmt.Errorf("address[%v] length[%v] must == 1 when type is 'proxy'", address, addressLen)
+        }
+        if isSource && tp != conf.TypeRump {
+            return fmt.Errorf("source.type == proxy should only happen when mode is 'rump'")
+        }
+
+        setAddressList(isSource, address)
+    default:
+        return fmt.Errorf("unknown type[%v]", redisType)
+    }
+
+    return nil
+}
+
+func splitCluster(input string) []string {
+    return strings.Split(input, AddressClusterSplitter)
+}
+
+func setAddressList(isSource bool, address string) {
+    if isSource {
+        conf.Options.SourceAddressList = strings.Split(address, AddressClusterSplitter)
+    } else {
+        conf.Options.TargetAddressList = strings.Split(address, AddressClusterSplitter)
+    }
+}
\ No newline at end of file
diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go
index 5023cbd..9660d89 100644
--- a/src/redis-shake/common/utils.go
+++ b/src/redis-shake/common/utils.go
@@ -9,6 +9,7 @@ import (
     "encoding/binary"
     "fmt"
     "io"
+    "math/rand"
     "net"
     "os"
     "strconv"
@@ -23,6 +24,7 @@ import (
     "pkg/redis"
 
     "redis-shake/configure"
+    "github.com/FZambia/go-sentinel"
     redigo "github.com/garyburd/redigo/redis"
 )
 
@@ -129,6 +131,7 @@ func OpenSyncConn(target string, auth_type, passwd string) (net.Conn, <-chan int
     return c, waitRdbDump(c)
 }
 
+// pipeline mode: we don't wait for the whole dump to finish before running the next step
 func waitRdbDump(r io.Reader) <-chan int64 {
     size := make(chan int64)
     // read rdb size
@@ -696,7 +699,7 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
     // TODO, need to judge big key
     if e.Type != rdb.RDBTypeStreamListPacks &&
-        (uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0) {
+        (uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0) { //use command
         if conf.Options.Rewrite && e.NeedReadLen == 1 {
             if !conf.Options.Metric {
@@ -897,3 +900,62 @@ func GetLocalIp(preferdInterfaces []string) (ip string, interfaceName string, er
     }
     return ip, "", fmt.Errorf("fetch local ip failed, interfaces: %s", strings.Join(preferdInterfaces, ","))
 }
+
+// GetReadableRedisAddressThroughSentinel gets a readable redis address through sentinel.
+// If fromMaster is false, it picks one available slave at random;
+// otherwise it returns the current master address.
+func GetReadableRedisAddressThroughSentinel(sentinelAddrs []string, sentinelMasterName string, fromMaster bool) (string, error) { + sentinelGroup := sentinel.Sentinel{ + Addrs: sentinelAddrs, + MasterName: sentinelMasterName, + Dial: defaultDialFunction, + } + if fromMaster == false { + if slaves, err := sentinelGroup.Slaves(); err == nil { + if addr, err := getAvailableSlaveAddress(slaves); err == nil { + return addr, nil + } else { + return "", err + } + } else { + return "", err + } + } + return sentinelGroup.MasterAddr() +} + +// getAvailableSlaveAddress picks a slave address randomly. +func getAvailableSlaveAddress(slaves []*sentinel.Slave) (string, error) { + for { + length := len(slaves) + if length == 0 { + break + } + randSlaveIndex := rand.Intn(length) + if slave := slaves[randSlaveIndex]; slave.Available() { + return slave.Addr(), nil + } + slaves = append(slaves[:randSlaveIndex], slaves[randSlaveIndex+1:]...) + } + return "", fmt.Errorf("there is no available slave") +} + +// getWritableRedisAddressThroughSentinel gets writable redis address +// The function will return redis master address. +func GetWritableRedisAddressThroughSentinel(sentinelAddrs []string, sentinelMasterName string) (string, error) { + sentinelGroup := sentinel.Sentinel{ + Addrs: sentinelAddrs, + MasterName: sentinelMasterName, + Dial: defaultDialFunction, + } + return sentinelGroup.MasterAddr() +} + +var defaultDialFunction = func(addr string) (redigo.Conn, error) { + timeout := 500 * time.Millisecond + c, err := redigo.DialTimeout("tcp", addr, timeout, timeout, timeout) + if err != nil { + return nil, err + } + return c, nil +} \ No newline at end of file diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index afc3642..59d10d0 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -11,13 +11,14 @@ type Configuration struct { HttpProfile int `config:"http_profile"` NCpu int `config:"ncpu"` Parallel int `config:"parallel"` - SourceAddress []string `config:"source.address"` + SourceType string `config:"source.type"` + SourceAddress string `config:"source.address"` SourcePasswordRaw string `config:"source.password_raw"` SourcePasswordEncoding string `config:"source.password_encoding"` SourceVersion uint `config:"source.version"` SourceAuthType string `config:"source.auth_type"` SourceParallel uint `config:"source.parallel"` - TargetAddress []string `config:"target.address"` + TargetAddress string `config:"target.address"` TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordEncoding string `config:"target.password_encoding"` TargetVersion uint `config:"target.version"` @@ -55,11 +56,30 @@ type Configuration struct { SockFileName string `config:"sock.file_name"` SockFileSize uint `config:"sock.file_size"` + /*---------------------------------------------------------*/ // generated variables - HeartbeatIp string - ShiftTime time.Duration // shift - TargetRedisVersion string // to_redis_version - TargetReplace bool // to_replace + SourceAddressList []string // source address list + TargetAddressList []string // target address list + HeartbeatIp string // heartbeat ip + ShiftTime time.Duration // shift + TargetRedisVersion string // to_redis_version + TargetReplace bool // to_replace } var Options Configuration + +const ( + RedisTypeStandalone = "standalone" + RedisTypeSentinel = "sentinel" + RedisTypeCluster = "cluster" + RedisTypeProxy = "proxy" + + StandAloneRoleMaster = "master" + StandAloneRoleSlave = 
"slave" + + TypeDecode = "decode" + TypeRestore = "restore" + TypeDump = "dump" + TypeSync = "sync" + TypeRump = "rump" +) diff --git a/src/redis-shake/decode.go b/src/redis-shake/decode.go index e71dbf7..880a9d5 100644 --- a/src/redis-shake/decode.go +++ b/src/redis-shake/decode.go @@ -43,7 +43,7 @@ func (cmd *CmdDecode) Main() { log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput) for i, input := range conf.Options.RdbInput { - // decode one by one + // decode one by one. By now, we don't support decoding concurrence. output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i) cmd.decode(input, output) } diff --git a/src/redis-shake/dump.go b/src/redis-shake/dump.go index 4303174..d2fcf7a 100644 --- a/src/redis-shake/dump.go +++ b/src/redis-shake/dump.go @@ -12,16 +12,16 @@ import ( "pkg/libs/atomic2" "pkg/libs/log" - "redis-shake/configure" "redis-shake/common" + "redis-shake/configure" ) type CmdDump struct { dumpChan chan node - wg sync.WaitGroup } type node struct { + id int source string output string } @@ -31,10 +31,11 @@ func (cmd *CmdDump) GetDetailedInfo() interface{} { } func (cmd *CmdDump) Main() { - cmd.dumpChan = make(chan node, len(conf.Options.SourceAddress)) + cmd.dumpChan = make(chan node, len(conf.Options.SourceAddressList)) - for i, source := range conf.Options.SourceAddress { + for i, source := range conf.Options.SourceAddressList { nd := node{ + id: i, source: source, output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i), } @@ -45,8 +46,9 @@ func (cmd *CmdDump) Main() { reader *bufio.Reader writer *bufio.Writer nsize int64 + wg sync.WaitGroup ) - cmd.wg.Add(len(conf.Options.SourceAddress)) + wg.Add(len(conf.Options.SourceAddressList)) for i := 0; i < int(conf.Options.SourceParallel); i++ { go func(idx int) { log.Infof("start routine[%v]", idx) @@ -57,18 +59,26 @@ func (cmd *CmdDump) Main() { log.Infof("close routine[%v]", idx) return } - reader, writer, nsize = cmd.dump(nd.source, nd.output) - cmd.wg.Done() + + dd := &dbDumper{ + id: nd.id, + source: nd.source, + sourcePassword: conf.Options.SourcePasswordRaw, + output: nd.output, + } + reader, writer, nsize = dd.dump() + wg.Done() } } }(i) } - cmd.wg.Wait() + wg.Wait() + // all dump finish close(cmd.dumpChan) - if len(conf.Options.SourceAddress) != 1 || !conf.Options.ExtraInfo { + if len(conf.Options.SourceAddressList) != 1 || !conf.Options.ExtraInfo { return } @@ -76,44 +86,75 @@ func (cmd *CmdDump) Main() { cmd.dumpCommand(reader, writer, nsize) } -func (cmd *CmdDump) dump(source, output string) (*bufio.Reader, *bufio.Writer, int64) { - log.Infof("dump from '%s' to '%s'\n", source, output) +func (cmd *CmdDump) dumpCommand(reader *bufio.Reader, writer *bufio.Writer, nsize int64) { + var nread atomic2.Int64 + go func() { + p := make([]byte, utils.ReaderBufferSize) + for { + ncopy := int64(utils.Iocopy(reader, writer, p, len(p))) + nread.Add(ncopy) + utils.FlushWriter(writer) + } + }() + + for { + time.Sleep(time.Second) + log.Infof("dump: total = %d\n", nsize+nread.Get()) + } +} + +/*------------------------------------------------------*/ +// one dump link corresponding to one dbDumper +type dbDumper struct { + id int // id + source string // source address + sourcePassword string + output string // output +} + +func (dd *dbDumper) dump() (*bufio.Reader, *bufio.Writer, int64) { + log.Infof("routine[%v] dump from '%s' to '%s'\n", dd.id, dd.source, dd.output) - dumpto := utils.OpenWriteFile(output) + dumpto := utils.OpenWriteFile(dd.output) defer dumpto.Close() - master, nsize := 
cmd.sendCmd(source, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw)
+    // send command and get the returned channel
+    master, nsize := dd.sendCmd(dd.source, conf.Options.SourceAuthType, dd.sourcePassword)
     defer master.Close()
 
-    log.Infof("source db[%v] dump rdb file-size[%d]\n", source, nsize)
+    log.Infof("routine[%v] source db[%v] dump rdb file-size[%d]\n", dd.id, dd.source, nsize)
 
     reader := bufio.NewReaderSize(master, utils.ReaderBufferSize)
     writer := bufio.NewWriterSize(dumpto, utils.WriterBufferSize)
 
-    cmd.dumpRDBFile(reader, writer, nsize)
+    dd.dumpRDBFile(reader, writer, nsize)
 
     return reader, writer, nsize
 }
 
-func (cmd *CmdDump) sendCmd(master, auth_type, passwd string) (net.Conn, int64) {
+func (dd *dbDumper) sendCmd(master, auth_type, passwd string) (net.Conn, int64) {
     c, wait := utils.OpenSyncConn(master, auth_type, passwd)
     var nsize int64
+
+    // wait rdb dump finish
     for nsize == 0 {
         select {
         case nsize = <-wait:
             if nsize == 0 {
-                log.Info("+")
+                log.Infof("routine[%v] +", dd.id)
             }
         case <-time.After(time.Second):
-            log.Info("-")
+            log.Infof("routine[%v] -", dd.id)
         }
     }
     return c, nsize
 }
 
-func (cmd *CmdDump) dumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsize int64) {
+func (dd *dbDumper) dumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsize int64) {
     var nread atomic2.Int64
     wait := make(chan struct{})
+
+    // read from reader and write into writer in stream way
     go func() {
         defer close(wait)
         p := make([]byte, utils.WriterBufferSize)
@@ -125,6 +166,7 @@ func (cmd *CmdDump) dumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsiz
         }
     }()
 
+    // print stat
     for done := false; !done; {
         select {
         case <-wait:
@@ -133,24 +175,7 @@ func (cmd *CmdDump) dumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsiz
         }
         n := nread.Get()
         p := 100 * n / nsize
-        log.Infof("total = %d - %12d [%3d%%]\n", nsize, n, p)
-    }
-    log.Info("dump: rdb done")
-}
-
-func (cmd *CmdDump) dumpCommand(reader *bufio.Reader, writer *bufio.Writer, nsize int64) {
-    var nread atomic2.Int64
-    go func() {
-        p := make([]byte, utils.ReaderBufferSize)
-        for {
-            ncopy := int64(utils.Iocopy(reader, writer, p, len(p)))
-            nread.Add(ncopy)
-            utils.FlushWriter(writer)
-        }
-    }()
-
-    for {
-        time.Sleep(time.Second)
-        log.Infof("dump: total = %d\n", nsize+nread.Get())
+        log.Infof("routine[%v] total = %d - %12d [%3d%%]\n", dd.id, nsize, n, p)
     }
+    log.Infof("routine[%v] dump: rdb done", dd.id)
 }
diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go
index e744df9..814b7b2 100644
--- a/src/redis-shake/main/main.go
+++ b/src/redis-shake/main/main.go
@@ -27,21 +27,15 @@ import (
     "redis-shake/configure"
     "redis-shake/metric"
     "redis-shake/restful"
+    "redis-shake/scanner"
 
     "github.com/gugemichael/nimo4go"
     logRotate "gopkg.in/natefinch/lumberjack.v2"
-    "redis-shake/scanner"
 )
 
 type Exit struct{ Code int }
 
 const (
-    TypeDecode  = "decode"
-    TypeRestore = "restore"
-    TypeDump    = "dump"
-    TypeSync    = "sync"
-    TypeRump    = "rump"
-
     defaultHttpPort   = 20881
     defaultSystemPort = 20882
     defaultSenderSize = 65535
@@ -97,15 +91,15 @@ func main() {
     // create runner
     var runner base.Runner
     switch *tp {
-    case TypeDecode:
+    case conf.TypeDecode:
         runner = new(run.CmdDecode)
-    case TypeRestore:
+    case conf.TypeRestore:
         runner = new(run.CmdRestore)
-    case TypeDump:
+    case conf.TypeDump:
         runner = new(run.CmdDump)
-    case TypeSync:
+    case conf.TypeSync:
         runner = new(run.CmdSync)
-    case TypeRump:
+    case conf.TypeRump:
         runner = new(run.CmdRump)
     }
 
@@ -165,7 +159,7 @@ func startHttpServer() {
 // sanitize options
 func 
sanitizeOptions(tp string) error { var err error - if tp != TypeDecode && tp != TypeRestore && tp != TypeDump && tp != TypeSync && tp != TypeRump { + if tp != conf.TypeDecode && tp != conf.TypeRestore && tp != conf.TypeDump && tp != conf.TypeSync && tp != conf.TypeRump { return fmt.Errorf("unknown type[%v]", tp) } @@ -193,47 +187,27 @@ func sanitizeOptions(tp string) error { return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold) } - if tp == TypeRestore || tp == TypeSync || tp == TypeRump { - if len(conf.Options.TargetAddress) == 0 { - return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") - } - - switch conf.Options.TargetType { - case "standalone": - if len(conf.Options.TargetAddress) != 1 { - return fmt.Errorf("the source address[%v] != 1 when target.type is standalone", - len(conf.Options.TargetAddress)) - } - case "proxy": - case "cluster": - if tp == TypeRump || tp == TypeRestore { - // TODO - return fmt.Errorf("{rump, restore} mode doesn't support cluster currently") - } - return fmt.Errorf("coming soon") - default: - return fmt.Errorf("illegal target.type[%v]", conf.Options.TargetType) - } + // parse source and target address and type + if err := utils.ParseAddress(tp); err != nil { + return fmt.Errorf("mode[%v] parse address failed[%v]", tp, err) } - if (tp == TypeDump || tp == TypeSync || tp == TypeRump) && len(conf.Options.SourceAddress) == 0 { - return fmt.Errorf("source address shouldn't be empty when type in {dump, sync, rump}") - } - if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.RdbInput) == 0 { + + if (tp == conf.TypeRestore || tp == conf.TypeDecode) && len(conf.Options.RdbInput) == 0 { return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") } - if tp == TypeDump && conf.Options.RdbOutput == "" { + if tp == conf.TypeDump && conf.Options.RdbOutput == "" { conf.Options.RdbOutput = "output-rdb-dump" } if conf.Options.RdbParallel == 0 { - if tp == TypeDump || tp == TypeSync { - conf.Options.RdbParallel = len(conf.Options.SourceAddress) - } else if tp == TypeRestore { + if tp == conf.TypeDump || tp == conf.TypeSync { + conf.Options.RdbParallel = len(conf.Options.SourceAddressList) + } else if tp == conf.TypeRestore { conf.Options.RdbParallel = len(conf.Options.RdbInput) } } - if tp == TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) { + if tp == conf.TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) { conf.Options.RdbParallel = len(conf.Options.RdbInput) } @@ -244,8 +218,8 @@ func sanitizeOptions(tp string) error { conf.Options.SourcePasswordRaw = string(sourcePassword) } - if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddress)) { - conf.Options.SourceParallel = uint(len(conf.Options.SourceAddress)) + if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddressList)) { + conf.Options.SourceParallel = uint(len(conf.Options.SourceAddressList)) } if conf.Options.TargetPasswordRaw != "" && conf.Options.TargetPasswordEncoding != "" { @@ -372,9 +346,9 @@ func sanitizeOptions(tp string) error { conf.Options.SenderCount = defaultSenderCount } - if tp == TypeRestore || tp == TypeSync { + if tp == conf.TypeRestore || tp == conf.TypeSync { // get target redis version and set TargetReplace. 
- for _, address := range conf.Options.TargetAddress { + for _, address := range conf.Options.TargetAddressList { if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw); err != nil { return fmt.Errorf("get target redis version failed[%v]", err) @@ -393,7 +367,7 @@ func sanitizeOptions(tp string) error { } } - if tp == TypeRump { + if tp == conf.TypeRump { if conf.Options.ScanKeyNumber == 0 { conf.Options.ScanKeyNumber = 100 } @@ -408,7 +382,7 @@ func sanitizeOptions(tp string) error { conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) } - if (conf.Options.ScanSpecialCloud != "" || conf.Options.ScanKeyFile != "") && len(conf.Options.SourceAddress) > 1 { + if (conf.Options.ScanSpecialCloud != "" || conf.Options.ScanKeyFile != "") && len(conf.Options.SourceAddressList) > 1 { return fmt.Errorf("source address should <= 1 when scan.special_cloud[%v] or scan.key_file[%v] given", conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) } @@ -429,4 +403,4 @@ func handleExit() { } panic(e) } -} +} \ No newline at end of file diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index d2294af..675eef9 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -8,16 +8,17 @@ import ( "bytes" "fmt" "io/ioutil" - "time" "strconv" + "time" + "sync" "pkg/libs/atomic2" "pkg/libs/log" "pkg/redis" + "redis-shake/configure" "redis-shake/common" "redis-shake/base" - "sync" ) type CmdRestore struct { @@ -34,7 +35,7 @@ func (cmd *CmdRestore) GetDetailedInfo() interface{} { } func (cmd *CmdRestore) Main() { - log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddress) + log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddressList) type restoreNode struct { id int @@ -59,8 +60,8 @@ func (cmd *CmdRestore) Main() { } // round-robin pick - pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress)) - target := conf.Options.TargetAddress[pick] + pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList)) + target := conf.Options.TargetAddressList[pick] dr := &dbRestorer{ id: node.id, @@ -84,7 +85,7 @@ func (cmd *CmdRestore) Main() { //fake status if set http_port. 
and wait forever base.Status = "incr" log.Infof("Enabled http stats, set status (incr), and wait forever.") - select{} + select {} } } @@ -134,13 +135,11 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, pa pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { - defer close(wait) - group := make(chan int, conf.Options.Parallel) - for i := 0; i < cap(group); i++ { + var wg sync.WaitGroup + wg.Add(conf.Options.Parallel) + for i := 0; i < conf.Options.Parallel; i++ { go func() { - defer func() { - group <- 0 - }() + defer wg.Done() c := utils.OpenRedisConn(target, auth_type, passwd) defer c.Close() var lastdb uint32 = 0 @@ -165,9 +164,8 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, pa } }() } - for i := 0; i < cap(group); i++ { - <-group - } + wg.Wait() + close(wait) }() for done := false; !done; { diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index e5f7243..6948342 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -3,13 +3,13 @@ package run import ( "pkg/libs/log" "strconv" + "sync" "redis-shake/common" "redis-shake/configure" + "redis-shake/scanner" "github.com/garyburd/redigo/redis" - "redis-shake/scanner" - "sync" ) type CmdRump struct { @@ -36,16 +36,16 @@ func (cr *CmdRump) GetDetailedInfo() interface{} { func (cr *CmdRump) Main() { // build connection - cr.sourceConn = make([]redis.Conn, len(conf.Options.SourceAddress)) - for i, address := range conf.Options.SourceAddress { + cr.sourceConn = make([]redis.Conn, len(conf.Options.SourceAddressList)) + for i, address := range conf.Options.SourceAddressList { cr.sourceConn[i] = utils.OpenRedisConn(address, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) } // TODO, current only support write data into 1 db or proxy - cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress[0], conf.Options.TargetAuthType, + cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddressList[0], conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) // init two channels - chanSize := int(conf.Options.ScanKeyNumber) * len(conf.Options.SourceAddress) + chanSize := int(conf.Options.ScanKeyNumber) * len(conf.Options.SourceAddressList) cr.keyChan = make(chan *KeyNode, chanSize) cr.resultChan = make(chan *KeyNode, chanSize) diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 94943e6..21ec967 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -20,10 +20,10 @@ import ( "pkg/libs/io/pipe" "pkg/libs/log" "pkg/redis" + "redis-shake/base" + "redis-shake/command" "redis-shake/common" "redis-shake/configure" - "redis-shake/command" - "redis-shake/base" "redis-shake/heartbeat" "redis-shake/metric" ) @@ -40,8 +40,8 @@ type syncerStat struct { } type cmdDetail struct { - Cmd string - Args [][]byte + Cmd string + Args [][]byte } func (c *cmdDetail) String() string { @@ -79,10 +79,10 @@ func (cmd *CmdSync) Main() { total := utils.GetTotalLink() syncChan := make(chan syncNode, total) cmd.dbSyncers = make([]*dbSyncer, total) - for i, source := range conf.Options.SourceAddress { + for i, source := range conf.Options.SourceAddressList { // round-robin pick - pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress)) - target := conf.Options.TargetAddress[pick] + pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList)) + target := conf.Options.TargetAddressList[pick] nd := syncNode{ id: i, @@ -391,12 +391,11 @@ func (ds *dbSyncer) syncRDBFile(reader 
*bufio.Reader, target, auth_type, passwd wait := make(chan struct{}) go func() { defer close(wait) - group := make(chan int, conf.Options.Parallel) - for i := 0; i < cap(group); i++ { + var wg sync.WaitGroup + wg.Add(conf.Options.Parallel) + for i := 0; i < conf.Options.Parallel; i++ { go func() { - defer func() { - group <- 0 - }() + defer wg.Done() c := utils.OpenRedisConn(target, auth_type, passwd) defer c.Close() var lastdb uint32 = 0 @@ -439,9 +438,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target, auth_type, passwd } }() } - for i := 0; i < cap(group); i++ { - <-group - } + + wg.Wait() }() var stat *syncerStat @@ -566,9 +564,9 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target, auth_type, passwd var bypass bool = false var isselect bool = false - var scmd string - var argv, new_argv [][]byte - var err error + var scmd string + var argv, new_argv [][]byte + var err error decoder := redis.NewDecoder(reader) @@ -716,9 +714,9 @@ func (ds *dbSyncer) addDelayChan(id int64) { */ used := cap(ds.delayChannel) - len(ds.delayChannel) if used >= 4096 || - used >= 1024 && id % 10 == 0 || - used >= 128 && id % 100 == 0 || - id % 1000 == 0 { + used >= 1024 && id%10 == 0 || + used >= 128 && id%100 == 0 || + id%1000 == 0 { // non-blocking add select { case ds.delayChannel <- &delayNode{t: time.Now(), id: id}: diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json index 92a5ba2..e4b7d1f 100644 --- a/src/vendor/vendor.json +++ b/src/vendor/vendor.json @@ -2,6 +2,12 @@ "comment": "", "ignore": "test", "package": [ + { + "checksumSHA1": "hBqmLpr3P88FPgBk27hzpa5N0mM=", + "path": "github.com/FZambia/go-sentinel", + "revision": "76bd05e8e22f9f8f5e1dd6d3a85b7951da7cce57", + "revisionTime": "2017-12-04T08:54:13Z" + }, { "checksumSHA1": "1/Kf0ihi6RtfNt0JjAtZLKvfqJY=", "path": "github.com/cupcake/rdb", From a7ae4861720f6067dddbefa21bc9a93d040b9283 Mon Sep 17 00:00:00 2001 From: zhuhuikang Date: Thu, 16 May 2019 11:51:07 +0800 Subject: [PATCH 2/2] fix(sync.go):fix wg.Add SourceAddress bug --- src/redis-shake/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 21ec967..603c2ac 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -95,7 +95,7 @@ func (cmd *CmdSync) Main() { } var wg sync.WaitGroup - wg.Add(len(conf.Options.SourceAddress)) + wg.Add(len(conf.Options.SourceAddressList)) for i := 0; i < int(conf.Options.SourceParallel); i++ { go func() {
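For reference, the sentinel resolution these patches add can be exercised on its own. The sketch below is illustrative only and is not part of the patches: the main() wrapper, the sentinel endpoints and the master name are hypothetical, while the GetReadableRedisAddressThroughSentinel / GetWritableRedisAddressThroughSentinel helpers and their signatures are the ones added to src/redis-shake/common/utils.go above, and the addresses mirror the new sentinel syntax for source.address (mymaster:master@127.0.0.1:26379;127.0.0.1:26380) and target.address (mymaster@127.0.0.1:26379;127.0.0.1:26380).

package main

import (
    "fmt"

    utils "redis-shake/common"
)

func main() {
    // hypothetical sentinel endpoints and master name for demonstration
    sentinels := []string{"127.0.0.1:26379", "127.0.0.1:26380"}
    masterName := "mymaster"

    // source side: fromMaster=false picks one available slave at random,
    // as parseAddress does for source.type = sentinel with the ":slave" role
    src, err := utils.GetReadableRedisAddressThroughSentinel(sentinels, masterName, false)
    if err != nil {
        fmt.Println("resolve source through sentinel failed:", err)
        return
    }

    // target side: always resolves to the current master,
    // as parseAddress does for target.type = sentinel
    dst, err := utils.GetWritableRedisAddressThroughSentinel(sentinels, masterName)
    if err != nil {
        fmt.Println("resolve target through sentinel failed:", err)
        return
    }

    fmt.Printf("source=%s target=%s\n", src, dst)
}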