Merge pull request #5 from aliyun/feature-1.2

Feature 1.2
v4
zhuzhao.cx 6 years ago committed by GitHub
commit ea21d01af9
  1. 5
      ChangeLog
  2. 16
      conf/redis-shake.conf
  3. 4
      src/pkg/rdb/loader.go
  4. 149
      src/pkg/rdb/reader.go
  5. 19
      src/redis-shake/common/utils.go
  6. 1
      src/redis-shake/configure/configure.go
  7. 2
      src/redis-shake/sync.go

@ -1,3 +1,6 @@
2019-03-11 Alibaba Cloud.
* version: 1.2.0
* redis-shake: support 5.0.
2019-02-21 Alibaba Cloud.
* version: 1.0.0
* mongo-shake: initial release.
* redis-shake: initial release.

@ -19,7 +19,7 @@ parallel = 4
# input RDB file. read from stdin, default is stdin ('/dev/stdin').
# used in `decode` and `restore`.
# 如果是decode或者restore,这个参数表示读取的rdb文件
input_rdb = local_dump
input_rdb = local
# output RDB file. default is stdout ('/dev/stdout').
# used in `decode` and `dump`.
@ -35,8 +35,8 @@ source.address = 127.0.0.1:20441
source.password_raw = 123456
# auth type, don't modify it
source.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0)
source.version = 6
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
source.version = 9
# target redis configuration. used in `restore` and `sync`.
# used in `restore` and `sync`.
@ -47,7 +47,7 @@ target.address = 127.0.0.1:20551
target.password_raw = 123456
# auth type, don't modify it
target.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0)
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0)
target.version = 6
# all the data will come into this db. < 0 means disable.
# used in `restore` and `sync`.
@ -94,7 +94,7 @@ psync = false
metric = true
# print in log
# 是否将metric打印到log中
metric.print_log = true
metric.print_log = false
# heartbeat
# send heartbeat to this url
@ -126,6 +126,12 @@ sender.count = 5000
# 用于metric统计时延的队列
sender.delay_channel_size = 65535
# enable keep_alive option in TCP when connecting redis.
# the unit is second.
# 0 means disable.
# TCP keep-alive保活参数,单位秒,0表示不启用。
keep_alive = 0
# ----------------splitter----------------
# below variables are useless for current opensource version so don't set.

@ -154,12 +154,13 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
default:
var key []byte
if l.remainMember == 0 {
// first time visit this key.
rkey, err := l.ReadString()
if err != nil {
return nil, err
}
key = rkey
entry.NeedReadLen = 1
entry.NeedReadLen = 1 // read value length when it's the first time.
} else {
key = l.lastEntry.Key
}
@ -178,6 +179,7 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
if l.lastReadCount == l.totMemberCount {
entry.RealMemberCount = 0
} else {
// RealMemberCount > 0 means this is big entry which also is a split key.
entry.RealMemberCount = l.lastReadCount
}
l.lastEntry = entry

