Merge pull request #31 from alibaba/improve-1.2.3

Improve 1.2.3
v4
Vinllen Chen 6 years ago committed by GitHub
commit ce4722cd94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      ChangeLog
  2. 6
      Dockerfile
  3. BIN
      bin/redis-shake.darwin64
  4. BIN
      bin/redis-shake.linux64
  5. 7
      conf/redis-shake.conf
  6. 38
      src/redis-shake/common/common.go
  7. 13
      src/redis-shake/common/mix.go
  8. 24
      src/redis-shake/common/utils.go
  9. 2
      src/redis-shake/main/main.go
  10. 15
      src/redis-shake/sync.go

@ -1,3 +1,6 @@
2019-04-13 Alibaba Cloud.
* version: 1.2.3
* IMPROVE: polish log print to print more error info.
2019-04-03 Alibaba Cloud.
* version: 1.2.2
* BUGFIX: support 5.0 rdb RDB_OPCODE_MODULE_AUX, RDB_OPCODE_IDLE and

@ -0,0 +1,6 @@
FROM busybox
COPY ./bin/redis-shake /usr/local/app/redis-shake
COPY ./conf/redis-shake.conf /usr/local/app/redis-shake.conf
ENV TYPE sync
CMD /usr/local/app/redis-shake -type=${TYPE} -conf=/usr/local/app/redis-shake.conf

Binary file not shown.

Binary file not shown.

@ -4,7 +4,8 @@
id = redis-shake
# log file,日志文件,不配置将打印到stdout (e.g. /var/log/redis-shake.log )
log_file =
# pid path,进程文件存储地址,不配置将输出到项目目录下 (e.g. /var/run/ )
# pid path,进程文件存储地址(e.g. /var/run/),不配置将默认输出到执行下面,
# 注意这个是目录,真正的pid是`{pid_path}/{id}.pid`
pid_path =
# pprof port
@ -15,8 +16,8 @@ http_profile = 9320
# runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU()
ncpu = 0
# parallel routines number used in RDB file syncing.
parallel = 4
# parallel routines number used in RDB file syncing. default is 64.
parallel = 32
# input RDB file. read from stdin, default is stdin ('/dev/stdin').
# used in `decode` and `restore`.

