redis-shake工具
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

214 lines
5.7 KiB

package rdb
import (
"RedisShake/internal/config"
"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/rdb/structure"
"RedisShake/internal/rdb/types"
"RedisShake/internal/utils"
"bufio"
"bytes"
"encoding/binary"
"io"
"os"
"strconv"
"time"
)
const (
kFlagFunction2 = 245 // function library data
kFlagFunction = 246 // old function library data for 7.0 rc1 and rc2
kFlagModuleAux = 247 // Module auxiliary data.
kFlagIdle = 0xf8 // LRU idle time.
kFlagFreq = 0xf9 // LFU frequency.
kFlagAUX = 0xfa // RDB aux field.
kFlagResizeDB = 0xfb // Hash table resize hint.
kFlagExpireMs = 0xfc // Expire time in milliseconds.
kFlagExpire = 0xfd // Old expire time in seconds.
kFlagSelect = 0xfe // DB number of the following keys.
kEOF = 0xff // End of the RDB file.
)
type Loader struct {
replStreamDbId int // https://RedisShake/pull/430#issuecomment-1099014464
nowDBId int
expireMs int64
idle int64
freq int64
filPath string
fp *os.File
ch chan *entry.Entry
dumpBuffer bytes.Buffer
name string
updateFunc func(int64)
}
func NewLoader(name string, updateFunc func(int64), filPath string, ch chan *entry.Entry) *Loader {
ld := new(Loader)
ld.ch = ch
ld.filPath = filPath
ld.name = name
ld.updateFunc = updateFunc
return ld
}
// ParseRDB parse rdb file
// return repl stream db id
func (ld *Loader) ParseRDB() int {
var err error
ld.fp, err = os.OpenFile(ld.filPath, os.O_RDONLY, 0666)
if err != nil {
log.Panicf("open file failed. file_path=[%s], error=[%s]", ld.filPath, err)
}
defer func() {
err = ld.fp.Close()
if err != nil {
log.Panicf("close file failed. file_path=[%s], error=[%s]", ld.filPath, err)
}
}()
rd := bufio.NewReader(ld.fp)
// magic + version
buf := make([]byte, 9)
_, err = io.ReadFull(rd, buf)
if err != nil {
log.Panicf(err.Error())
}
if !bytes.Equal(buf[:5], []byte("REDIS")) {
log.Panicf("verify magic string, invalid file format. bytes=[%v]", buf[:5])
}
version, err := strconv.Atoi(string(buf[5:]))
if err != nil {
log.Panicf(err.Error())
}
log.Debugf("[%s] RDB version: %d", ld.name, version)
// read entries
ld.parseRDBEntry(rd)
return ld.replStreamDbId
}
func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
// for stat
updateProcessSize := func() {
if ld.updateFunc == nil {
return
}
offset, err := ld.fp.Seek(0, io.SeekCurrent)
if err != nil {
log.Panicf(err.Error())
}
ld.updateFunc(offset)
}
defer updateProcessSize()
// read one entry
tick := time.Tick(time.Second * 1)
for true {
typeByte := structure.ReadByte(rd)
switch typeByte {
case kFlagIdle:
ld.idle = int64(structure.ReadLength(rd))
case kFlagFreq:
ld.freq = int64(structure.ReadByte(rd))
case kFlagAUX:
key := structure.ReadString(rd)
value := structure.ReadString(rd)
if key == "repl-stream-db" {
var err error
ld.replStreamDbId, err = strconv.Atoi(value)
if err != nil {
log.Panicf(err.Error())
}
log.Debugf("[%s] RDB repl-stream-db: [%s]", ld.name, value)
} else if key == "lua" {
e := entry.NewEntry()
e.Argv = []string{"script", "load", value}
ld.ch <- e
log.Debugf("[%s] LUA script: [%s]", ld.name, value)
} else {
log.Debugf("[%s] RDB AUX: key=[%s], value=[%s]", ld.name, key, value)
}
case kFlagResizeDB:
dbSize := structure.ReadLength(rd)
expireSize := structure.ReadLength(rd)
log.Debugf("[%s] RDB resize db: db_size=[%d], expire_size=[%d]", ld.name, dbSize, expireSize)
case kFlagExpireMs:
ld.expireMs = int64(structure.ReadUint64(rd)) - time.Now().UnixMilli()
if ld.expireMs < 0 {
ld.expireMs = 1
}
case kFlagExpire:
ld.expireMs = int64(structure.ReadUint32(rd))*1000 - time.Now().UnixMilli()
if ld.expireMs < 0 {
ld.expireMs = 1
}
case kFlagSelect:
ld.nowDBId = int(structure.ReadLength(rd))
case kEOF:
return
default:
key := structure.ReadString(rd)
var value bytes.Buffer
anotherReader := io.TeeReader(rd, &value)
o := types.ParseObject(anotherReader, typeByte, key)
if uint64(value.Len()) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen {
cmds := o.Rewrite()
for _, cmd := range cmds {
e := entry.NewEntry()
e.DbId = ld.nowDBId
e.Argv = cmd
ld.ch <- e
}
if ld.expireMs != 0 {
e := entry.NewEntry()
e.DbId = ld.nowDBId
e.Argv = []string{"PEXPIRE", key, strconv.FormatInt(ld.expireMs, 10)}
ld.ch <- e
}
} else {
e := entry.NewEntry()
e.DbId = ld.nowDBId
v := ld.createValueDump(typeByte, value.Bytes())
e.Argv = []string{"restore", key, strconv.FormatInt(ld.expireMs, 10), v}
if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" {
//if config.Opt.Target.Version < 3.0 {
// log.Panicf("RDB restore command behavior is rewrite, but target redis version is %f, not support REPLACE modifier", config.Config.Target.Version)
//}
e.Argv = append(e.Argv, "replace")
}
//if ld.idle != 0 && config.Config.Target.Version >= 5.0 {
// e.Argv = append(e.Argv, "idletime", strconv.FormatInt(ld.idle, 10))
//}
//if ld.freq != 0 && config.Config.Target.Version >= 5.0 {
// e.Argv = append(e.Argv, "freq", strconv.FormatInt(ld.freq, 10))
//}
ld.ch <- e
}
ld.expireMs = 0
ld.idle = 0
ld.freq = 0
}
select {
case <-tick:
updateProcessSize()
default:
}
}
}
func (ld *Loader) createValueDump(typeByte byte, val []byte) string {
ld.dumpBuffer.Reset()
_, _ = ld.dumpBuffer.Write([]byte{typeByte})
_, _ = ld.dumpBuffer.Write(val)
_ = binary.Write(&ld.dumpBuffer, binary.LittleEndian, uint16(6))
// calc crc
sum64 := utils.CalcCRC64(ld.dumpBuffer.Bytes())
_ = binary.Write(&ld.dumpBuffer, binary.LittleEndian, sum64)
return ld.dumpBuffer.String()
}