Youlin Feng 12 months ago committed by suxb201
parent 001c33680a
commit a194c59601
  1. 239
      internal/rdb/types/mbbloom.go
  2. 5
      internal/rdb/types/module2.go

@ -0,0 +1,239 @@
package types
import (
"RedisShake/internal/rdb/structure"
"io"
"strconv"
"unsafe"
)
// BloomObject for MBbloom--
type BloomObject struct {
encver int
key string
sb chain
}
type chain struct {
filters []link
size uint64
nfilters uint64
options uint64
growth uint64
}
type link struct {
inner bloom
size uint64
}
type bloom struct {
hashes uint64
n2 uint64
entries uint64
err float64
bpe float64
bf string
bits uint64
}
type dumpedChainHeader struct {
size uint64
nfilters uint32
options uint32
growth uint32
}
type dumpedChainLink struct {
bytes uint64
bits uint64
size uint64
err float64
bpe float64
hashes uint32
entries uint32
enthigh uint32
n2 uint8
}
type dumpedChainHeaderV3 struct {
size uint64
nfilters uint32
options uint32
}
type dumpedChainLinkV3 struct {
bytes uint64
bits uint64
size uint64
err float64
bpe float64
hashes uint32
entries uint32
n2 uint8
}
const (
BF_MIN_OPTIONS_ENC = 2
BF_MIN_GROWTH_ENC = 4
)
const (
DUMPED_CHAIN_HEADER_SIZE = 20
DUMPED_CHAIN_LINK_SIZE = 53
DUMPED_CHAIN_HEADER_SIZE_V3 = 16
DUMPED_CHAIN_LINK_SIZE_V3 = 49
)
const MAX_SCANDUMP_SIZE = 10485760 // 10MB
func (o *BloomObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
o.key = key
var sb chain
sb.size = readUnsigned(rd)
sb.nfilters = readUnsigned(rd)
if o.encver >= BF_MIN_OPTIONS_ENC {
sb.options = readUnsigned(rd)
}
if o.encver >= BF_MIN_GROWTH_ENC {
sb.growth = readUnsigned(rd)
} else {
sb.growth = 2
}
for i := uint64(0); i < sb.nfilters; i++ {
var lb link
bm := &lb.inner
bm.entries = readUnsigned(rd)
bm.err = readDouble(rd)
bm.hashes = readUnsigned(rd)
bm.bpe = readDouble(rd)
if o.encver == 0 {
bm.bits = uint64(float64(bm.entries) * bm.bpe)
} else {
bm.bits = readUnsigned(rd)
bm.n2 = readUnsigned(rd)
}
bm.bf = structure.ReadModuleString(rd)
lb.size = readUnsigned(rd)
sb.filters = append(sb.filters, lb)
}
o.sb = sb
structure.ReadModuleEof(rd)
return
}
func readUnsigned(rd io.Reader) uint64 {
v := structure.ReadModuleUnsigned(rd)
u, err := strconv.ParseUint(v, 10, 64)
if err != nil {
panic(err)
}
return u
}
func readDouble(rd io.Reader) float64 {
v := structure.ReadModuleDouble(rd)
f, err := strconv.ParseFloat(v, 64)
if err != nil {
panic(err)
}
return f
}
func (o *BloomObject) Rewrite() []RedisCmd {
var cs []RedisCmd
var h string
if o.encver < BF_MIN_GROWTH_ENC {
h = getEncodedHeaderV3(&o.sb)
} else {
h = getEncodedHeader(&o.sb)
}
cmd := RedisCmd{"BF.LOADCHUNK", o.key, "1", h}
cs = append(cs, cmd)
curIter := uint64(1)
for {
c := getEncodedChunk(&o.sb, &curIter, MAX_SCANDUMP_SIZE)
if c == "" {
break
}
cmd := RedisCmd{"BF.LOADCHUNK", o.key, strconv.FormatUint(curIter, 10), c}
cs = append(cs, cmd)
}
return cs
}
func getEncodedHeader(sb *chain) string {
h := make([]byte, DUMPED_CHAIN_LINK_SIZE*sb.nfilters+DUMPED_CHAIN_HEADER_SIZE)
ph := (*dumpedChainHeader)(unsafe.Pointer(&h[0]))
ph.size = sb.size
ph.nfilters = uint32(sb.nfilters)
ph.options = uint32(sb.options)
ph.growth = uint32(sb.growth)
for i := uint64(0); i < sb.nfilters; i++ {
pl := (*dumpedChainLink)(unsafe.Add(unsafe.Pointer(&h[0]), DUMPED_CHAIN_HEADER_SIZE+DUMPED_CHAIN_LINK_SIZE*i))
sl := sb.filters[i]
pl.bytes = uint64(len(sl.inner.bf))
pl.bits = sl.inner.bits
pl.size = sl.size
pl.err = sl.inner.err
pl.hashes = uint32(sl.inner.hashes)
pl.bpe = sl.inner.bpe
*(*uint64)(unsafe.Pointer(&pl.entries)) = sl.inner.entries
pl.n2 = uint8(sl.inner.n2)
}
return *(*string)(unsafe.Pointer(&h))
}
func getEncodedHeaderV3(sb *chain) string {
h := make([]byte, DUMPED_CHAIN_LINK_SIZE_V3*sb.nfilters+DUMPED_CHAIN_HEADER_SIZE_V3)
ph := (*dumpedChainHeaderV3)(unsafe.Pointer(&h[0]))
ph.size = sb.size
ph.nfilters = uint32(sb.nfilters)
ph.options = uint32(sb.options)
for i := uint64(0); i < sb.nfilters; i++ {
pl := (*dumpedChainLinkV3)(unsafe.Add(unsafe.Pointer(&h[0]), DUMPED_CHAIN_HEADER_SIZE_V3+DUMPED_CHAIN_LINK_SIZE_V3*i))
sl := sb.filters[i]
pl.bytes = uint64(len(sl.inner.bf))
pl.bits = sl.inner.bits
pl.size = sl.size
pl.err = sl.inner.err
pl.hashes = uint32(sl.inner.hashes)
pl.bpe = sl.inner.bpe
pl.entries = uint32(sl.inner.entries)
pl.n2 = uint8(sl.inner.n2)
}
return *(*string)(unsafe.Pointer(&h))
}
func getEncodedChunk(sb *chain, curIter *uint64, maxChunkSize uint64) string {
pl, off := getLinkPos(sb, *curIter)
if pl == nil {
*curIter = 0
return ""
}
l := maxChunkSize
lr := uint64(len(pl.inner.bf)) - off
if lr < l {
l = lr
}
*curIter += l
return pl.inner.bf[off : off+l]
}
func getLinkPos(sb *chain, curIter uint64) (pl *link, offset uint64) {
curIter--
var seekPos uint64
for i := uint64(0); i < sb.nfilters; i++ {
if seekPos+uint64(len(sb.filters[i].inner.bf)) > curIter {
pl = &sb.filters[i]
break
} else {
seekPos += uint64(len(sb.filters[i].inner.bf))
}
}
if pl == nil {
return
}
offset = curIter - seekPos
return
}

@ -30,6 +30,11 @@ func PareseModuleType(rd io.Reader, key string, typeByte byte) ModuleObject {
o := new(TairZsetObject)
o.LoadFromBuffer(rd, key, typeByte)
return o
case "MBbloom--":
o := new(BloomObject)
o.encver = int(moduleId & 1023)
o.LoadFromBuffer(rd, key, typeByte)
return o
default:
log.Panicf("unsupported module type: %s", moduleName)
return nil

Loading…
Cancel
Save