Merge pull request #32 from alibaba/feature-1.2

Feature 1.2
v4
Vinllen Chen 6 years ago committed by GitHub
commit 92e98571f5
  1. 3
      ChangeLog
  2. 4
      conf/redis-shake.conf
  3. 32
      src/redis-shake/common/common.go
  4. 23
      src/redis-shake/common/utils.go
  5. 2
      src/redis-shake/main/main.go
  6. 6
      src/redis-shake/sync.go

@ -1,3 +1,6 @@
2019-04-13 Alibaba Cloud.
* version: 1.2.3
* IMPROVE: polish log print to print more error info.
2019-04-03 Alibaba Cloud. 2019-04-03 Alibaba Cloud.
* version: 1.2.2 * version: 1.2.2
* BUGFIX: support 5.0 rdb RDB_OPCODE_MODULE_AUX, RDB_OPCODE_IDLE and * BUGFIX: support 5.0 rdb RDB_OPCODE_MODULE_AUX, RDB_OPCODE_IDLE and

@ -16,8 +16,8 @@ http_profile = 9320
# runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU() # runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU()
ncpu = 0 ncpu = 0
# parallel routines number used in RDB file syncing. # parallel routines number used in RDB file syncing. default is 64.
parallel = 4 parallel = 32
# input RDB file. read from stdin, default is stdin ('/dev/stdin'). # input RDB file. read from stdin, default is stdin ('/dev/stdin').
# used in `decode` and `restore`. # used in `decode` and `restore`.

