unstable: middle stage

v4
vinllen 6 years ago
parent 2bb5e2d71b
commit 020db40b0c
  1. 68
      src/redis-shake/common/cluster.go
  2. 36
      src/redis-shake/common/command.go
  3. 53
      src/redis-shake/common/configure.go
  4. 31
      src/redis-shake/common/utils.go
  5. 57
      src/redis-shake/common/utils_test.go
  6. 13
      src/redis-shake/configure/configure.go
  7. 2
      src/redis-shake/dump.go
  8. 30
      src/redis-shake/main/main.go
  9. 27
      src/redis-shake/restore.go
  10. 7
      src/redis-shake/rump.go
  11. 22
      src/redis-shake/sync.go
  12. 18
      src/vendor/vendor.json

@ -0,0 +1,68 @@
package utils
import (
redigoCluster "github.com/chasex/redis-go-cluster"
redigo "github.com/garyburd/redigo/redis"
)
/* implement redigo.Conn(https://github.com/garyburd/redigo)
* Embed redis-go-cluster(https://github.com/chasex/redis-go-cluster)
* The reason I create this struct is that redis-go-cluster isn't fulfill redigo.Conn
* interface. So I implement "Err", "Send", "Flush" and "Receive" interfaces.
*/
type ClusterConn struct {
client *redigoCluster.Cluster
recvChan chan reply
batcher *redigoCluster.Batch
}
type reply struct {
answer interface{}
err error
}
func NewClusterConn(clusterClient *redigoCluster.Cluster, recvChanSize int) redigo.Conn {
return &ClusterConn{
client: clusterClient,
recvChan: make(chan reply, recvChanSize),
}
}
func (cc *ClusterConn) Close() error {
cc.client.Close()
return nil
}
func (cc *ClusterConn) Err() error {
return nil
}
func (cc *ClusterConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
return cc.client.Do(commandName, args...)
}
// just add into batcher
func (cc *ClusterConn) Send(commandName string, args ...interface{}) error {
if cc.batcher == nil {
cc.batcher = cc.client.NewBatch()
}
return cc.batcher.Put(commandName, args...)
}
// send batcher and put the return into recvChan
func (cc *ClusterConn) Flush() error {
ret, err := cc.client.RunBatch(cc.batcher)
cc.batcher = nil // reset batcher
cc.recvChan <- reply{
answer: ret,
err: err,
}
return err
}
// read recvChan
func (cc *ClusterConn) Receive() (reply interface{}, err error) {
ret := <- cc.recvChan
return ret.answer, ret.err
}

