|
|
|
@ -100,10 +100,8 @@ func (cmd *CmdSync) Main() { |
|
|
|
|
log.Panic("invalid argument: target") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
log.Infof("sync from '%s' to '%s' http '%d'\n", from, target, conf.Options.HttpProfile) |
|
|
|
|
|
|
|
|
|
log.Infof("sync from '%s' to '%s' with http-port[%d]\n", from, target, conf.Options.HttpProfile) |
|
|
|
|
cmd.wait_full = make(chan struct{}) |
|
|
|
|
log.Infof("sync from '%s' to '%s'\n", from, target) |
|
|
|
|
|
|
|
|
|
var sockfile *os.File |
|
|
|
|
if len(conf.Options.SockFileName) != 0 { |
|
|
|
@ -121,7 +119,7 @@ func (cmd *CmdSync) Main() { |
|
|
|
|
} |
|
|
|
|
defer input.Close() |
|
|
|
|
|
|
|
|
|
log.Infof("rdb file = %d\n", nsize) |
|
|
|
|
log.Infof("rdb file size = %d\n", nsize) |
|
|
|
|
|
|
|
|
|
if sockfile != nil { |
|
|
|
|
r, w := pipe.NewFilePipe(int(conf.Options.SockFileSize), sockfile) |
|
|
|
@ -136,6 +134,7 @@ func (cmd *CmdSync) Main() { |
|
|
|
|
input = r |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// start heartbeat
|
|
|
|
|
if len(conf.Options.HeartbeatUrl) > 0 { |
|
|
|
|
heartbeatCtl := heartbeat.HeartbeatController{ |
|
|
|
|
ServerUrl: conf.Options.HeartbeatUrl, |
|
|
|
@ -146,9 +145,11 @@ func (cmd *CmdSync) Main() { |
|
|
|
|
|
|
|
|
|
reader := bufio.NewReaderSize(input, utils.ReaderBufferSize) |
|
|
|
|
|
|
|
|
|
// sync rdb
|
|
|
|
|
base.Status = "full" |
|
|
|
|
cmd.SyncRDBFile(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize) |
|
|
|
|
|
|
|
|
|
// sync increment
|
|
|
|
|
base.Status = "incr" |
|
|
|
|
close(cmd.wait_full) |
|
|
|
|
cmd.SyncCommand(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) |
|
|
|
@ -172,14 +173,20 @@ func (cmd *CmdSync) SendSyncCmd(master, auth_type, passwd string) (net.Conn, int |
|
|
|
|
|
|
|
|
|
func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, int64) { |
|
|
|
|
c := utils.OpenNetConn(master, auth_type, passwd) |
|
|
|
|
log.Infof("psync connect '%v' with auth type[%v] OK!", master, auth_type) |
|
|
|
|
|
|
|
|
|
utils.SendPSyncListeningPort(c, conf.Options.HttpProfile) |
|
|
|
|
log.Infof("psync send listening port[%v] OK!", conf.Options.HttpProfile) |
|
|
|
|
|
|
|
|
|
br := bufio.NewReaderSize(c, utils.ReaderBufferSize) |
|
|
|
|
bw := bufio.NewWriterSize(c, utils.WriterBufferSize) |
|
|
|
|
|
|
|
|
|
log.Infof("try to send 'psync' command") |
|
|
|
|
runid, offset, wait := utils.SendPSyncFullsync(br, bw) |
|
|
|
|
cmd.targetOffset.Set(offset) |
|
|
|
|
log.Infof("psync runid = %s offset = %d, fullsync", runid, offset) |
|
|
|
|
|
|
|
|
|
// get rdb file size
|
|
|
|
|
var nsize int64 |
|
|
|
|
for nsize == 0 { |
|
|
|
|
select { |
|
|
|
|