1.source address supports cluster 2.target address supports several proxies to write data in a round-robin way.

v4
vinllen 6 years ago
parent 9478aae0c1
commit 647804b6fc
  1. 5
      ChangeLog
  2. 51
      conf/redis-shake.conf
  3. 22
      src/redis-shake/common/common.go
  4. 8
      src/redis-shake/configure/configure.go
  5. 6
      src/redis-shake/decode.go
  6. 2
      src/redis-shake/dump.go
  7. 80
      src/redis-shake/main/main.go
  8. 131
      src/redis-shake/restore.go
  9. 3
      src/redis-shake/rump.go
  10. 8
      src/redis-shake/sync.go

@ -1,3 +1,8 @@
2019-04-21 Alibaba Cloud.
* VERSION: 1.6.0
* FEATURE: source address supports cluster.
* FEATURE: target address supports several proxies to write data in
a round-robin way.
2019-04-24 Alibaba Cloud. 2019-04-24 Alibaba Cloud.
* VERSION: 1.4.2 * VERSION: 1.4.2
* IMPROVE: improve rump to support fetching data from given keys in * IMPROVE: improve rump to support fetching data from given keys in

@ -23,19 +23,6 @@ ncpu = 0
# parallel routines number used in RDB file syncing. default is 64. # parallel routines number used in RDB file syncing. default is 64.
parallel = 32 parallel = 32
# input RDB file. read from stdin, default is stdin ('/dev/stdin').
# used in `decode` and `restore`.
# if the input is list split by semicolon(;), redis-shake will restore the list one by one.
# 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2
# redis-shake将会挨个进行恢复。
input_rdb = local
# output RDB file prefix.
# used in `decode` and `dump`.
# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是:
# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2
output_rdb = local_dump
# source redis configuration. # source redis configuration.
# used in `dump`, `sync` and `rump`. # used in `dump`, `sync` and `rump`.
# ip:port # ip:port
@ -48,11 +35,6 @@ source.address = 127.0.0.1:20441
source.password_raw = 123456 source.password_raw = 123456
# auth type, don't modify it # auth type, don't modify it
source.auth_type = auth source.auth_type = auth
# the concurrence of fetching data, default is len(source.address)
# used in `dump` and `sync`.
# 拉取的并发度,默认是source.address中db的个数。假如db节点有5个,但source.parallel=3,那么一次只会
# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。
source.parallel =
# target redis configuration. used in `restore` and `sync`. # target redis configuration. used in `restore` and `sync`.
# used in `restore`, `sync` and `rump`. # used in `restore`, `sync` and `rump`.
@ -65,15 +47,34 @@ target.address = 127.0.0.1:20551
target.password_raw = 123456 target.password_raw = 123456
# auth type, don't modify it # auth type, don't modify it
target.auth_type = auth target.auth_type = auth
# the type of target redis can be `standalone`, `proxy` or `cluster`.
# `standalone`: standalone db mode.
# `proxy`: proxy layer ahead redis.
# `cluster`: open source cluster (not supported currently)
# If the target is proxy, data will be inserted in a round-robin way.
# standalone表示单个db节点进行写入(包括主从模式),proxy表示写入的是代理层,将会以轮回的方式进行写入,1个proxy对应一条
# 长连接通道(1个源端db的数据只会写入1个proxy),cluster表示开源的集群架构
target.type = standalone
# all the data will be written into this db. < 0 means disable. # all the data will be written into this db. < 0 means disable.
target.db = -1 target.db = -1
# the type of target redis: `cluster`, `opensource`. If the target is open source redis cluster,
# redis-shake uses redis-go-cluster driver to write data, otherwise, redigo driver is used to # input RDB file. read from stdin, default is stdin ('/dev/stdin').
# insert data in round robin way. # used in `decode` and `restore`.
# 目的redis的类型:`opensource`和`proxy`. `opensource`表示是开源的cluster或者redis db节点; # if the input is list split by semicolon(;), redis-shake will restore the list one by one.
# `proxy`表示是proxy类型,将会以round robin循环方式写入。对于开源的cluster用redis-go-cluster驱动写入,其余 # 如果是decode或者restore,这个参数表示读取的rdb文件。支持输入列表,例如:rdb.0;rdb.1;rdb.2
# 的则用redigo写入 # redis-shake将会挨个进行恢复。
# target.type = opensource rdb.input = local
# output RDB file prefix.
# used in `decode` and `dump`.
# 如果是decode或者dump,这个参数表示输出的rdb前缀,比如输入有3个db,那么dump分别是:
# ${output_rdb}.0, ${output_rdb}.1, ${output_rdb}.2
rdb.output = local_dump
# the concurrence of fetching data, default is len(source.address) or len(rdb.input).
# used in `dump`, `sync` and `restore`.
# 拉取的并发度,如果是`dump`或者`sync`,默认是source.address中db的个数,`restore`模式默认len(rdb.input)。
# 假如db节点/输入的rdb有5个,但rdb.parallel=3,那么一次只会
# 并发拉取3个db的全量数据,直到某个db的rdb拉取完毕,才会拉取第4个db节点的rdb,以此类推。
rdb.parallel =
# use for expire key, set the time gap when source and target timestamp are not the same. # use for expire key, set the time gap when source and target timestamp are not the same.
# 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值 # 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值

