support 5.0 stream

v4
zhuzhao.cx 6 years ago committed by vinllen
parent 09e64bbf01
commit 55cb059e47
  1. 4
      src/pkg/rdb/loader.go
  2. 147
      src/pkg/rdb/reader.go
  3. 14
      src/redis-shake/common/utils.go

@ -154,12 +154,13 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
default:
var key []byte
if l.remainMember == 0 {
// first time visit this key.
rkey, err := l.ReadString()
if err != nil {
return nil, err
}
key = rkey
entry.NeedReadLen = 1
entry.NeedReadLen = 1 // read value length when it's the first time.
} else {
key = l.lastEntry.Key
}
@ -178,6 +179,7 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
if l.lastReadCount == l.totMemberCount {
entry.RealMemberCount = 0
} else {
// RealMemberCount > 0 means this is big entry which also is a split key.
entry.RealMemberCount = l.lastReadCount
}
l.lastEntry = entry

@ -27,12 +27,13 @@ const (
RdbTypeHash = 4
RdbTypeZSet2 = 5
RdbTypeHashZipmap = 9
RdbTypeListZiplist = 10
RdbTypeSetIntset = 11
RdbTypeZSetZiplist = 12
RdbTypeHashZiplist = 13
RdbTypeQuicklist = 14
RdbTypeHashZipmap = 9
RdbTypeListZiplist = 10
RdbTypeSetIntset = 11
RdbTypeZSetZiplist = 12
RdbTypeHashZiplist = 13
RdbTypeQuicklist = 14
RDBTypeStreamListPacks = 15 // stream
rdbFlagOnlyValue = 0xf9
rdbFlagAUX = 0xfa
@ -46,7 +47,8 @@ const (
const (
rdb6bitLen = 0
rdb14bitLen = 1
rdb32bitLen = 2
rdb32bitLen = 0x80
rdb64bitLen = 0x81
rdbEncVal = 3
rdbEncInt8 = 0
@ -91,7 +93,7 @@ func (r *rdbReader) offset() int64 {
func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
var b bytes.Buffer
r = NewRdbReader(io.TeeReader(r, &b))
r = NewRdbReader(io.TeeReader(r, &b)) // the result will be written into b when calls r.Read()
lr := l.rdbReader
switch t {
default:
@ -181,6 +183,112 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
if lr.lastReadCount == n {
lr.remainMember = 0
}
case RDBTypeStreamListPacks:
// TODO, need to judge big key
lr.lastReadCount, lr.remainMember, lr.totMemberCount = 0, 0, 0
// list pack length
nListPacks, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nListPacks); i++ {
// read twice
if _, err := r.ReadString(); err != nil {
return nil, err
}
if _, err := r.ReadString(); err != nil {
return nil, err
}
}
// items
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_entry_id timestamp second
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_entry_id timestamp millisecond
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// cgroups length
nCgroups, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nCgroups); i++ {
// cname
if _, err := r.ReadString(); err != nil {
return nil, err
}
// last_cg_entry_id timestamp second
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_cg_entry_id timestamp millisecond
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// pending number
nPending, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nPending); i++ {
// eid, read 16 bytes
b := make([]byte, 16)
if err := r.readFull(b); err != nil {
return nil, err
}
// seen_time
b = make([]byte, 8)
if err := r.readFull(b); err != nil {
return nil, err
}
// delivery_count
if _, err := r.ReadLength(); err != nil {
return nil, err
}
}
// consumers
nConsumers, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nConsumers); i++ {
// cname
if _, err := r.ReadString(); err != nil {
return nil, err
}
// seen_time
b := make([]byte, 8)
if err := r.readFull(b); err != nil {
return nil, err
}
// pending
nPending2, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nPending2); i++ {
// eid, read 16 bytes
b := make([]byte, 16)
if err := r.readFull(b); err != nil {
return nil, err
}
}
}
}
}
return b.Bytes(), nil
}
@ -226,16 +334,25 @@ func (r *rdbReader) readEncodedLength() (length uint32, encoded bool, err error)
if err != nil {
return
}
length = uint32(u & 0x3f)
switch u >> 6 {
case rdb6bitLen:
length = uint32(u & 0x3f)
case rdb14bitLen:
u, err = r.readUint8()
length = (length << 8) + uint32(u)
var u2 uint8
u2, err = r.readUint8()
length = (uint32(u & 0x3f) << 8) + uint32(u2)
case rdbEncVal:
encoded = true
length = uint32(u & 0x3f)
default:
length, err = r.readUint32BigEndian()
switch u {
case rdb32bitLen:
length, err = r.readUint32BigEndian()
case rdb64bitLen:
length, err = r.readUint64BigEndian()
default:
length, err = 0, fmt.Errorf("unknown encoding length[%v]", u)
}
}
return
}
@ -325,6 +442,12 @@ func (r *rdbReader) readUint32BigEndian() (uint32, error) {
return binary.BigEndian.Uint32(b), err
}
func (r *rdbReader) readUint64BigEndian() (uint32, error) {
b := r.buf[:8]
err := r.readFull(b)
return binary.BigEndian.Uint32(b), err
}
func (r *rdbReader) readInt8() (int8, error) {
u, err := r.readUint8()
return int8(u), err

@ -301,15 +301,16 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) {
if err != nil {
log.PanicError(err, "read rdb ")
}
//log.Info("restore quicklist key: ", string(e.Key), ", type: ", t)
// log.Info("restore quicklist key: ", string(e.Key), ", type: ", e.Type)
count := 0
if n, err := r.ReadLength(); err != nil {
log.PanicError(err, "read rdb ")
} else {
//log.Info("quicklist item size: ", int(n))
// log.Info("quicklist item size: ", int(n))
for i := 0; i < int(n); i++ {
ziplist, err := r.ReadString()
// log.Info("zipList: ", ziplist)
if err != nil {
log.PanicError(err, "read rdb ")
}
@ -317,12 +318,13 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) {
if zln, err := r.ReadZiplistLength(buf); err != nil {
log.PanicError(err, "read rdb")
} else {
//log.Info("ziplist one of quicklist, size: ", int(zln))
// log.Info("ziplist one of quicklist, size: ", int(zln))
for i := int64(0); i < zln; i++ {
entry, err := r.ReadZiplistEntry(buf)
if err != nil {
log.PanicError(err, "read rdb ")
}
// log.Info("rpush key: ", e.Key, " value: ", entry)
count++
c.Send("RPUSH", e.Key, entry)
if count == 100 {
@ -674,7 +676,9 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
return
}
if uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0 {
// TODO, need to judge big key
if e.Type != rdb.RDBTypeStreamListPacks &&
(uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0) {
//use command
if conf.Options.Rewrite && e.NeedReadLen == 1 {
if !conf.Options.Metric {
@ -694,6 +698,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
}
return
}
// fmt.Printf("kkkey: %v, value: %v\n", string(e.Key), e.Value)
s, err := redigo.String(c.Do("restore", e.Key, ttlms, e.Value))
if err != nil {
/*The reply value of busykey in 2.8 kernel is "target key name is busy",

Loading…
Cancel
Save