@ -1,8 +1,13 @@
package utils package utils
import ( import (
logRotate "gopkg.in/natefinch/lumberjack.v2" "net"
"fmt"
"strings"
"pkg/libs/bytesize" "pkg/libs/bytesize"
logRotate "gopkg.in/natefinch/lumberjack.v2"
) )
const ( const (
@ -17,3 +22,28 @@ var(
LogRotater *logRotate.Logger LogRotater *logRotate.Logger
StartTime string StartTime string
) )
// read until hit the end of RESP: "\r\n"
func ReadRESPEnd(c net.Conn) (string, error) {
var ret string
for {
b := make([]byte, 1)
if _, err := c.Read(b); err != nil {
return "", fmt.Errorf("read error[%v], current return[%s]", err, ret)
}
ret += string(b)
if strings.HasSuffix(ret, "\r\n") {
break
}
}
return ret, nil
}
func RemoveRESPEnd(input string) string {
length := len(input)
if length >= 2 {
return input[: length - 2]
}
return input
}

@ -21,8 +21,8 @@ import (
"pkg/libs/stats" "pkg/libs/stats"
"pkg/rdb" "pkg/rdb"
"pkg/redis" "pkg/redis"
redigo "github.com/garyburd/redigo/redis"
"redis-shake/configure" "redis-shake/configure"
redigo "github.com/garyburd/redigo/redis"
) )
func OpenRedisConn(target, auth_type, passwd string) redigo.Conn { func OpenRedisConn(target, auth_type, passwd string) redigo.Conn {
@ -42,6 +42,7 @@ func OpenNetConn(target, auth_type, passwd string) net.Conn {
log.PanicErrorf(err, "cannot connect to '%s'", target) log.PanicErrorf(err, "cannot connect to '%s'", target)
} }
log.Infof("try to auth address[%v] with type[%v]", target, auth_type)
AuthPassword(c, auth_type, passwd) AuthPassword(c, auth_type, passwd)
return c return c
} }
@ -88,17 +89,19 @@ func SendPSyncListeningPort(c net.Conn, port int) {
if err != nil { if err != nil {
log.PanicError(errors.Trace(err), "write replconf listening-port failed") log.PanicError(errors.Trace(err), "write replconf listening-port failed")
} }
var b = make([]byte, 5)
if _, err := io.ReadFull(c, b); err != nil { ret, err := ReadRESPEnd(c)
if err != nil {
log.PanicError(errors.Trace(err), "read auth response failed") log.PanicError(errors.Trace(err), "read auth response failed")
} }
if strings.ToUpper(string(b)) != "+OK\r\n" { if strings.ToUpper(ret) != "+OK\r\n" {
log.Panic("repl listening-port failed: ", string(b)) log.Panicf("repl listening-port failed[%v]", RemoveRESPEnd(ret))
} }
} }
func AuthPassword(c net.Conn, auth_type, passwd string) { func AuthPassword(c net.Conn, auth_type, passwd string) {
if passwd == "" { if passwd == "" {
log.Infof("input password is empty, skip auth address[%v] with type[%v].", c.RemoteAddr(), auth_type)
return return
} }
@ -106,12 +109,13 @@ func AuthPassword(c net.Conn, auth_type, passwd string) {
if err != nil { if err != nil {
log.PanicError(errors.Trace(err), "write auth command failed") log.PanicError(errors.Trace(err), "write auth command failed")
} }
var b = make([]byte, 5)
if _, err := io.ReadFull(c, b); err != nil { ret, err := ReadRESPEnd(c)
if err != nil {
log.PanicError(errors.Trace(err), "read auth response failed") log.PanicError(errors.Trace(err), "read auth response failed")
} }
if strings.ToUpper(string(b)) != "+OK\r\n" { if strings.ToUpper(ret) != "+OK\r\n" {
log.Panicf("auth failed[%s]", string(b)) log.Panicf("auth failed[%v]", RemoveRESPEnd(ret))
} }
} }
@ -125,6 +129,7 @@ func OpenSyncConn(target string, auth_type, passwd string) (net.Conn, <-chan int
func waitRdbDump(r io.Reader) <-chan int64 { func waitRdbDump(r io.Reader) <-chan int64 {
size := make(chan int64) size := make(chan int64)
// read rdb size
go func() { go func() {
var rsp string var rsp string
for { for {

@ -178,7 +178,7 @@ func sanitizeOptions(tp string) error {
} }
if conf.Options.Parallel == 0 { // not set if conf.Options.Parallel == 0 { // not set
conf.Options.Parallel = 1 conf.Options.Parallel = 64 // default is 64
} else if conf.Options.Parallel > 1024 { } else if conf.Options.Parallel > 1024 {
return fmt.Errorf("parallel[%v] should in (0, 1024]", conf.Options.Parallel) return fmt.Errorf("parallel[%v] should in (0, 1024]", conf.Options.Parallel)
} else { } else {

@ -119,7 +119,7 @@ func (cmd *CmdSync) Main() {
} }
defer input.Close() defer input.Close()
log.Infof("rdb file = %d\n", nsize) log.Infof("rdb file size = %d\n", nsize)
if sockfile != nil { if sockfile != nil {
r, w := pipe.NewFilePipe(int(conf.Options.SockFileSize), sockfile) r, w := pipe.NewFilePipe(int(conf.Options.SockFileSize), sockfile)
@ -134,6 +134,7 @@ func (cmd *CmdSync) Main() {
input = r input = r
} }
// start heartbeat
if len(conf.Options.HeartbeatUrl) > 0 { if len(conf.Options.HeartbeatUrl) > 0 {
heartbeatCtl := heartbeat.HeartbeatController{ heartbeatCtl := heartbeat.HeartbeatController{
ServerUrl: conf.Options.HeartbeatUrl, ServerUrl: conf.Options.HeartbeatUrl,
@ -144,9 +145,11 @@ func (cmd *CmdSync) Main() {
reader := bufio.NewReaderSize(input, utils.ReaderBufferSize) reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)
// sync rdb
base.Status = "full" base.Status = "full"
cmd.SyncRDBFile(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize) cmd.SyncRDBFile(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize)
// sync increment
base.Status = "incr" base.Status = "incr"
close(cmd.wait_full) close(cmd.wait_full)
cmd.SyncCommand(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) cmd.SyncCommand(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw)
@ -183,6 +186,7 @@ func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader,
cmd.targetOffset.Set(offset) cmd.targetOffset.Set(offset)
log.Infof("psync runid = %s offset = %d, fullsync", runid, offset) log.Infof("psync runid = %s offset = %d, fullsync", runid, offset)
// get rdb file size
var nsize int64 var nsize int64
for nsize == 0 { for nsize == 0 {
select { select {

Loading…
Cancel
Save