From 55cb059e473336e5dbe09e46fd4f4d95a41e9e32 Mon Sep 17 00:00:00 2001 From: "zhuzhao.cx" Date: Thu, 14 Mar 2019 17:27:14 +0800 Subject: [PATCH] support 5.0 stream --- src/pkg/rdb/loader.go | 4 +- src/pkg/rdb/reader.go | 147 +++++++++++++++++++++++++++++--- src/redis-shake/common/utils.go | 14 ++- 3 files changed, 148 insertions(+), 17 deletions(-) diff --git a/src/pkg/rdb/loader.go b/src/pkg/rdb/loader.go index b174463..03997d9 100644 --- a/src/pkg/rdb/loader.go +++ b/src/pkg/rdb/loader.go @@ -154,12 +154,13 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) { default: var key []byte if l.remainMember == 0 { + // first time visit this key. rkey, err := l.ReadString() if err != nil { return nil, err } key = rkey - entry.NeedReadLen = 1 + entry.NeedReadLen = 1 // read value length when it's the first time. } else { key = l.lastEntry.Key } @@ -178,6 +179,7 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) { if l.lastReadCount == l.totMemberCount { entry.RealMemberCount = 0 } else { + // RealMemberCount > 0 means this is big entry which also is a split key. entry.RealMemberCount = l.lastReadCount } l.lastEntry = entry diff --git a/src/pkg/rdb/reader.go b/src/pkg/rdb/reader.go index aad9f41..f608b00 100644 --- a/src/pkg/rdb/reader.go +++ b/src/pkg/rdb/reader.go @@ -27,12 +27,13 @@ const ( RdbTypeHash = 4 RdbTypeZSet2 = 5 - RdbTypeHashZipmap = 9 - RdbTypeListZiplist = 10 - RdbTypeSetIntset = 11 - RdbTypeZSetZiplist = 12 - RdbTypeHashZiplist = 13 - RdbTypeQuicklist = 14 + RdbTypeHashZipmap = 9 + RdbTypeListZiplist = 10 + RdbTypeSetIntset = 11 + RdbTypeZSetZiplist = 12 + RdbTypeHashZiplist = 13 + RdbTypeQuicklist = 14 + RDBTypeStreamListPacks = 15 // stream rdbFlagOnlyValue = 0xf9 rdbFlagAUX = 0xfa @@ -46,7 +47,8 @@ const ( const ( rdb6bitLen = 0 rdb14bitLen = 1 - rdb32bitLen = 2 + rdb32bitLen = 0x80 + rdb64bitLen = 0x81 rdbEncVal = 3 rdbEncInt8 = 0 @@ -91,7 +93,7 @@ func (r *rdbReader) offset() int64 { func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) { var b bytes.Buffer - r = NewRdbReader(io.TeeReader(r, &b)) + r = NewRdbReader(io.TeeReader(r, &b)) // the result will be written into b when calls r.Read() lr := l.rdbReader switch t { default: @@ -181,6 +183,112 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) { if lr.lastReadCount == n { lr.remainMember = 0 } + case RDBTypeStreamListPacks: + // TODO, need to judge big key + lr.lastReadCount, lr.remainMember, lr.totMemberCount = 0, 0, 0 + // list pack length + nListPacks, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nListPacks); i++ { + // read twice + if _, err := r.ReadString(); err != nil { + return nil, err + } + if _, err := r.ReadString(); err != nil { + return nil, err + } + } + + // items + if _, err := r.ReadLength(); err != nil { + return nil, err + } + // last_entry_id timestamp second + if _, err := r.ReadLength(); err != nil { + return nil, err + } + // last_entry_id timestamp millisecond + if _, err := r.ReadLength(); err != nil { + return nil, err + } + + // cgroups length + nCgroups, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nCgroups); i++ { + // cname + if _, err := r.ReadString(); err != nil { + return nil, err + } + + // last_cg_entry_id timestamp second + if _, err := r.ReadLength(); err != nil { + return nil, err + } + // last_cg_entry_id timestamp millisecond + if _, err := r.ReadLength(); err != nil { + return nil, err + } + + // pending number + nPending, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nPending); i++ { + // eid, read 16 bytes + b := make([]byte, 16) + if err := r.readFull(b); err != nil { + return nil, err + } + + // seen_time + b = make([]byte, 8) + if err := r.readFull(b); err != nil { + return nil, err + } + + // delivery_count + if _, err := r.ReadLength(); err != nil { + return nil, err + } + } + + // consumers + nConsumers, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nConsumers); i++ { + // cname + if _, err := r.ReadString(); err != nil { + return nil, err + } + + // seen_time + b := make([]byte, 8) + if err := r.readFull(b); err != nil { + return nil, err + } + + // pending + nPending2, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nPending2); i++ { + // eid, read 16 bytes + b := make([]byte, 16) + if err := r.readFull(b); err != nil { + return nil, err + } + } + } + } } return b.Bytes(), nil } @@ -226,16 +334,25 @@ func (r *rdbReader) readEncodedLength() (length uint32, encoded bool, err error) if err != nil { return } - length = uint32(u & 0x3f) switch u >> 6 { case rdb6bitLen: + length = uint32(u & 0x3f) case rdb14bitLen: - u, err = r.readUint8() - length = (length << 8) + uint32(u) + var u2 uint8 + u2, err = r.readUint8() + length = (uint32(u & 0x3f) << 8) + uint32(u2) case rdbEncVal: encoded = true + length = uint32(u & 0x3f) default: - length, err = r.readUint32BigEndian() + switch u { + case rdb32bitLen: + length, err = r.readUint32BigEndian() + case rdb64bitLen: + length, err = r.readUint64BigEndian() + default: + length, err = 0, fmt.Errorf("unknown encoding length[%v]", u) + } } return } @@ -325,6 +442,12 @@ func (r *rdbReader) readUint32BigEndian() (uint32, error) { return binary.BigEndian.Uint32(b), err } +func (r *rdbReader) readUint64BigEndian() (uint32, error) { + b := r.buf[:8] + err := r.readFull(b) + return binary.BigEndian.Uint32(b), err +} + func (r *rdbReader) readInt8() (int8, error) { u, err := r.readUint8() return int8(u), err diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 98d5bd4..3be3bc9 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -301,15 +301,16 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) { if err != nil { log.PanicError(err, "read rdb ") } - //log.Info("restore quicklist key: ", string(e.Key), ", type: ", t) + // log.Info("restore quicklist key: ", string(e.Key), ", type: ", e.Type) count := 0 if n, err := r.ReadLength(); err != nil { log.PanicError(err, "read rdb ") } else { - //log.Info("quicklist item size: ", int(n)) + // log.Info("quicklist item size: ", int(n)) for i := 0; i < int(n); i++ { ziplist, err := r.ReadString() + // log.Info("zipList: ", ziplist) if err != nil { log.PanicError(err, "read rdb ") } @@ -317,12 +318,13 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) { if zln, err := r.ReadZiplistLength(buf); err != nil { log.PanicError(err, "read rdb") } else { - //log.Info("ziplist one of quicklist, size: ", int(zln)) + // log.Info("ziplist one of quicklist, size: ", int(zln)) for i := int64(0); i < zln; i++ { entry, err := r.ReadZiplistEntry(buf) if err != nil { log.PanicError(err, "read rdb ") } + // log.Info("rpush key: ", e.Key, " value: ", entry) count++ c.Send("RPUSH", e.Key, entry) if count == 100 { @@ -674,7 +676,9 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { return } - if uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0 { + // TODO, need to judge big key + if e.Type != rdb.RDBTypeStreamListPacks && + (uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0) { //use command if conf.Options.Rewrite && e.NeedReadLen == 1 { if !conf.Options.Metric { @@ -694,6 +698,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } return } + + // fmt.Printf("kkkey: %v, value: %v\n", string(e.Key), e.Value) s, err := redigo.String(c.Do("restore", e.Key, ttlms, e.Value)) if err != nil { /*The reply value of busykey in 2.8 kernel is "target key name is busy",