@ -8,7 +8,7 @@ import (
"pkg/libs/bytesize" "pkg/libs/bytesize"
"redis-shake/configure" "redis-shake/configure"
logRotate "gopkg.in/natefinch/lumberjack.v2" logRotate "gopkg.in/natefinch/lumberjack.v2"
) )
@ -26,9 +26,10 @@ const (
) )
var ( var (
Version = "$" Version = "$"
LogRotater *logRotate.Logger LogRotater *logRotate.Logger
StartTime string StartTime string
TargetRoundRobin int
) )
// read until hit the end of RESP: "\r\n" // read until hit the end of RESP: "\r\n"
@ -70,5 +71,16 @@ func ParseInfo(content []byte) map[string]string {
} }
func GetTotalLink() int { func GetTotalLink() int {
return len(conf.Options.SourceAddress) if len(conf.Options.SourceAddress) != 0 {
return len(conf.Options.SourceAddress)
} else {
return len(conf.Options.RdbInput)
}
}
func PickTargetRoundRobin(n int) int {
defer func() {
TargetRoundRobin = (TargetRoundRobin + 1) % n
}()
return TargetRoundRobin
} }

@ -11,20 +11,22 @@ type Configuration struct {
HttpProfile int `config:"http_profile"` HttpProfile int `config:"http_profile"`
NCpu int `config:"ncpu"` NCpu int `config:"ncpu"`
Parallel int `config:"parallel"` Parallel int `config:"parallel"`
InputRdb []string `config:"input_rdb"`
OutputRdb string `config:"output_rdb"`
SourceAddress []string `config:"source.address"` SourceAddress []string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"` SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"` SourcePasswordEncoding string `config:"source.password_encoding"`
SourceVersion uint `config:"source.version"` SourceVersion uint `config:"source.version"`
SourceAuthType string `config:"source.auth_type"` SourceAuthType string `config:"source.auth_type"`
SourceParallel uint `config:"source.parallel"` SourceParallel uint `config:"source.parallel"`
TargetAddress string `config:"target.address"` TargetAddress []string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"` TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"` TargetPasswordEncoding string `config:"target.password_encoding"`
TargetVersion uint `config:"target.version"` TargetVersion uint `config:"target.version"`
TargetDB int `config:"target.db"` TargetDB int `config:"target.db"`
TargetAuthType string `config:"target.auth_type"` TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"`
RdbInput []string `config:"rdb.input"`
RdbOutput string `config:"rdb.output"`
RdbParallel int `config:"rdb.parallel"`
FakeTime string `config:"fake_time"` FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"` Rewrite bool `config:"rewrite"`
FilterDB string `config:"filter.db"` FilterDB string `config:"filter.db"`

@ -40,11 +40,11 @@ func (cmd *CmdDecode) GetDetailedInfo() interface{} {
} }
func (cmd *CmdDecode) Main() { func (cmd *CmdDecode) Main() {
log.Infof("decode from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.OutputRdb) log.Infof("decode from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.RdbOutput)
for i, input := range conf.Options.InputRdb { for i, input := range conf.Options.RdbInput {
// decode one by one // decode one by one
output := fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i) output := fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i)
cmd.decode(input, output) cmd.decode(input, output)
} }

