|
|
|
@ -22,23 +22,19 @@ type psyncReader struct { |
|
|
|
|
ch chan *entry.Entry |
|
|
|
|
DbId int |
|
|
|
|
|
|
|
|
|
rd *bufio.Reader |
|
|
|
|
receivedOffset int64 |
|
|
|
|
rd *bufio.Reader |
|
|
|
|
receivedOffset int64 |
|
|
|
|
elastiCachePSync string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewPSyncReader(address string, username string, password string, isTls bool) Reader { |
|
|
|
|
func NewPSyncReader(address string, username string, password string, isTls bool, ElastiCachePSync string) Reader { |
|
|
|
|
r := new(psyncReader) |
|
|
|
|
r.init(address, username, password, isTls) |
|
|
|
|
return r |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *psyncReader) init(address string, username string, password string, isTls bool) { |
|
|
|
|
r.address = address |
|
|
|
|
standalone := client.NewRedisClient(address, username, password, isTls) |
|
|
|
|
|
|
|
|
|
r.client = standalone |
|
|
|
|
r.elastiCachePSync = ElastiCachePSync |
|
|
|
|
r.client = client.NewRedisClient(address, username, password, isTls) |
|
|
|
|
r.rd = r.client.BufioReader() |
|
|
|
|
log.Infof("psyncReader connected to redis successful. address=[%s]", address) |
|
|
|
|
return r |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *psyncReader) StartRead() chan *entry.Entry { |
|
|
|
@ -86,6 +82,9 @@ func (r *psyncReader) saveRDB() { |
|
|
|
|
|
|
|
|
|
// send psync
|
|
|
|
|
argv = []string{"PSYNC", "?", "-1"} |
|
|
|
|
if r.elastiCachePSync != "" { |
|
|
|
|
argv = []string{r.elastiCachePSync, "?", "-1"} |
|
|
|
|
} |
|
|
|
|
r.client.Send(argv...) |
|
|
|
|
log.Infof("send %v", argv) |
|
|
|
|
// format: \n\n\n$<reply>\r\n
|
|
|
|
@ -98,8 +97,16 @@ func (r *psyncReader) saveRDB() { |
|
|
|
|
if b == '\n' { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if b == '-' { |
|
|
|
|
reply, err := r.rd.ReadString('\n') |
|
|
|
|
if err != nil { |
|
|
|
|
log.PanicError(err) |
|
|
|
|
} |
|
|
|
|
reply = strings.TrimSpace(reply) |
|
|
|
|
log.Panicf("psync error. address=[%s], reply=[%s]", r.address, reply) |
|
|
|
|
} |
|
|
|
|
if b != '+' { |
|
|
|
|
log.Panicf("invalid rdb format. address=[%s], b=[%s]", r.address, string(b)) |
|
|
|
|
log.Panicf("invalid psync reply. address=[%s], b=[%s]", r.address, string(b)) |
|
|
|
|
} |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|