|
|
|
@ -71,7 +71,7 @@ func (cmd *CmdSync) Main() { |
|
|
|
|
id int |
|
|
|
|
source string |
|
|
|
|
sourcePassword string |
|
|
|
|
target string |
|
|
|
|
target []string |
|
|
|
|
targetPassword string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -80,9 +80,14 @@ func (cmd *CmdSync) Main() { |
|
|
|
|
syncChan := make(chan syncNode, total) |
|
|
|
|
cmd.dbSyncers = make([]*dbSyncer, total) |
|
|
|
|
for i, source := range conf.Options.SourceAddressList { |
|
|
|
|
// round-robin pick
|
|
|
|
|
pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList)) |
|
|
|
|
target := conf.Options.TargetAddressList[pick] |
|
|
|
|
var target []string |
|
|
|
|
if conf.Options.TargetType == conf.RedisTypeCluster { |
|
|
|
|
target = conf.Options.TargetAddressList |
|
|
|
|
} else { |
|
|
|
|
// round-robin pick
|
|
|
|
|
pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList)) |
|
|
|
|
target = []string{conf.Options.TargetAddressList[pick]} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
nd := syncNode{ |
|
|
|
|
id: i, |
|
|
|
@ -130,13 +135,12 @@ func (cmd *CmdSync) Main() { |
|
|
|
|
|
|
|
|
|
/*------------------------------------------------------*/ |
|
|
|
|
// one sync link corresponding to one dbSyncer
|
|
|
|
|
func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string, httpPort int) *dbSyncer { |
|
|
|
|
func NewDbSyncer(id int, source, sourcePassword string, target []string, targetPassword string, httpPort int) *dbSyncer { |
|
|
|
|
ds := &dbSyncer{ |
|
|
|
|
id: id, |
|
|
|
|
source: source, |
|
|
|
|
sourcePassword: sourcePassword, |
|
|
|
|
//todo
|
|
|
|
|
//target: target,
|
|
|
|
|
target: target, |
|
|
|
|
targetPassword: targetPassword, |
|
|
|
|
httpProfilePort: httpPort, |
|
|
|
|
waitFull: make(chan struct{}), |
|
|
|
@ -239,18 +243,16 @@ func (ds *dbSyncer) sync() { |
|
|
|
|
go heartbeatCtl.Start() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)
|
|
|
|
|
reader := bufio.NewReaderSize(input, utils.ReaderBufferSize) |
|
|
|
|
|
|
|
|
|
// sync rdb
|
|
|
|
|
base.Status = "full" |
|
|
|
|
// todo
|
|
|
|
|
// ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize)
|
|
|
|
|
ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize) |
|
|
|
|
|
|
|
|
|
// sync increment
|
|
|
|
|
base.Status = "incr" |
|
|
|
|
close(ds.waitFull) |
|
|
|
|
// todo
|
|
|
|
|
//ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword)
|
|
|
|
|
ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *dbSyncer) sendSyncCmd(master, auth_type, passwd string) (net.Conn, int64) { |
|
|
|
@ -389,7 +391,7 @@ func (ds *dbSyncer) pSyncPipeCopy(c net.Conn, br *bufio.Reader, bw *bufio.Writer |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) { |
|
|
|
|
func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64) { |
|
|
|
|
pipe := utils.NewRDBLoader(reader, &ds.rbytes, base.RDBPipeSize) |
|
|
|
|
wait := make(chan struct{}) |
|
|
|
|
go func() { |
|
|
|
@ -399,8 +401,7 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
for i := 0; i < conf.Options.Parallel; i++ { |
|
|
|
|
go func() { |
|
|
|
|
defer wg.Done() |
|
|
|
|
// todo
|
|
|
|
|
c := utils.OpenRedisConn([]string{target}, auth_type, passwd, false) |
|
|
|
|
c := utils.OpenRedisConn(target, auth_type, passwd, conf.Options.TargetType == conf.RedisTypeCluster) |
|
|
|
|
defer c.Close() |
|
|
|
|
var lastdb uint32 = 0 |
|
|
|
|
for e := range pipe { |
|
|
|
@ -467,8 +468,11 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
log.Infof("dbSyncer[%v] sync rdb done", ds.id) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target, auth_type, passwd string) { |
|
|
|
|
c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) |
|
|
|
|
func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string) { |
|
|
|
|
readeTimeout := time.Duration(10)*time.Minute |
|
|
|
|
writeTimeout := time.Duration(10)*time.Minute |
|
|
|
|
isCluster := conf.Options.TargetType == conf.RedisTypeCluster |
|
|
|
|
c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster) |
|
|
|
|
defer c.Close() |
|
|
|
|
|
|
|
|
|
ds.sendBuf = make(chan cmdDetail, conf.Options.SenderCount) |
|
|
|
@ -481,8 +485,8 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
srcConn := utils.OpenRedisConnWithTimeout(ds.source, conf.Options.SourceAuthType, ds.sourcePassword, |
|
|
|
|
time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) |
|
|
|
|
srcConn := utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, ds.sourcePassword, |
|
|
|
|
readeTimeout, writeTimeout, false) |
|
|
|
|
ticker := time.NewTicker(10 * time.Second) |
|
|
|
|
for range ticker.C { |
|
|
|
|
offset, err := utils.GetFakeSlaveOffset(srcConn) |
|
|
|
@ -493,11 +497,11 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
|
|
|
|
|
// Reconnect while network error happen
|
|
|
|
|
if err == io.EOF { |
|
|
|
|
srcConn = utils.OpenRedisConnWithTimeout(ds.source, conf.Options.SourceAuthType, |
|
|
|
|
ds.sourcePassword, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) |
|
|
|
|
srcConn = utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, |
|
|
|
|
ds.sourcePassword, readeTimeout, writeTimeout, false) |
|
|
|
|
} else if _, ok := err.(net.Error); ok { |
|
|
|
|
srcConn = utils.OpenRedisConnWithTimeout(ds.source, conf.Options.SourceAuthType, |
|
|
|
|
ds.sourcePassword, time.Duration(10)*time.Minute, time.Duration(10)*time.Minute) |
|
|
|
|
srcConn = utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, |
|
|
|
|
ds.sourcePassword, readeTimeout, writeTimeout, false) |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// ds.SyncStat.SetOffset(offset)
|
|
|
|
@ -673,6 +677,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
noFlushCount += 1 |
|
|
|
|
|
|
|
|
|
ds.forward.Incr() |
|
|
|
|
ds.wbytes.Add(int64(length)) |
|
|
|
|
metric.GetMetric(ds.id).AddPushCmdCount(1) |
|
|
|
|
metric.GetMetric(ds.id).AddNetworkFlow(uint64(length)) |
|
|
|
|
sendId.Incr() |
|
|
|
@ -700,9 +705,9 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target, auth_type, passwd |
|
|
|
|
nstat := ds.Stat() |
|
|
|
|
var b bytes.Buffer |
|
|
|
|
fmt.Fprintf(&b, "dbSyncer[%v] sync: ", ds.id) |
|
|
|
|
fmt.Fprintf(&b, " +forward=%-6d", nstat.forward-lstat.forward) |
|
|
|
|
fmt.Fprintf(&b, " +nbypass=%-6d", nstat.nbypass-lstat.nbypass) |
|
|
|
|
fmt.Fprintf(&b, " +nbytes=%d", nstat.wbytes-lstat.wbytes) // todo
|
|
|
|
|
fmt.Fprintf(&b, " +forwardCommands=%-6d", nstat.forward-lstat.forward) |
|
|
|
|
fmt.Fprintf(&b, " +filterCommands=%-6d", nstat.nbypass-lstat.nbypass) |
|
|
|
|
fmt.Fprintf(&b, " +writeBytes=%d", nstat.wbytes-lstat.wbytes) |
|
|
|
|
log.Info(b.String()) |
|
|
|
|
lstat = nstat |
|
|
|
|
} |
|
|
|
|