support 5.0 stream

v4
vinllen 6 years ago
parent 16eb80c426
commit 0db0db1ba8
  1. 4
      src/pkg/rdb/loader.go
  2. 147
      src/pkg/rdb/reader.go
  3. 14
      src/redis-shake/common/utils.go

@ -154,12 +154,13 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
default: default:
var key []byte var key []byte
if l.remainMember == 0 { if l.remainMember == 0 {
// first time visit this key.
rkey, err := l.ReadString() rkey, err := l.ReadString()
if err != nil { if err != nil {
return nil, err return nil, err
} }
key = rkey key = rkey
entry.NeedReadLen = 1 entry.NeedReadLen = 1 // read value length when it's the first time.
} else { } else {
key = l.lastEntry.Key key = l.lastEntry.Key
} }
@ -178,6 +179,7 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
if l.lastReadCount == l.totMemberCount { if l.lastReadCount == l.totMemberCount {
entry.RealMemberCount = 0 entry.RealMemberCount = 0
} else { } else {
// RealMemberCount > 0 means this is big entry which also is a split key.
entry.RealMemberCount = l.lastReadCount entry.RealMemberCount = l.lastReadCount
} }
l.lastEntry = entry l.lastEntry = entry

@ -27,12 +27,13 @@ const (
RdbTypeHash = 4 RdbTypeHash = 4
RdbTypeZSet2 = 5 RdbTypeZSet2 = 5
RdbTypeHashZipmap = 9 RdbTypeHashZipmap = 9
RdbTypeListZiplist = 10 RdbTypeListZiplist = 10
RdbTypeSetIntset = 11 RdbTypeSetIntset = 11
RdbTypeZSetZiplist = 12 RdbTypeZSetZiplist = 12
RdbTypeHashZiplist = 13 RdbTypeHashZiplist = 13
RdbTypeQuicklist = 14 RdbTypeQuicklist = 14
RDBTypeStreamListPacks = 15 // stream
rdbFlagOnlyValue = 0xf9 rdbFlagOnlyValue = 0xf9
rdbFlagAUX = 0xfa rdbFlagAUX = 0xfa
@ -46,7 +47,8 @@ const (
const ( const (
rdb6bitLen = 0 rdb6bitLen = 0
rdb14bitLen = 1 rdb14bitLen = 1
rdb32bitLen = 2 rdb32bitLen = 0x80
rdb64bitLen = 0x81
rdbEncVal = 3 rdbEncVal = 3
rdbEncInt8 = 0 rdbEncInt8 = 0
@ -91,7 +93,7 @@ func (r *rdbReader) offset() int64 {
func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) { func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
var b bytes.Buffer var b bytes.Buffer
r = NewRdbReader(io.TeeReader(r, &b)) r = NewRdbReader(io.TeeReader(r, &b)) // the result will be written into b when calls r.Read()
lr := l.rdbReader lr := l.rdbReader
switch t { switch t {
default: default:
@ -181,6 +183,112 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
if lr.lastReadCount == n { if lr.lastReadCount == n {
lr.remainMember = 0 lr.remainMember = 0
} }
case RDBTypeStreamListPacks:
// TODO, need to judge big key
lr.lastReadCount, lr.remainMember, lr.totMemberCount = 0, 0, 0
// list pack length
nListPacks, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nListPacks); i++ {
// read twice
if _, err := r.ReadString(); err != nil {
return nil, err
}
if _, err := r.ReadString(); err != nil {
return nil, err
}
}
// items
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_entry_id timestamp second
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_entry_id timestamp millisecond
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// cgroups length
nCgroups, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nCgroups); i++ {
// cname
if _, err := r.ReadString(); err != nil {
return nil, err
}
// last_cg_entry_id timestamp second
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// last_cg_entry_id timestamp millisecond
if _, err := r.ReadLength(); err != nil {
return nil, err
}
// pending number
nPending, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nPending); i++ {
// eid, read 16 bytes
b := make([]byte, 16)
if err := r.readFull(b); err != nil {
return nil, err
}
// seen_time
b = make([]byte, 8)
if err := r.readFull(b); err != nil {
return nil, err
}
// delivery_count
if _, err := r.ReadLength(); err != nil {
return nil, err
}
}
// consumers
nConsumers, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nConsumers); i++ {
// cname
if _, err := r.ReadString(); err != nil {
return nil, err
}
// seen_time
b := make([]byte, 8)
if err := r.readFull(b); err != nil {
return nil, err
}
// pending
nPending2, err := r.ReadLength()
if err != nil {
return nil, err
}
for i := 0; i < int(nPending2); i++ {
// eid, read 16 bytes
b := make([]byte, 16)
if err := r.readFull(b); err != nil {
return nil, err
}
}
}
}
} }
return b.Bytes(), nil return b.Bytes(), nil
} }
@ -226,16 +334,25 @@ func (r *rdbReader) readEncodedLength() (length uint32, encoded bool, err error)
if err != nil { if err != nil {
return return
} }
length = uint32(u & 0x3f)
switch u >> 6 { switch u >> 6 {
case rdb6bitLen: case rdb6bitLen:
length = uint32(u & 0x3f)
case rdb14bitLen: case rdb14bitLen:
u, err = r.readUint8() var u2 uint8
length = (length << 8) + uint32(u) u2, err = r.readUint8()
length = (uint32(u & 0x3f) << 8) + uint32(u2)
case rdbEncVal: case rdbEncVal:
encoded = true encoded = true
length = uint32(u & 0x3f)
default: default:
length, err = r.readUint32BigEndian() switch u {
case rdb32bitLen:
length, err = r.readUint32BigEndian()
case rdb64bitLen:
length, err = r.readUint64BigEndian()
default:
length, err = 0, fmt.Errorf("unknown encoding length[%v]", u)
}
} }
return return
} }
@ -325,6 +442,12 @@ func (r *rdbReader) readUint32BigEndian() (uint32, error) {
return binary.BigEndian.Uint32(b), err return binary.BigEndian.Uint32(b), err
} }
func (r *rdbReader) readUint64BigEndian() (uint32, error) {
b := r.buf[:8]
err := r.readFull(b)
return binary.BigEndian.Uint32(b), err
}
func (r *rdbReader) readInt8() (int8, error) { func (r *rdbReader) readInt8() (int8, error) {
u, err := r.readUint8() u, err := r.readUint8()
return int8(u), err return int8(u), err

@ -301,15 +301,16 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) {
if err != nil { if err != nil {
log.PanicError(err, "read rdb ") log.PanicError(err, "read rdb ")
} }
//log.Info("restore quicklist key: ", string(e.Key), ", type: ", t) // log.Info("restore quicklist key: ", string(e.Key), ", type: ", e.Type)
count := 0 count := 0
if n, err := r.ReadLength(); err != nil { if n, err := r.ReadLength(); err != nil {
log.PanicError(err, "read rdb ") log.PanicError(err, "read rdb ")
} else { } else {
//log.Info("quicklist item size: ", int(n)) // log.Info("quicklist item size: ", int(n))
for i := 0; i < int(n); i++ { for i := 0; i < int(n); i++ {
ziplist, err := r.ReadString() ziplist, err := r.ReadString()
// log.Info("zipList: ", ziplist)
if err != nil { if err != nil {
log.PanicError(err, "read rdb ") log.PanicError(err, "read rdb ")
} }
@ -317,12 +318,13 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) {
if zln, err := r.ReadZiplistLength(buf); err != nil { if zln, err := r.ReadZiplistLength(buf); err != nil {
log.PanicError(err, "read rdb") log.PanicError(err, "read rdb")
} else { } else {
//log.Info("ziplist one of quicklist, size: ", int(zln)) // log.Info("ziplist one of quicklist, size: ", int(zln))
for i := int64(0); i < zln; i++ { for i := int64(0); i < zln; i++ {
entry, err := r.ReadZiplistEntry(buf) entry, err := r.ReadZiplistEntry(buf)
if err != nil { if err != nil {
log.PanicError(err, "read rdb ") log.PanicError(err, "read rdb ")
} }
// log.Info("rpush key: ", e.Key, " value: ", entry)
count++ count++
c.Send("RPUSH", e.Key, entry) c.Send("RPUSH", e.Key, entry)
if count == 100 { if count == 100 {
@ -674,7 +676,9 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
return return
} }
if uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0 { // TODO, need to judge big key
if e.Type != rdb.RDBTypeStreamListPacks &&
(uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0) {
//use command //use command
if conf.Options.Rewrite && e.NeedReadLen == 1 { if conf.Options.Rewrite && e.NeedReadLen == 1 {
if !conf.Options.Metric { if !conf.Options.Metric {
@ -694,6 +698,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
} }
return return
} }
// fmt.Printf("kkkey: %v, value: %v\n", string(e.Key), e.Value)
s, err := redigo.String(c.Do("restore", e.Key, ttlms, e.Value)) s, err := redigo.String(c.Do("restore", e.Key, ttlms, e.Value))
if err != nil { if err != nil {
/*The reply value of busykey in 2.8 kernel is "target key name is busy", /*The reply value of busykey in 2.8 kernel is "target key name is busy",

Loading…
Cancel
Save