@ -4,7 +4,10 @@ import (
"bytes"
"fmt"
"strconv"
"redis-shake/configure"
redigo "github.com/garyburd/redigo/redis"
)
type ClusterNodeInfo struct {
@ -59,9 +62,13 @@ func ParseKeyspace(content []byte) (map[int32]int64, error) {
* 486e081f8d47968df6a7e43ef9d3ba93b77d03b2 10.1.1.1:21334@31334 slave 75fffcd521738606a919607a7ddd52bcd6d65aa8 0 1557996785258 4 connected
*/
func ParseClusterNode(content []byte) []*ClusterNodeInfo {
lines := bytes.Split(content, []byte("\r\n"))
lines := bytes.Split(content, []byte("\n"))
ret := make([]*ClusterNodeInfo, 0, len(lines))
for _, line := range lines {
if bytes.Compare(line, []byte{}) == 0 {
continue
}
items := bytes.Split(line, []byte(" "))
address := bytes.Split(items[1], []byte{'@'})
@ -92,14 +99,31 @@ func ParseClusterNode(content []byte) []*ClusterNodeInfo {
}
// needMaster: true(master), false(slave)
func ClusterNodeChoose(input []*ClusterNodeInfo, needMaster bool) []*ClusterNodeInfo {
ret := make([]*ClusterNodeInfo, 0, len(input) / 2)
func ClusterNodeChoose(input []*ClusterNodeInfo, role string) []*ClusterNodeInfo {
ret := make([]*ClusterNodeInfo, 0, len(input))
for _, ele := range input {
if ele.Flags == conf.StandAloneRoleMaster && needMaster {
ret = append(ret, ele)
} else if ele.Flags == conf.StandAloneRoleSlave && !needMaster {
if ele.Flags == conf.StandAloneRoleMaster && role == conf.StandAloneRoleMaster ||
ele.Flags == conf.StandAloneRoleSlave && role == conf.StandAloneRoleSlave ||
role == conf.StandAloneRoleAll {
ret = append(ret, ele)
}
}
return ret
}
func GetAllClusterNode(client redigo.Conn, role string) ([]string, error) {
ret, err := client.Do("cluster", "nodes")
if err != nil {
return nil, err
}
nodeList := ParseClusterNode(ret.([]byte))
nodeListChoose := ClusterNodeChoose(nodeList, role)
result := make([]string, 0, len(nodeListChoose))
for _, ele := range nodeListChoose {
result = append(result, ele.Address)
}
return result, nil
}

@ -41,7 +41,7 @@ func ParseAddress(tp string) error {
}
func parseAddress(tp, address, redisType string, isSource bool) error {
addressLen := len(splitCluster(redisType))
addressLen := len(splitCluster(address))
if addressLen == 0 {
return fmt.Errorf("address length[%v] illegal", addressLen)
}
@ -51,13 +51,14 @@ func parseAddress(tp, address, redisType string, isSource bool) error {
fallthrough
case conf.RedisTypeStandalone:
if addressLen != 1 {
return fmt.Errorf("address[%v] length[%v] must == 1 when type is 'standalone'", address, addressLen)
return fmt.Errorf("redis type[%v] address[%v] length[%v] != 1", redisType, address, addressLen)
}
setAddressList(isSource, address)
case conf.RedisTypeSentinel:
arr := strings.Split(address, AddressSplitter)
if len(arr) != 2 {
return fmt.Errorf("address[%v] length[%v] != 2", address, len(arr))
return fmt.Errorf("redis type[%v] address[%v] length[%v] != 2",
conf.RedisTypeStandalone, address, len(arr))
}
var masterName string
@ -94,10 +95,50 @@ func parseAddress(tp, address, redisType string, isSource bool) error {
}
}
case conf.RedisTypeCluster:
if isSource == false {
return fmt.Errorf("target type can't be cluster currently")
if isSource == false && tp == conf.TypeRump {
return fmt.Errorf("target type[%v] can't be cluster when type is 'rump' currently", redisType)
}
if strings.Contains(address, AddressSplitter) {
arr := strings.Split(address, AddressSplitter)
if len(arr) != 2 {
return fmt.Errorf("redis type[%v] address[%v] length[%v] != 2", redisType, address, len(arr))
}
if isSource && arr[0] != conf.StandAloneRoleSlave && arr[0] != conf.StandAloneRoleMaster {
return fmt.Errorf("redis role must be master or slave")
}
if !isSource && arr[0] != "" {
return fmt.Errorf("redis type[%v] leading character must be '@'", redisType)
}
clusterList := strings.Split(arr[1], AddressClusterSplitter)
// get auth type and password
var auth, password string
if isSource {
auth, password = conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw
} else {
auth, password = conf.Options.TargetAuthType, conf.Options.TargetPasswordRaw
}
role := arr[0]
if role == "" {
role = conf.StandAloneRoleAll
}
// create client to fetch
client := OpenRedisConn(clusterList, auth, password, false)
if addressList, err := GetAllClusterNode(client, role); err != nil {
return err
} else {
if isSource {
conf.Options.SourceAddressList = addressList
} else {
conf.Options.TargetAddressList = addressList
}
}
} else {
setAddressList(isSource, address)
}
setAddressList(isSource, address)
case conf.RedisTypeProxy:
if addressLen != 1 {
return fmt.Errorf("address[%v] length[%v] must == 1 when type is 'proxy'", addressLen, addressLen)

@ -26,10 +26,35 @@ import (
"github.com/FZambia/go-sentinel"
redigo "github.com/garyburd/redigo/redis"
redigoCluster "github.com/chasex/redis-go-cluster"
)
func OpenRedisConn(target, auth_type, passwd string) redigo.Conn {
return redigo.NewConn(OpenNetConn(target, auth_type, passwd), 0, 0)
func OpenRedisConn(target []string, auth_type, passwd string, isCluster bool) redigo.Conn {
if isCluster {
cluster, err := redigoCluster.NewCluster(
&redigoCluster.Options{
StartNodes: target,
ConnTimeout: 1 * time.Second,
ReadTimeout: 500 * time.Millisecond,
WriteTimeout: 500 * time.Millisecond,
KeepAlive: 16,
AliveTime: 60 * time.Second,
})
if err != nil {
log.Panicf("create cluster connection error[%v]", err)
return nil
}
_, err = cluster.Do(auth_type, passwd)
if err != nil {
log.Panicf("auth cluster failed[%v]", err)
return nil
}
return NewClusterConn(cluster, 4096)
} else {
return redigo.NewConn(OpenNetConn(target[0], auth_type, passwd), 0, 0)
}
}
func OpenRedisConnWithTimeout(target, auth_type, passwd string, readTimeout, writeTimeout time.Duration) redigo.Conn {
@ -831,7 +856,7 @@ func ParseRedisInfo(content []byte) map[string]string {
}
func GetRedisVersion(target, authType, auth string) (string, error) {
c := OpenRedisConn(target, authType, auth)
c := OpenRedisConn([]string{target}, authType, auth, false)
infoStr, err := redigo.Bytes(c.Do("info", "server"))
if err != nil {
return "", err

@ -0,0 +1,57 @@
package utils
import (
"testing"
"fmt"
"sort"
"github.com/stretchr/testify/assert"
)
func TestGetAllClusterNode(t *testing.T) {
var nr int
{
fmt.Printf("TestGetAllClusterNode case %d.\n", nr)
nr++
client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false)
ret, err := GetAllClusterNode(client, "master")
sort.Strings(ret)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 3, len(ret), "should be equal")
assert.Equal(t, "10.1.1.1:21331", ret[0], "should be equal")
assert.Equal(t, "10.1.1.1:21332", ret[1], "should be equal")
assert.Equal(t, "10.1.1.1:21333", ret[2], "should be equal")
}
{
fmt.Printf("TestGetAllClusterNode case %d.\n", nr)
nr++
client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false)
ret, err := GetAllClusterNode(client, "slave")
sort.Strings(ret)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 3, len(ret), "should be equal")
assert.Equal(t, "10.1.1.1:21334", ret[0], "should be equal")
assert.Equal(t, "10.1.1.1:21335", ret[1], "should be equal")
assert.Equal(t, "10.1.1.1:21336", ret[2], "should be equal")
}
{
fmt.Printf("TestGetAllClusterNode case %d.\n", nr)
nr++
client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false)
ret, err := GetAllClusterNode(client, "all")
sort.Strings(ret)
assert.Equal(t, nil, err, "should be equal")
assert.Equal(t, 6, len(ret), "should be equal")
assert.Equal(t, "10.1.1.1:21331", ret[0], "should be equal")
assert.Equal(t, "10.1.1.1:21332", ret[1], "should be equal")
assert.Equal(t, "10.1.1.1:21333", ret[2], "should be equal")
assert.Equal(t, "10.1.1.1:21334", ret[3], "should be equal")
assert.Equal(t, "10.1.1.1:21335", ret[4], "should be equal")
assert.Equal(t, "10.1.1.1:21336", ret[5], "should be equal")
}
}

@ -58,12 +58,12 @@ type Configuration struct {
/*---------------------------------------------------------*/
// generated variables
SourceAddressList []string // source address list
TargetAddressList []string // target address list
HeartbeatIp string // heartbeat ip
ShiftTime time.Duration // shift
TargetRedisVersion string // to_redis_version
TargetReplace bool // to_replace
SourceAddressList []string // source address list
TargetAddressList []string // target address list
HeartbeatIp string // heartbeat ip
ShiftTime time.Duration // shift
TargetRedisVersion string // to_redis_version
TargetReplace bool // to_replace
}
var Options Configuration
@ -76,6 +76,7 @@ const (
StandAloneRoleMaster = "master"
StandAloneRoleSlave = "slave"
StandAloneRoleAll = "all"
TypeDecode = "decode"
TypeRestore = "restore"

@ -177,5 +177,5 @@ func (dd *dbDumper) dumpRDBFile(reader *bufio.Reader, writer *bufio.Writer, nsiz
p := 100 * n / nsize
log.Infof("routine[%v] total = %d - %12d [%3d%%]\n", dd.id, nsize, n, p)
}
log.Info("routine[%v] dump: rdb done", dd.id)
log.Infof("routine[%v] dump: rdb done", dd.id)
}

@ -187,6 +187,21 @@ func sanitizeOptions(tp string) error {
return fmt.Errorf("BigKeyThreshold[%v] should <= 524288000", conf.Options.BigKeyThreshold)
}
// source password
if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" {
return fmt.Errorf("only one of source password_raw or password_encoding should be given")
} else if conf.Options.SourcePasswordEncoding != "" {
sourcePassword := "" // todo, inner version
conf.Options.SourcePasswordRaw = string(sourcePassword)
}
// target password
if conf.Options.TargetPasswordRaw != "" && conf.Options.TargetPasswordEncoding != "" {
return fmt.Errorf("only one of target password_raw or password_encoding should be given")
} else if conf.Options.TargetPasswordEncoding != "" {
targetPassword := "" // todo, inner version
conf.Options.TargetPasswordRaw = string(targetPassword)
}
// parse source and target address and type
if err := utils.ParseAddress(tp); err != nil {
return fmt.Errorf("mode[%v] parse address failed[%v]", tp, err)
@ -211,24 +226,10 @@ func sanitizeOptions(tp string) error {
conf.Options.RdbParallel = len(conf.Options.RdbInput)
}
if conf.Options.SourcePasswordRaw != "" && conf.Options.SourcePasswordEncoding != "" {
return fmt.Errorf("only one of source password_raw or password_encoding should be given")
} else if conf.Options.SourcePasswordEncoding != "" {
sourcePassword := "" // todo, inner version
conf.Options.SourcePasswordRaw = string(sourcePassword)
}
if conf.Options.SourceParallel == 0 || conf.Options.SourceParallel > uint(len(conf.Options.SourceAddressList)) {
conf.Options.SourceParallel = uint(len(conf.Options.SourceAddressList))
}
if conf.Options.TargetPasswordRaw != "" && conf.Options.TargetPasswordEncoding != "" {
return fmt.Errorf("only one of target password_raw or password_encoding should be given")
} else if conf.Options.TargetPasswordEncoding != "" {
targetPassword := "" // todo, inner version
conf.Options.TargetPasswordRaw = string(targetPassword)
}
if conf.Options.LogFile != "" {
//conf.Options.LogFile = fmt.Sprintf("%s.log", conf.Options.Id)
@ -349,6 +350,7 @@ func sanitizeOptions(tp string) error {
if tp == conf.TypeRestore || tp == conf.TypeSync {
// get target redis version and set TargetReplace.
for _, address := range conf.Options.TargetAddressList {
// single connection even if the target is cluster
if v, err := utils.GetRedisVersion(address, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw); err != nil {
return fmt.Errorf("get target redis version failed[%v]", err)

@ -59,9 +59,14 @@ func (cmd *CmdRestore) Main() {
break
}
// round-robin pick
pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
target := conf.Options.TargetAddressList[pick]
var target []string
if conf.Options.TargetType == conf.RedisTypeCluster {
target = conf.Options.TargetAddressList
} else {
// round-robin pick
pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
target = []string{conf.Options.TargetAddressList[pick]}
}
dr := &dbRestorer{
id: node.id,
@ -92,9 +97,9 @@ func (cmd *CmdRestore) Main() {
/*------------------------------------------------------*/
// one restore link corresponding to one dbRestorer
type dbRestorer struct {
id int // id
input string // input rdb
target string
id int // id
input string // input rdb
target []string // len >= 1 when target type is cluster, otherwise len == 1
targetPassword string
// metric
@ -126,12 +131,13 @@ func (dr *dbRestorer) restore() {
base.Status = "extra"
if conf.Options.ExtraInfo && (nsize == 0 || nsize != dr.rbytes.Get()) {
// inner usage
dr.restoreCommand(reader, dr.target, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw)
}
}
func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, passwd string, nsize int64) {
func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64) {
pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize)
wait := make(chan struct{})
go func() {
@ -140,7 +146,7 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, pa
for i := 0; i < conf.Options.Parallel; i++ {
go func() {
defer wg.Done()
c := utils.OpenRedisConn(target, auth_type, passwd)
c := utils.OpenRedisConn(target, auth_type, passwd, conf.Options.TargetType == conf.RedisTypeCluster)
defer c.Close()
var lastdb uint32 = 0
for e := range pipe {
@ -190,8 +196,9 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target, auth_type, pa
log.Info("routine[%v] restore: rdb done", dr.id)
}
func (dr *dbRestorer) restoreCommand(reader *bufio.Reader, target, auth_type, passwd string) {
c := utils.OpenNetConn(target, auth_type, passwd)
func (dr *dbRestorer) restoreCommand(reader *bufio.Reader, target []string, auth_type, passwd string) {
// inner usage. only use on targe
c := utils.OpenNetConn(target[0], auth_type, passwd)
defer c.Close()
writer := bufio.NewWriterSize(c, utils.WriterBufferSize)

@ -39,11 +39,12 @@ func (cr *CmdRump) Main() {
cr.sourceConn = make([]redis.Conn, len(conf.Options.SourceAddressList))
for i, address := range conf.Options.SourceAddressList {
cr.sourceConn[i] = utils.OpenRedisConn(address, conf.Options.SourceAuthType, conf.Options.SourcePasswordRaw)
cr.sourceConn[i] = utils.OpenRedisConn([]string{address}, conf.Options.SourceAuthType,
conf.Options.SourcePasswordRaw, false)
}
// TODO, current only support write data into 1 db or proxy
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddressList[0], conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw)
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddressList, conf.Options.TargetAuthType,
conf.Options.TargetPasswordRaw, false)
// init two channels
chanSize := int(conf.Options.ScanKeyNumber) * len(conf.Options.SourceAddressList)

@ -135,7 +135,8 @@ func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string,
id: id,
source: source,
sourcePassword: sourcePassword,
target: target,
//todo
//target: target,
targetPassword: targetPassword,
httpProfilePort: httpPort,
waitFull: make(chan struct{}),
@ -150,10 +151,10 @@ func NewDbSyncer(id int, source, sourcePassword, target, targetPassword string,
type dbSyncer struct {
id int // current id in all syncer
source string // source address
sourcePassword string // source password
target string // target address
targetPassword string // target password
source string // source address
sourcePassword string // source password
target []string // target address
targetPassword string // target password
httpProfilePort int // http profile port
@ -238,16 +239,18 @@ func (ds *dbSyncer) sync() {
go heartbeatCtl.Start()
}
reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)
// reader := bufio.NewReaderSize(input, utils.ReaderBufferSize)
// sync rdb
base.Status = "full"
ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize)
// todo
// ds.syncRDBFile(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword, nsize)
// sync increment
base.Status = "incr"
close(ds.waitFull)
ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword)
// todo
//ds.syncCommand(reader, ds.target, conf.Options.TargetAuthType, ds.targetPassword)
}
func (ds *dbSyncer) sendSyncCmd(master, auth_type, passwd string) (net.Conn, int64) {
@ -396,7 +399,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target, auth_type, passwd
for i := 0; i < conf.Options.Parallel; i++ {
go func() {
defer wg.Done()
c := utils.OpenRedisConn(target, auth_type, passwd)
// todo
c := utils.OpenRedisConn([]string{target}, auth_type, passwd, false)
defer c.Close()
var lastdb uint32 = 0
for e := range pipe {

@ -26,6 +26,12 @@
"revision": "43ba34106c765f2111c0dc7b74cdf8ee437411e0",
"revisionTime": "2016-08-25T15:38:14Z"
},
{
"checksumSHA1": "CSPbwbyzqA6sfORicn4HFtIhF/c=",
"path": "github.com/davecgh/go-spew/spew",
"revision": "d8f796af33cc11cb798c1aaeb27a4ebc5099927d",
"revisionTime": "2018-08-30T19:11:22Z"
},
{
"checksumSHA1": "VbYBp7hfr7vdfliXjcqwrxSsMVg=",
"path": "github.com/docopt/docopt-go",
@ -62,12 +68,24 @@
"revision": "0ad87eef1443f64d3d8c50da647e2b1552851124",
"revisionTime": "2018-06-18T18:06:23Z"
},
{
"checksumSHA1": "LuFv4/jlrmFNnDb/5SCSEPAM9vU=",
"path": "github.com/pmezard/go-difflib/difflib",
"revision": "5d4384ee4fb2527b0a1256a821ebfc92f91efefc",
"revisionTime": "2018-12-26T10:54:42Z"
},
{
"checksumSHA1": "Tutue3nEgM/87jitUcYv6ODwyNE=",
"path": "github.com/satori/go.uuid",
"revision": "b2ce2384e17bbe0c6d34077efa39dbab3e09123b",
"revisionTime": "2018-10-28T12:50:25Z"
},
{
"checksumSHA1": "IIsn0Wdi5rHU8xca+FzDd+YeYlc=",
"path": "github.com/stretchr/testify/assert",
"revision": "34c6fa2dc70986bccbbffcc6130f6920a924b075",
"revisionTime": "2019-03-04T09:57:49Z"
},
{
"checksumSHA1": "TM3Neoy1xRAKyZYMGzKc41sDFW4=",
"path": "gopkg.in/natefinch/lumberjack.v2",

Loading…
Cancel
Save