From e8d2881a72327c6d2c50b4b3ece704a673378546 Mon Sep 17 00:00:00 2001 From: "suxiaobin.sxb" Date: Wed, 29 Nov 2023 15:26:02 +0800 Subject: [PATCH] feature: Add support for rdbTypeSetListpack and rdbTypeStreamListpacks3 in Redis 7.2 --- internal/rdb/types/interface.go | 8 +++-- internal/rdb/types/set.go | 2 ++ internal/rdb/types/set_test.go | 52 +++++++++++++++++++++++++++++++++ internal/rdb/types/stream.go | 6 ++++ 4 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 internal/rdb/types/set_test.go diff --git a/internal/rdb/types/interface.go b/internal/rdb/types/interface.go index 03ba67e..04cb96a 100644 --- a/internal/rdb/types/interface.go +++ b/internal/rdb/types/interface.go @@ -45,6 +45,8 @@ const ( rdbTypeZSetListpack = 17 // RDB_TYPE_ZSET_LISTPACK rdbTypeListQuicklist2 = 18 // RDB_TYPE_LIST_QUICKLIST_2 https://github.com/redis/redis/pull/9357 rdbTypeStreamListpacks2 = 19 // RDB_TYPE_STREAM_LISTPACKS2 + rdbTypeSetListpack = 20 // RDB_TYPE_SET_LISTPACK + rdbTypeStreamListpacks3 = 21 // RDB_TYPE_STREAM_LISTPACKS_3 moduleTypeNameCharSet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_" @@ -74,7 +76,7 @@ func ParseObject(rd io.Reader, typeByte byte, key string) RedisObject { o := new(ListObject) o.LoadFromBuffer(rd, key, typeByte) return o - case rdbTypeSet, rdbTypeSetIntset: // set + case rdbTypeSet, rdbTypeSetIntset, rdbTypeSetListpack: // set o := new(SetObject) o.LoadFromBuffer(rd, key, typeByte) return o @@ -86,7 +88,7 @@ func ParseObject(rd io.Reader, typeByte byte, key string) RedisObject { o := new(HashObject) o.LoadFromBuffer(rd, key, typeByte) return o - case rdbTypeStreamListpacks, rdbTypeStreamListpacks2: // stream + case rdbTypeStreamListpacks, rdbTypeStreamListpacks2, rdbTypeStreamListpacks3: // stream o := new(StreamObject) o.LoadFromBuffer(rd, key, typeByte) return o @@ -94,7 +96,7 @@ func ParseObject(rd io.Reader, typeByte byte, key string) RedisObject { o := PareseModuleType(rd, key, typeByte) return o } - log.Panicf("unknown type byte: %d", typeByte) + log.Panicf("unknown rdb value type byte. key=[%s], type=[%d]", key, typeByte) return nil } diff --git a/internal/rdb/types/set.go b/internal/rdb/types/set.go index 2f83b84..b66188c 100644 --- a/internal/rdb/types/set.go +++ b/internal/rdb/types/set.go @@ -18,6 +18,8 @@ func (o *SetObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) { o.readSet(rd) case rdbTypeSetIntset: o.elements = structure.ReadIntset(rd) + case rdbTypeSetListpack: + o.elements = structure.ReadListpack(rd) default: log.Panicf("unknown set type. typeByte=[%d]", typeByte) } diff --git a/internal/rdb/types/set_test.go b/internal/rdb/types/set_test.go new file mode 100644 index 0000000..df5a8db --- /dev/null +++ b/internal/rdb/types/set_test.go @@ -0,0 +1,52 @@ +package types + +import ( + "bytes" + "sort" + "testing" +) + +// rdbTypeSet 0 +// rdbTypeSetIntset 11 +// rdbTypeSetListpack 20 + +func testOne(t *testing.T, typeByte byte, setData string, values []string) { + if typeByte != setData[0] { + t.Errorf("typeByte not match. typeByte=[%d]", typeByte) + } + o := new(SetObject) + o.LoadFromBuffer(bytes.NewReader([]byte(setData[1:])), "key", typeByte) + if len(o.elements) != len(values) { + t.Errorf("elements not match. len(o.elements)=[%d], len(values)=[%d]", len(o.elements), len(values)) + } + count := len(o.elements) + sort.Strings(o.elements) + sort.Strings(values) + // check set + for i := 0; i < count; i++ { + if o.elements[i] != values[i] { + t.Errorf("elements not match. o.elements[i]=[%s], values[i]=[%s]", o.elements[i], values[i]) + } + } +} + +// sadd key 1 2 3 4 q w e r +func TestSetListpack(t *testing.T) { + data := "\x14\x1b\x1b\x00\x00\x00\b\x00\x81w\x02\x81r\x02\x04\x01\x81e\x02\x01\x01\x02\x01\x81q\x02\x03\x01\xff\x0b\x00T\xe9)\xf7*\xe0\xe3\xf9" + values := []string{"1", "2", "3", "4", "q", "w", "e", "r"} + testOne(t, rdbTypeSetListpack, data, values) +} + +// sadd key 1 2 3 4 5 6 7 8 +func TestSetIntset(t *testing.T) { + data := "\x0b\x18\x02\x00\x00\x00\b\x00\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\a\x00\b\x00\x0b\x00\xd7\x03\xf0nIZV\x8d" + values := []string{"1", "2", "3", "4", "5", "6", "7", "8"} + testOne(t, rdbTypeSetIntset, data, values) +} + +// sadd key 1 2 3 4 q w e r +func TestSet(t *testing.T) { + data := "\x02\b\xc0\x04\x01r\x01w\xc0\x02\xc0\x01\xc0\x03\x01q\x01e\t\x00r\x99O\xba\x8c\x8f\x00\xcb" + values := []string{"1", "2", "3", "4", "q", "w", "e", "r"} + testOne(t, rdbTypeSet, data, values) +} diff --git a/internal/rdb/types/stream.go b/internal/rdb/types/stream.go index 402affb..f1eb7b2 100644 --- a/internal/rdb/types/stream.go +++ b/internal/rdb/types/stream.go @@ -51,6 +51,8 @@ func (o *StreamObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) { o.readStream(rd, key, typeByte) case rdbTypeStreamListpacks2: o.readStream(rd, key, typeByte) + case rdbTypeStreamListpacks3: + o.readStream(rd, key, typeByte) default: log.Panicf("unknown hash type. typeByte=[%d]", typeByte) } @@ -205,6 +207,10 @@ func (o *StreamObject) readStream(rd io.Reader, masterKey string, typeByte byte) /* Load lastSeenTime */ _ = structure.ReadUint64(rd) + if typeByte >= rdbTypeStreamListpacks3 { + _ = structure.ReadUint64(rd) // consumer->active_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + } + /* Consumer PEL */ nPEL := int(structure.ReadLength(rd)) for i := 0; i < nPEL; i++ {