@ -36,7 +36,7 @@ func (cmd *CmdDump) Main() {
for i, source := range conf.Options.SourceAddress { for i, source := range conf.Options.SourceAddress {
nd := node{ nd := node{
source: source, source: source,
output: fmt.Sprintf("%s.%d", conf.Options.OutputRdb, i), output: fmt.Sprintf("%s.%d", conf.Options.RdbOutput, i),
} }
cmd.dumpChan <- nd cmd.dumpChan <- nd
} }

@ -193,20 +193,48 @@ func sanitizeOptions(tp string) error {
return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold) return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold)
} }
if (tp == TypeRestore || tp == TypeSync) && conf.Options.TargetAddress == "" { if tp == TypeRestore || tp == TypeSync || tp == TypeRump {
return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}") if len(conf.Options.TargetAddress) == 0 {
} return fmt.Errorf("target address shouldn't be empty when type in {restore, sync}")
if (tp == TypeDump || tp == TypeSync) && len(conf.Options.SourceAddress) == 0 { }
return fmt.Errorf("source address shouldn't be empty when type in {dump, sync}")
switch conf.Options.TargetType {
case "standalone":
if len(conf.Options.TargetAddress) != 1 {
return fmt.Errorf("the source address[%v] != 1 when target.type is standalone",
len(conf.Options.TargetAddress))
}
case "proxy":
case "cluster":
if tp == TypeRump || tp == TypeRestore {
// TODO
return fmt.Errorf("{rump, restore} mode doesn't support cluster currently")
}
return fmt.Errorf("coming soon")
default:
return fmt.Errorf("illegal target.type[%v]", conf.Options.TargetType)
}
} }
if tp == TypeRump && (len(conf.Options.SourceAddress) == 0 || conf.Options.TargetAddress == "") { if (tp == TypeDump || tp == TypeSync || tp == TypeRump) && len(conf.Options.SourceAddress) == 0 {
return fmt.Errorf("source and target address shouldn't be empty when type in {rump}") return fmt.Errorf("source address shouldn't be empty when type in {dump, sync, rump}")
} }
if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.InputRdb) == 0 { if (tp == TypeRestore || tp == TypeDecode) && len(conf.Options.RdbInput) == 0 {
return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}") return fmt.Errorf("input rdb shouldn't be empty when type in {restore, decode}")
} }
if tp == TypeDump && conf.Options.OutputRdb == "" { if tp == TypeDump && conf.Options.RdbOutput == "" {
conf.Options.OutputRdb = "output-rdb-dump" conf.Options.RdbOutput = "output-rdb-dump"
}
if conf.Options.RdbParallel == 0 {
if tp == TypeDump || tp == TypeSync {
conf.Options.RdbParallel = len(conf.Options.SourceAddress)
} else if tp == TypeRestore {
conf.Options.RdbParallel = len(conf.Options.RdbInput)
}
}
if tp == TypeRestore && conf.Options.RdbParallel > len(conf.Options.RdbInput) {
conf.Options.RdbParallel = len(conf.Options.RdbInput)
} }
if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" { if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" {
@ -318,45 +346,51 @@ func sanitizeOptions(tp string) error {
if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 { if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 {
return fmt.Errorf("HttpProfile[%v] should in [0, 65535]", conf.Options.HttpProfile) return fmt.Errorf("HttpProfile[%v] should in [0, 65535]", conf.Options.HttpProfile)
} else if conf.Options.HttpProfile == 0 { } else if conf.Options.HttpProfile == 0 {
// set to default when not set // set to default when not set
conf.Options.HttpProfile = defaultHttpPort conf.Options.HttpProfile = defaultHttpPort
} }
if conf.Options.SystemProfile < 0 || conf.Options.SystemProfile > 65535 { if conf.Options.SystemProfile < 0 || conf.Options.SystemProfile > 65535 {
return fmt.Errorf("SystemProfile[%v] should in [0, 65535]", conf.Options.SystemProfile) return fmt.Errorf("SystemProfile[%v] should in [0, 65535]", conf.Options.SystemProfile)
} else if conf.Options.SystemProfile == 0 { } else if conf.Options.SystemProfile == 0 {
// set to default when not set // set to default when not set
conf.Options.SystemProfile = defaultSystemPort conf.Options.SystemProfile = defaultSystemPort
} }
if conf.Options.SenderSize < 0 || conf.Options.SenderSize >= 1073741824 { if conf.Options.SenderSize < 0 || conf.Options.SenderSize >= 1073741824 {
return fmt.Errorf("SenderSize[%v] should in [0, 1073741824]", conf.Options.SenderSize) return fmt.Errorf("SenderSize[%v] should in [0, 1073741824]", conf.Options.SenderSize)
} else if conf.Options.SenderSize == 0 { } else if conf.Options.SenderSize == 0 {
// set to default when not set // set to default when not set
conf.Options.SenderSize = defaultSenderSize conf.Options.SenderSize = defaultSenderSize
} }
if conf.Options.SenderCount < 0 || conf.Options.SenderCount >= 100000 { if conf.Options.SenderCount < 0 || conf.Options.SenderCount >= 100000 {
return fmt.Errorf("SenderCount[%v] should in [0, 100000]", conf.Options.SenderCount) return fmt.Errorf("SenderCount[%v] should in [0, 100000]", conf.Options.SenderCount)
} else if conf.Options.SenderCount == 0 { } else if conf.Options.SenderCount == 0 {
// set to default when not set // set to default when not set
conf.Options.SenderCount = defaultSenderCount conf.Options.SenderCount = defaultSenderCount
} }
if tp == TypeRestore || tp == TypeSync { if tp == TypeRestore || tp == TypeSync {
// get target redis version and set TargetReplace. // get target redis version and set TargetReplace.
if conf.Options.TargetRedisVersion, err = utils.GetRedisVersion(conf.Options.TargetAddress, for _, address := range conf.Options.TargetAddress {
conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw); err != nil { if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType,
return fmt.Errorf("get target redis version failed[%v]", err) conf.Options.TargetPasswordRaw); err != nil {
} else { return fmt.Errorf("get target redis version failed[%v]", err)
if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") || } else if conf.Options.TargetRedisVersion != "" && conf.Options.TargetRedisVersion != v {
strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") { return fmt.Errorf("target redis version is different: [%v %v]", conf.Options.TargetRedisVersion, v)
conf.Options.TargetReplace = true
} else { } else {
conf.Options.TargetReplace = false conf.Options.TargetRedisVersion = v
} }
} }
if strings.HasPrefix(conf.Options.TargetRedisVersion, "4.") ||
strings.HasPrefix(conf.Options.TargetRedisVersion, "3.") ||
strings.HasPrefix(conf.Options.TargetRedisVersion, "5.") {
conf.Options.TargetReplace = true
} else {
conf.Options.TargetReplace = false
}
} }
if tp == TypeRump { if tp == TypeRump {
@ -365,7 +399,7 @@ func sanitizeOptions(tp string) error {
} }
if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanSpecialCloud != scanner.TencentCluster && if conf.Options.ScanSpecialCloud != "" && conf.Options.ScanSpecialCloud != scanner.TencentCluster &&
conf.Options.ScanSpecialCloud != scanner.AliyunCluster { conf.Options.ScanSpecialCloud != scanner.AliyunCluster {
return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud) return fmt.Errorf("special cloud type[%s] is not supported", conf.Options.ScanSpecialCloud)
} }

