|
|
|
@ -25,6 +25,8 @@ type SyncReaderOptions struct { |
|
|
|
|
Username string `mapstructure:"username" default:""` |
|
|
|
|
Password string `mapstructure:"password" default:""` |
|
|
|
|
Tls bool `mapstructure:"tls" default:"false"` |
|
|
|
|
SyncRdb bool `mapstructure:"sync_rdb" default:"true"` |
|
|
|
|
SyncAof bool `mapstructure:"sync_aof" default:"true"` |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type State string |
|
|
|
@ -38,6 +40,7 @@ const ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type syncStandaloneReader struct { |
|
|
|
|
opts *SyncReaderOptions |
|
|
|
|
client *client.Redis |
|
|
|
|
|
|
|
|
|
ch chan *entry.Entry |
|
|
|
@ -72,6 +75,7 @@ type syncStandaloneReader struct { |
|
|
|
|
|
|
|
|
|
func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader { |
|
|
|
|
r := new(syncStandaloneReader) |
|
|
|
|
r.opts = opts |
|
|
|
|
r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) |
|
|
|
|
r.rd = r.client.BufioReader() |
|
|
|
|
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) |
|
|
|
@ -91,9 +95,14 @@ func (r *syncStandaloneReader) StartRead() chan *entry.Entry { |
|
|
|
|
r.receiveRDB() |
|
|
|
|
startOffset := r.stat.AofReceivedOffset |
|
|
|
|
go r.receiveAOF(r.rd) |
|
|
|
|
r.sendRDB() |
|
|
|
|
r.stat.Status = kSyncAof |
|
|
|
|
r.sendAOF(startOffset) |
|
|
|
|
if r.opts.SyncRdb { |
|
|
|
|
r.sendRDB() |
|
|
|
|
} |
|
|
|
|
if r.opts.SyncAof { |
|
|
|
|
r.stat.Status = kSyncAof |
|
|
|
|
r.sendAOF(startOffset) |
|
|
|
|
} |
|
|
|
|
close(r.ch) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
return r.ch |
|
|
|
@ -103,12 +112,9 @@ func (r *syncStandaloneReader) sendReplconfListenPort() { |
|
|
|
|
// use status_port as redis-shake port
|
|
|
|
|
argv := []string{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)} |
|
|
|
|
r.client.Send(argv...) |
|
|
|
|
reply, err := r.client.Receive() |
|
|
|
|
_, err := r.client.Receive() |
|
|
|
|
if err != nil { |
|
|
|
|
log.Warnf("[%s] send replconf command to redis server failed. reply=[%s], error=[%v]", r.stat.Name, reply, err) |
|
|
|
|
} |
|
|
|
|
if reply != "OK" { |
|
|
|
|
log.Warnf("[%s] send replconf command to redis server failed. reply=[%s]", r.stat.Name, reply) |
|
|
|
|
log.Warnf("[%s] send replconf command to redis server failed. error=[%v]", r.stat.Name, err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|