diff --git a/ChangeLog b/ChangeLog index f6648d0..c2d4540 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ +2019-04-13 Alibaba Cloud. + * version: 1.2.3 + * IMPROVE: polish log print to print more error info. 2019-04-03 Alibaba Cloud. * version: 1.2.2 * BUGFIX: support 5.0 rdb RDB_OPCODE_MODULE_AUX, RDB_OPCODE_IDLE and diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..403f955 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM busybox + +COPY ./bin/redis-shake /usr/local/app/redis-shake +COPY ./conf/redis-shake.conf /usr/local/app/redis-shake.conf +ENV TYPE sync +CMD /usr/local/app/redis-shake -type=${TYPE} -conf=/usr/local/app/redis-shake.conf diff --git a/bin/redis-shake.darwin64 b/bin/redis-shake.darwin64 index bc0b97e..e3736e8 100755 Binary files a/bin/redis-shake.darwin64 and b/bin/redis-shake.darwin64 differ diff --git a/bin/redis-shake.linux64 b/bin/redis-shake.linux64 index 0757bd4..249750e 100755 Binary files a/bin/redis-shake.linux64 and b/bin/redis-shake.linux64 differ diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 5d51c18..66f2b35 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -4,8 +4,9 @@ id = redis-shake # log file,日志文件,不配置将打印到stdout (e.g. /var/log/redis-shake.log ) log_file = -# pid path,进程文件存储地址,不配置将输出到项目目录下 (e.g. /var/run/ ) -pid_path = +# pid path,进程文件存储地址(e.g. /var/run/),不配置将默认输出到执行下面, +# 注意这个是目录,真正的pid是`{pid_path}/{id}.pid` +pid_path = # pprof port system_profile = 9310 @@ -15,8 +16,8 @@ http_profile = 9320 # runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU() ncpu = 0 -# parallel routines number used in RDB file syncing. -parallel = 4 +# parallel routines number used in RDB file syncing. default is 64. +parallel = 32 # input RDB file. read from stdin, default is stdin ('/dev/stdin'). # used in `decode` and `restore`. diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 6bfd69f..b0907cd 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -1,19 +1,49 @@ package utils -import( - logRotate "gopkg.in/natefinch/lumberjack.v2" +import ( + "net" + "fmt" + "strings" + "pkg/libs/bytesize" + + logRotate "gopkg.in/natefinch/lumberjack.v2" ) -const( +const ( GolangSecurityTime = "2006-01-02T15:04:05Z" // GolangSecurityTime = "2006-01-02 15:04:05" ReaderBufferSize = bytesize.MB * 32 WriterBufferSize = bytesize.MB * 8 ) -var( +var ( Version = "$" LogRotater *logRotate.Logger StartTime string -) \ No newline at end of file +) + +// read until hit the end of RESP: "\r\n" +func ReadRESPEnd(c net.Conn) (string, error) { + var ret string + for { + b := make([]byte, 1) + if _, err := c.Read(b); err != nil { + return "", fmt.Errorf("read error[%v], current return[%s]", err, ret) + } + + ret += string(b) + if strings.HasSuffix(ret, "\r\n") { + break + } + } + return ret, nil +} + +func RemoveRESPEnd(input string) string { + length := len(input) + if length >= 2 { + return input[: length - 2] + } + return input +} \ No newline at end of file diff --git a/src/redis-shake/common/mix.go b/src/redis-shake/common/mix.go index 1ad3de8..1f5c0e0 100644 --- a/src/redis-shake/common/mix.go +++ b/src/redis-shake/common/mix.go @@ -24,11 +24,22 @@ func WritePid(id string) (err error) { func WritePidById(id string, path string) error { var dir string + var err error if path == "" { - dir, _ = os.Getwd() + if dir, err = os.Getwd(); err != nil { + return err + } } else { dir = path + if _, err := os.Stat(dir); os.IsNotExist(err) { + os.Mkdir(dir, os.ModePerm) + } } + + if dir, err = filepath.Abs(dir); err != nil { + return err + } + pidfile := filepath.Join(dir, id) + ".pid" if err := WritePid(pidfile); err != nil { return err diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index e4b30df..3f38631 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -21,8 +21,8 @@ import ( "pkg/libs/stats" "pkg/rdb" "pkg/redis" - redigo "github.com/garyburd/redigo/redis" "redis-shake/configure" + redigo "github.com/garyburd/redigo/redis" ) func OpenRedisConn(target, auth_type, passwd string) redigo.Conn { @@ -42,6 +42,7 @@ func OpenNetConn(target, auth_type, passwd string) net.Conn { log.PanicErrorf(err, "cannot connect to '%s'", target) } + log.Infof("try to auth address[%v] with type[%v]", target, auth_type) AuthPassword(c, auth_type, passwd) return c } @@ -84,22 +85,23 @@ func OpenReadWriteFile(name string) *os.File { } func SendPSyncListeningPort(c net.Conn, port int) { - _, err := c.Write(redis.MustEncodeToBytes(redis.NewCommand("replconf", "listening-port", port))) if err != nil { log.PanicError(errors.Trace(err), "write replconf listening-port failed") } - var b = make([]byte, 5) - if _, err := io.ReadFull(c, b); err != nil { + + ret, err := ReadRESPEnd(c) + if err != nil { log.PanicError(errors.Trace(err), "read auth response failed") } - if strings.ToUpper(string(b)) != "+OK\r\n" { - log.Panic("repl listening-port failed: ", string(b)) + if strings.ToUpper(ret) != "+OK\r\n" { + log.Panicf("repl listening-port failed[%v]", RemoveRESPEnd(ret)) } } func AuthPassword(c net.Conn, auth_type, passwd string) { if passwd == "" { + log.Infof("input password is empty, skip auth address[%v] with type[%v].", c.RemoteAddr(), auth_type) return } @@ -107,12 +109,13 @@ func AuthPassword(c net.Conn, auth_type, passwd string) { if err != nil { log.PanicError(errors.Trace(err), "write auth command failed") } - var b = make([]byte, 5) - if _, err := io.ReadFull(c, b); err != nil { + + ret, err := ReadRESPEnd(c) + if err != nil { log.PanicError(errors.Trace(err), "read auth response failed") } - if strings.ToUpper(string(b)) != "+OK\r\n" { - log.Panicf("auth failed[%s]", string(b)) + if strings.ToUpper(ret) != "+OK\r\n" { + log.Panicf("auth failed[%v]", RemoveRESPEnd(ret)) } } @@ -126,6 +129,7 @@ func OpenSyncConn(target string, auth_type, passwd string) (net.Conn, <-chan int func waitRdbDump(r io.Reader) <-chan int64 { size := make(chan int64) + // read rdb size go func() { var rsp string for { diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index f725153..d22dc11 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -178,7 +178,7 @@ func sanitizeOptions(tp string) error { } if conf.Options.Parallel == 0 { // not set - conf.Options.Parallel = 1 + conf.Options.Parallel = 64 // default is 64 } else if conf.Options.Parallel > 1024 { return fmt.Errorf("parallel[%v] should in (0, 1024]", conf.Options.Parallel) } else { diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 6521d90..26a597e 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -100,10 +100,8 @@ func (cmd *CmdSync) Main() { log.Panic("invalid argument: target") } - log.Infof("sync from '%s' to '%s' http '%d'\n", from, target, conf.Options.HttpProfile) - + log.Infof("sync from '%s' to '%s' with http-port[%d]\n", from, target, conf.Options.HttpProfile) cmd.wait_full = make(chan struct{}) - log.Infof("sync from '%s' to '%s'\n", from, target) var sockfile *os.File if len(conf.Options.SockFileName) != 0 { @@ -121,7 +119,7 @@ func (cmd *CmdSync) Main() { } defer input.Close() - log.Infof("rdb file = %d\n", nsize) + log.Infof("rdb file size = %d\n", nsize) if sockfile != nil { r, w := pipe.NewFilePipe(int(conf.Options.SockFileSize), sockfile) @@ -136,6 +134,7 @@ func (cmd *CmdSync) Main() { input = r } + // start heartbeat if len(conf.Options.HeartbeatUrl) > 0 { heartbeatCtl := heartbeat.HeartbeatController{ ServerUrl: conf.Options.HeartbeatUrl, @@ -146,9 +145,11 @@ func (cmd *CmdSync) Main() { reader := bufio.NewReaderSize(input, utils.ReaderBufferSize) + // sync rdb base.Status = "full" cmd.SyncRDBFile(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize) + // sync increment base.Status = "incr" close(cmd.wait_full) cmd.SyncCommand(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw) @@ -172,14 +173,20 @@ func (cmd *CmdSync) SendSyncCmd(master, auth_type, passwd string) (net.Conn, int func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, int64) { c := utils.OpenNetConn(master, auth_type, passwd) + log.Infof("psync connect '%v' with auth type[%v] OK!", master, auth_type) + utils.SendPSyncListeningPort(c, conf.Options.HttpProfile) + log.Infof("psync send listening port[%v] OK!", conf.Options.HttpProfile) + br := bufio.NewReaderSize(c, utils.ReaderBufferSize) bw := bufio.NewWriterSize(c, utils.WriterBufferSize) + log.Infof("try to send 'psync' command") runid, offset, wait := utils.SendPSyncFullsync(br, bw) cmd.targetOffset.Set(offset) log.Infof("psync runid = %s offset = %d, fullsync", runid, offset) + // get rdb file size var nsize int64 for nsize == 0 { select {