@ -16,7 +16,7 @@ import (
// "libs/log"
)
var FromVersion int64 = 8
var FromVersion int64 = 9
var ToVersion int64 = 6
const (
@ -27,12 +27,13 @@ const (
RdbTypeHash = 4
RdbTypeZSet2 = 5
RdbTypeHashZipmap = 9
RdbTypeListZiplist = 10
RdbTypeSetIntset = 11
RdbTypeZSetZiplist = 12
RdbTypeHashZiplist = 13
RdbTypeQuicklist = 14
RdbTypeHashZipmap = 9
RdbTypeListZiplist = 10
RdbTypeSetIntset = 11
RdbTypeZSetZiplist = 12
RdbTypeHashZiplist = 13
RdbTypeQuicklist = 14
RDBTypeStreamListPacks = 15 // stream
rdbFlagOnlyValue = 0xf9
rdbFlagAUX = 0xfa
@ -46,7 +47,8 @@ const (
const (
rdb6bitLen = 0
rdb14bitLen = 1
rdb32bitLen = 2
rdb32bitLen = 0x80
rdb64bitLen = 0x81
rdbEncVal = 3
rdbEncInt8 = 0
@ -91,7 +93,7 @@ func (r *rdbReader) offset() int64 {
func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
var b bytes.Buffer
r = NewRdbReader(io.TeeReader(r, &b))
r = NewRdbReader(io.TeeReader(r, &b)) // the result will be written into b when calls r.Read()
lr := l.rdbReader
switch t {
default:
@ -181,6 +183,112 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
if lr.lastReadCount == n {
lr.remainMember = 0
}
case RDBTypeStreamListPacks:
// TODO, need to judge big key
lr.lastReadCount, lr.remainMember, lr.totMemberCount = 0, 0, 0
// list pack length
nListPacks, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nListPacks); i++ {
// read twice
if _, err := r.ReadString(); err != nil {
return nil, err
}
if _, err := r.ReadString(); err != nil {
return nil, err
}
}
// items
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_entry_id timestamp second
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_entry_id timestamp millisecond
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// cgroups length
nCgroups, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nCgroups); i++ {
// cname
if _, err := r.ReadString(); err != nil {
return nil, err
}
// last_cg_entry_id timestamp second
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_cg_entry_id timestamp millisecond
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// pending number
nPending, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nPending); i++ {
// eid, read 16 bytes
b := make([]byte, 16)
if err := r.readFull(b); err != nil {
return nil, err
}
// seen_time
b = make([]byte, 8)
if err := r.readFull(b); err != nil {
return nil, err
}
// delivery_count
if _, err := r.ReadLength(); err != nil {
return nil, err
}
}
// consumers
nConsumers, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nConsumers); i++ {
// cname
if _, err := r.ReadString(); err != nil {
return nil, err
}
// seen_time
b := make([]byte, 8)
if err := r.readFull(b); err != nil {
return nil, err
}
// pending
nPending2, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nPending2); i++ {
// eid, read 16 bytes
b := make([]byte, 16)
if err := r.readFull(b); err != nil {
return nil, err
}
}
}
}
}
return b.Bytes(), nil
}
@ -226,16 +334,25 @@ func (r *rdbReader) readEncodedLength() (length uint32, encoded bool, err error)
if err != nil {
return
}
length = uint32(u & 0x3f)
switch u >> 6 {
case rdb6bitLen:
length = uint32(u & 0x3f)
case rdb14bitLen:
u, err = r.readUint8()
length = (length << 8) + uint32(u)
var u2 uint8
u2, err = r.readUint8()
length = (uint32(u & 0x3f) << 8) + uint32(u2)
case rdbEncVal:
encoded = true
length = uint32(u & 0x3f)
default:
length, err = r.readUint32BigEndian()
switch u {
case rdb32bitLen:
length, err = r.readUint32BigEndian()
case rdb64bitLen:
length, err = r.readUint64BigEndian()
default:
length, err = 0, fmt.Errorf("unknown encoding length[%v]", u)
}
}
return
}
@ -325,6 +442,12 @@ func (r *rdbReader) readUint32BigEndian() (uint32, error) {
return binary.BigEndian.Uint32(b), err
}
func (r *rdbReader) readUint64BigEndian() (uint32, error) {
b := r.buf[:8]
err := r.readFull(b)
return binary.BigEndian.Uint32(b), err
}
func (r *rdbReader) readInt8() (int8, error) {
u, err := r.readUint8()
return int8(u), err

@ -34,7 +34,10 @@ func OpenRedisConnWithTimeout(target, auth_type, passwd string, readTimeout, wri
}
func OpenNetConn(target, auth_type, passwd string) net.Conn {
c, err := net.Dial("tcp", target)
d := net.Dialer{
KeepAlive: time.Duration(conf.Options.KeepAlive) * time.Second,
}
c, err := d.Dial("tcp", target)
if err != nil {
log.PanicErrorf(err, "cannot connect to '%s'", target)
}
@ -301,15 +304,16 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) {
if err != nil {
log.PanicError(err, "read rdb ")
}
//log.Info("restore quicklist key: ", string(e.Key), ", type: ", t)
// log.Info("restore quicklist key: ", string(e.Key), ", type: ", e.Type)
count := 0
if n, err := r.ReadLength(); err != nil {
log.PanicError(err, "read rdb ")
} else {
//log.Info("quicklist item size: ", int(n))
// log.Info("quicklist item size: ", int(n))
for i := 0; i < int(n); i++ {
ziplist, err := r.ReadString()
// log.Info("zipList: ", ziplist)
if err != nil {
log.PanicError(err, "read rdb ")
}
@ -317,12 +321,13 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) {
if zln, err := r.ReadZiplistLength(buf); err != nil {
log.PanicError(err, "read rdb")
} else {
//log.Info("ziplist one of quicklist, size: ", int(zln))
// log.Info("ziplist one of quicklist, size: ", int(zln))
for i := int64(0); i < zln; i++ {
entry, err := r.ReadZiplistEntry(buf)
if err != nil {
log.PanicError(err, "read rdb ")
}
// log.Info("rpush key: ", e.Key, " value: ", entry)
count++
c.Send("RPUSH", e.Key, entry)
if count == 100 {
@ -674,7 +679,9 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
return
}
if uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0 {
// TODO, need to judge big key
if e.Type != rdb.RDBTypeStreamListPacks &&
(uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0) {
//use command
if conf.Options.Rewrite && e.NeedReadLen == 1 {
if !conf.Options.Metric {
@ -694,6 +701,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
}
return
}
// fmt.Printf("kkkey: %v, value: %v\n", string(e.Key), e.Value)
s, err := redigo.String(c.Do("restore", e.Key, ttlms, e.Value))
if err != nil {
/*The reply value of busykey in 2.8 kernel is "target key name is busy",

@ -39,6 +39,7 @@ type Configuration struct {
SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"`
// inner variables
ReplaceHashTag bool `config:"replace_hash_tag"`

@ -361,7 +361,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd
offset, err := utils.GetFakeSlaveOffset(srcConn)
if err != nil {
// log.PurePrintf("%s\n", NewLogItem("GetFakeSlaveOffsetFail", "WARN", NewErrorLogDetail("", err.Error())))
log.Warnf("Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s", conf.Options.Id, err.Error())
log.Warnf("Event:GetFakeSlaveOffsetFail\tId:%s\tWarn:%s", conf.Options.Id, err.Error())
// Reconnect while network error happen
if err == io.EOF {

Loading…
Cancel
Save