@ -17,12 +17,10 @@ import (
"redis-shake/common" "redis-shake/common"
"strconv" "strconv"
"redis-shake/base" "redis-shake/base"
"sync"
) )
type CmdRestore struct { type CmdRestore struct {
rbytes, ebytes, nentry, ignore atomic2.Int64
forward, nbypass atomic2.Int64
} }
type cmdRestoreStat struct { type cmdRestoreStat struct {
@ -31,31 +29,57 @@ type cmdRestoreStat struct {
forward, nbypass int64 forward, nbypass int64
} }
func (cmd *CmdRestore) Stat() *cmdRestoreStat {
return &cmdRestoreStat{
rbytes: cmd.rbytes.Get(),
ebytes: cmd.ebytes.Get(),
nentry: cmd.nentry.Get(),
ignore: cmd.ignore.Get(),
forward: cmd.forward.Get(),
nbypass: cmd.nbypass.Get(),
}
}
func (cmd *CmdRestore) GetDetailedInfo() interface{} { func (cmd *CmdRestore) GetDetailedInfo() interface{} {
return nil return nil
} }
func (cmd *CmdRestore) Main() { func (cmd *CmdRestore) Main() {
log.Infof("restore from '%s' to '%s'\n", conf.Options.InputRdb, conf.Options.TargetAddress) log.Infof("restore from '%s' to '%s'\n", conf.Options.RdbInput, conf.Options.TargetAddress)
type restoreNode struct {
id int
input string
}
base.Status = "waitRestore" base.Status = "waitRestore"
for _, input := range conf.Options.InputRdb { total := utils.GetTotalLink()
// restore one by one restoreChan := make(chan restoreNode, total)
cmd.restore(input)
for i, rdb := range conf.Options.RdbInput {
restoreChan <- restoreNode{id: i, input: rdb}
} }
var wg sync.WaitGroup
wg.Add(len(conf.Options.RdbInput))
for i := 0; i < conf.Options.RdbParallel; i++ {
go func() {
for {
node, ok := <-restoreChan
if !ok {
break
}
// round-robin pick
pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress))
target := conf.Options.TargetAddress[pick]
dr := &dbRestorer{
id: node.id,
input: node.input,
target: target,
targetPassword: conf.Options.TargetPasswordRaw,
}
dr.restore()
log.Infof("routine[%v] starts restoring data from %v to %v",
dr.id, dr.input, dr.target)
wg.Done()
}
}()
}
wg.Wait()
close(restoreChan)
if conf.Options.HttpProfile > 0 { if conf.Options.HttpProfile > 0 {
//fake status if set http_port. and wait forever //fake status if set http_port. and wait forever
base.Status = "incr" base.Status = "incr"
@ -64,25 +88,50 @@ func (cmd *CmdRestore) Main() {
} }
} }
func (cmd *CmdRestore) restore(input string) { /*------------------------------------------------------*/
readin, nsize := utils.OpenReadFile(input) // one restore link corresponding to one dbRestorer
type dbRestorer struct {
id int // id
input string // input rdb
target string
targetPassword string
// metric
rbytes, ebytes, nentry, ignore atomic2.Int64
forward, nbypass atomic2.Int64
}
func (dr *dbRestorer) Stat() *cmdRestoreStat {
return &cmdRestoreStat{
rbytes: dr.rbytes.Get(),
ebytes: dr.ebytes.Get(),
nentry: dr.nentry.Get(),
ignore: dr.ignore.Get(),
forward: dr.forward.Get(),
nbypass: dr.nbypass.Get(),
}
}
func (dr *dbRestorer) restore() {
readin, nsize := utils.OpenReadFile(dr.input)
defer readin.Close() defer readin.Close()
base.Status = "restore" base.Status = "restore"
reader := bufio.NewReaderSize(readin, utils.ReaderBufferSize) reader := bufio.NewReaderSize(readin, utils.ReaderBufferSize)
cmd.restoreRDBFile(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, dr.restoreRDBFile(reader, dr.target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw,
nsize) nsize)
base.Status = "extra" base.Status = "extra"
if conf.Options.ExtraInfo && (nsize == 0 || nsize != cmd.rbytes.Get()) { if conf.Options.ExtraInfo && (nsize == 0 || nsize != dr.rbytes.Get()) {
cmd.restoreCommand(reader, conf.Options.TargetAddress, conf.Options.TargetAuthType, dr.restoreCommand(reader, dr.target, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw) conf.Options.TargetPasswordRaw)
} }
} }
func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) {
pipe := utils.NewRDBLoader(reader, &cmd.rbytes, base.RDBPipeSize) pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize)
wait := make(chan struct{}) wait := make(chan struct{})
go func() { go func() {
defer close(wait) defer close(wait)
@ -97,9 +146,9 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p
var lastdb uint32 = 0 var lastdb uint32 = 0
for e := range pipe { for e := range pipe {
if !base.AcceptDB(e.DB) { if !base.AcceptDB(e.DB) {
cmd.ignore.Incr() dr.ignore.Incr()
} else { } else {
cmd.nentry.Incr() dr.nentry.Incr()
if conf.Options.TargetDB != -1 { if conf.Options.TargetDB != -1 {
if conf.Options.TargetDB != int(lastdb) { if conf.Options.TargetDB != int(lastdb) {
lastdb = uint32(conf.Options.TargetDB) lastdb = uint32(conf.Options.TargetDB)
@ -127,12 +176,12 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p
done = true done = true
case <-time.After(time.Second): case <-time.After(time.Second):
} }
stat := cmd.Stat() stat := dr.Stat()
var b bytes.Buffer var b bytes.Buffer
if nsize != 0 { if nsize != 0 {
fmt.Fprintf(&b, "total = %d - %12d [%3d%%]", nsize, stat.rbytes, 100*stat.rbytes/nsize) fmt.Fprintf(&b, "routine[%v] total = %d - %12d [%3d%%]", dr.id, nsize, stat.rbytes, 100*stat.rbytes/nsize)
} else { } else {
fmt.Fprintf(&b, "total = %12d", stat.rbytes) fmt.Fprintf(&b, "routine[%v] total = %12d", dr.id, stat.rbytes)
} }
fmt.Fprintf(&b, " entry=%-12d", stat.nentry) fmt.Fprintf(&b, " entry=%-12d", stat.nentry)
if stat.ignore != 0 { if stat.ignore != 0 {
@ -140,10 +189,10 @@ func (cmd *CmdRestore) restoreRDBFile(reader *bufio.Reader, target, auth_type, p
} }
log.Info(b.String()) log.Info(b.String())
} }
log.Info("restore: rdb done") log.Info("routine[%v] restore: rdb done", dr.id)
} }
func (cmd *CmdRestore) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) { func (dr *dbRestorer) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) {
c := utils.OpenNetConn(target, auth_type, passwd) c := utils.OpenNetConn(target, auth_type, passwd)
defer c.Close() defer c.Close()
@ -162,35 +211,35 @@ func (cmd *CmdRestore) restoreCommand(reader *bufio.Reader, target, auth_type, p
for { for {
resp := redis.MustDecode(reader) resp := redis.MustDecode(reader)
if scmd, args, err := redis.ParseArgs(resp); err != nil { if scmd, args, err := redis.ParseArgs(resp); err != nil {
log.PanicError(err, "parse command arguments failed") log.PanicError(err, "routine[%v] parse command arguments failed", dr.id)
} else if scmd != "ping" { } else if scmd != "ping" {
if scmd == "select" { if scmd == "select" {
if len(args) != 1 { if len(args) != 1 {
log.Panicf("select command len(args) = %d", len(args)) log.Panicf("routine[%v] select command len(args) = %d", dr.id, len(args))
} }
s := string(args[0]) s := string(args[0])
n, err := strconv.Atoi(s) n, err := strconv.Atoi(s)
if err != nil { if err != nil {
log.PanicErrorf(err, "parse db = %s failed", s) log.PanicErrorf(err, "routine[%v] parse db = %s failed", dr.id, s)
} }
bypass = !base.AcceptDB(uint32(n)) bypass = !base.AcceptDB(uint32(n))
} }
if bypass { if bypass {
cmd.nbypass.Incr() dr.nbypass.Incr()
continue continue
} }
} }
cmd.forward.Incr() dr.forward.Incr()
redis.MustEncode(writer, resp) redis.MustEncode(writer, resp)
utils.FlushWriter(writer) utils.FlushWriter(writer)
} }
}() }()
for lstat := cmd.Stat(); ; { for lstat := dr.Stat(); ; {
time.Sleep(time.Second) time.Sleep(time.Second)
nstat := cmd.Stat() nstat := dr.Stat()
var b bytes.Buffer var b bytes.Buffer
fmt.Fprintf(&b, "restore: ") fmt.Fprintf(&b, "routine[%v] restore: ", dr.id)
fmt.Fprintf(&b, " +forward=%-6d", nstat.forward-lstat.forward) fmt.Fprintf(&b, " +forward=%-6d", nstat.forward-lstat.forward)
fmt.Fprintf(&b, " +nbypass=%-6d", nstat.nbypass-lstat.nbypass) fmt.Fprintf(&b, " +nbypass=%-6d", nstat.nbypass-lstat.nbypass)
log.Info(b.String()) log.Info(b.String())

@ -40,7 +40,8 @@ func (cr *CmdRump) Main() {
for i, address := range conf.Options.SourceAddress { for i, address := range conf.Options.SourceAddress {
cr.sourceConn[i] = utils.OpenRedisConn(address, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw) cr.sourceConn[i] = utils.OpenRedisConn(address, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw)
} }
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, // TODO, current only support write data into 1 db or proxy
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress[0], conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw) conf.Options.TargetPasswordRaw)
// init two channels // init two channels

@ -80,11 +80,15 @@ func (cmd *CmdSync) Main() {
syncChan := make(chan syncNode, total) syncChan := make(chan syncNode, total)
cmd.dbSyncers = make([]*dbSyncer, total) cmd.dbSyncers = make([]*dbSyncer, total)
for i, source := range conf.Options.SourceAddress { for i, source := range conf.Options.SourceAddress {
// round-robin pick
pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddress))
target := conf.Options.TargetAddress[pick]
nd := syncNode{ nd := syncNode{
id: i, id: i,
source: source, source: source,
sourcePassword: conf.Options.SourcePasswordRaw, sourcePassword: conf.Options.SourcePasswordRaw,
target: conf.Options.TargetAddress, // todo, target address load balance target: target,
targetPassword: conf.Options.TargetPasswordRaw, targetPassword: conf.Options.TargetPasswordRaw,
} }
syncChan <- nd syncChan <- nd
@ -125,7 +129,7 @@ func (cmd *CmdSync) Main() {
} }
/*------------------------------------------------------*/ /*------------------------------------------------------*/
// one sync tunnel corresponding to one dbSyncer // one sync link corresponding to one dbSyncer
func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string, httpPort int) *dbSyncer { func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string, httpPort int) *dbSyncer {
ds := &dbSyncer{ ds := &dbSyncer{
id: id, id: id,

Loading…
Cancel
Save