@ -1,19 +1,49 @@
package utils
import(
logRotate "gopkg.in/natefinch/lumberjack.v2"
import (
"net"
"fmt"
"strings"
"pkg/libs/bytesize"
logRotate "gopkg.in/natefinch/lumberjack.v2"
)
const(
const (
GolangSecurityTime = "2006-01-02T15:04:05Z"
// GolangSecurityTime = "2006-01-02 15:04:05"
ReaderBufferSize = bytesize.MB * 32
WriterBufferSize = bytesize.MB * 8
)
var(
var (
Version = "$"
LogRotater *logRotate.Logger
StartTime string
)
// read until hit the end of RESP: "\r\n"
func ReadRESPEnd(c net.Conn) (string, error) {
var ret string
for {
b := make([]byte, 1)
if _, err := c.Read(b); err != nil {
return "", fmt.Errorf("read error[%v], current return[%s]", err, ret)
}
ret += string(b)
if strings.HasSuffix(ret, "\r\n") {
break
}
}
return ret, nil
}
func RemoveRESPEnd(input string) string {
length := len(input)
if length >= 2 {
return input[: length - 2]
}
return input
}

@ -24,11 +24,22 @@ func WritePid(id string) (err error) {
func WritePidById(id string, path string) error {
var dir string
var err error
if path == "" {
dir, _ = os.Getwd()
if dir, err = os.Getwd(); err != nil {
return err
}
} else {
dir = path
if _, err := os.Stat(dir); os.IsNotExist(err) {
os.Mkdir(dir, os.ModePerm)
}
}
if dir, err = filepath.Abs(dir); err != nil {
return err
}
pidfile := filepath.Join(dir, id) + ".pid"
if err := WritePid(pidfile); err != nil {
return err

@ -21,8 +21,8 @@ import (
"pkg/libs/stats"
"pkg/rdb"
"pkg/redis"
redigo "github.com/garyburd/redigo/redis"
"redis-shake/configure"
redigo "github.com/garyburd/redigo/redis"
)
func OpenRedisConn(target, auth_type, passwd string) redigo.Conn {
@ -42,6 +42,7 @@ func OpenNetConn(target, auth_type, passwd string) net.Conn {
log.PanicErrorf(err, "cannot connect to '%s'", target)
}
log.Infof("try to auth address[%v] with type[%v]", target, auth_type)
AuthPassword(c, auth_type, passwd)
return c
}
@ -84,22 +85,23 @@ func OpenReadWriteFile(name string) *os.File {
}
func SendPSyncListeningPort(c net.Conn, port int) {
_, err := c.Write(redis.MustEncodeToBytes(redis.NewCommand("replconf", "listening-port", port)))
if err != nil {
log.PanicError(errors.Trace(err), "write replconf listening-port failed")
}
var b = make([]byte, 5)
if _, err := io.ReadFull(c, b); err != nil {
ret, err := ReadRESPEnd(c)
if err != nil {
log.PanicError(errors.Trace(err), "read auth response failed")
}
if strings.ToUpper(string(b)) != "+OK\r\n" {
log.Panic("repl listening-port failed: ", string(b))
if strings.ToUpper(ret) != "+OK\r\n" {
log.Panicf("repl listening-port failed[%v]", RemoveRESPEnd(ret))
}
}
func AuthPassword(c net.Conn, auth_type, passwd string) {
if passwd == "" {
log.Infof("input password is empty, skip auth address[%v] with type[%v].", c.RemoteAddr(), auth_type)
return
}
@ -107,12 +109,13 @@ func AuthPassword(c net.Conn, auth_type, passwd string) {
if err != nil {
log.PanicError(errors.Trace(err), "write auth command failed")
}
var b = make([]byte, 5)
if _, err := io.ReadFull(c, b); err != nil {
ret, err := ReadRESPEnd(c)
if err != nil {
log.PanicError(errors.Trace(err), "read auth response failed")
}
if strings.ToUpper(string(b)) != "+OK\r\n" {
log.Panicf("auth failed[%s]", string(b))
if strings.ToUpper(ret) != "+OK\r\n" {
log.Panicf("auth failed[%v]", RemoveRESPEnd(ret))
}
}
@ -126,6 +129,7 @@ func OpenSyncConn(target string, auth_type, passwd string) (net.Conn, <-chan int
func waitRdbDump(r io.Reader) <-chan int64 {
size := make(chan int64)
// read rdb size
go func() {
var rsp string
for {

@ -178,7 +178,7 @@ func sanitizeOptions(tp string) error {
}
if conf.Options.Parallel == 0 { // not set
conf.Options.Parallel = 1
conf.Options.Parallel = 64 // default is 64
} else if conf.Options.Parallel > 1024 {
return fmt.Errorf("parallel[%v] should in (0, 1024]", conf.Options.Parallel)
} else {

@ -100,10 +100,8 @@ func (cmd *CmdSync) Main() {
log.Panic("invalid argument: target")
}
log.Infof("sync from '%s' to '%s' http '%d'\n", from, target, conf.Options.HttpProfile)
log.Infof("sync from '%s' to '%s' with http-port[%d]\n", from, target, conf.Options.HttpProfile)
cmd.wait_full = make(chan struct{})
log.Infof("sync from '%s' to '%s'\n", from, target)
var sockfile *os.File
if len(conf.Options.SockFileName) != 0 {
@ -121,7 +119,7 @@ func (cmd *CmdSync) Main() {
}
defer input.Close()
log.Infof("rdb file = %d\n", nsize)
log.Infof("rdb file size = %d\n", nsize)
if sockfile != nil {
r, w := pipe.NewFilePipe(int(conf.Options.SockFileSize), sockfile)
@ -136,6 +134,7 @@ func (cmd *CmdSync) Main() {
input = r
}
// start heartbeat
if len(conf.Options.HeartbeatUrl) > 0 {
heartbeatCtl := heartbeat.HeartbeatController{
ServerUrl: conf.Options.HeartbeatUrl,
@ -146,9 +145,11 @@ func (cmd *CmdSync) Main() {
reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)
// sync rdb
base.Status = "full"
cmd.SyncRDBFile(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw, nsize)
// sync increment
base.Status = "incr"
close(cmd.wait_full)
cmd.SyncCommand(reader, target, conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw)
@ -172,14 +173,20 @@ func (cmd *CmdSync) SendSyncCmd(master, auth_type, passwd string) (net.Conn, int
func (cmd *CmdSync) SendPSyncCmd(master, auth_type, passwd string) (pipe.Reader, int64) {
c := utils.OpenNetConn(master, auth_type, passwd)
log.Infof("psync connect '%v' with auth type[%v] OK!", master, auth_type)
utils.SendPSyncListeningPort(c, conf.Options.HttpProfile)
log.Infof("psync send listening port[%v] OK!", conf.Options.HttpProfile)
br := bufio.NewReaderSize(c, utils.ReaderBufferSize)
bw := bufio.NewWriterSize(c, utils.WriterBufferSize)
log.Infof("try to send 'psync' command")
runid, offset, wait := utils.SendPSyncFullsync(br, bw)
cmd.targetOffset.Set(offset)
log.Infof("psync runid = %s offset = %d, fullsync", runid, offset)
// get rdb file size
var nsize int64
for nsize == 0 {
select {

Loading…
Cancel
Save