package rdb

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"io"
	"os"
	"strconv"
	"time"

	"RedisShake/internal/config"
	"RedisShake/internal/entry"
	"RedisShake/internal/log"
	"RedisShake/internal/rdb/structure"
	"RedisShake/internal/rdb/types"
	"RedisShake/internal/utils"
)

// Opcode bytes that introduce a non key/value record in the RDB stream.
// Any other leading byte is handled as the type byte of a key/value pair.
const (
	kFlagFunction2 = 245  // function library data
	kFlagFunction  = 246  // old function library data for 7.0 rc1 and rc2
	kFlagModuleAux = 247  // Module auxiliary data.
	kFlagIdle      = 0xf8 // LRU idle time.
	kFlagFreq      = 0xf9 // LFU frequency.
	kFlagAUX       = 0xfa // RDB aux field.
	kFlagResizeDB  = 0xfb // Hash table resize hint.
	kFlagExpireMs  = 0xfc // Expire time in milliseconds.
	kFlagExpire    = 0xfd // Old expire time in seconds.
	kFlagSelect    = 0xfe // DB number of the following keys.
	kEOF           = 0xff // End of the RDB file.
)

// Loader reads an RDB file and turns its content into entries that are sent
// on a channel.
type Loader struct {
	replStreamDbId int // https://RedisShake/pull/430#issuecomment-1099014464

	nowDBId  int   // database selected by the most recent kFlagSelect record
	expireMs int64 // remaining TTL in ms for the next key; 0 means no expiry
	idle     int64 // value of the most recent kFlagIdle record, reset after each key
	freq     int64 // value of the most recent kFlagFreq record, reset after each key

	filPath string
	fp      *os.File

	ch         chan *entry.Entry
	dumpBuffer bytes.Buffer // scratch buffer reused by createValueDump

	name       string
	updateFunc func(int64) // optional progress callback; receives a file offset
}

// NewLoader returns a Loader that will read the RDB file at filPath and send
// the resulting entries to ch. name is used as a prefix in log messages.
// updateFunc may be nil; when set it is called with the current file offset
// while parsing.
func NewLoader(name string, updateFunc func(int64), filPath string, ch chan *entry.Entry) *Loader {
	return &Loader{
		ch:         ch,
		filPath:    filPath,
		name:       name,
		updateFunc: updateFunc,
	}
}

// ParseRDB parses the RDB file and sends every decoded entry to the loader's
// channel. It returns the value of the "repl-stream-db" aux field (0 if the
// file does not contain one). Any I/O or format error is reported through
// log.Panicf.
func (ld *Loader) ParseRDB() int {
	var err error
	ld.fp, err = os.OpenFile(ld.filPath, os.O_RDONLY, 0666)
	if err != nil {
		log.Panicf("open file failed. file_path=[%s], error=[%s]", ld.filPath, err)
	}
	defer func() {
		if err := ld.fp.Close(); err != nil {
			log.Panicf("close file failed. file_path=[%s], error=[%s]", ld.filPath, err)
		}
	}()
	rd := bufio.NewReader(ld.fp)

	// The header is 9 bytes: the 5-byte magic "REDIS" followed by a 4-digit
	// decimal version.
	header := make([]byte, 9)
	if _, err = io.ReadFull(rd, header); err != nil {
		// Never use the error text as the format string: a '%' inside it
		// would be interpreted as a formatting verb.
		log.Panicf("%v", err)
	}
	if !bytes.Equal(header[:5], []byte("REDIS")) {
		log.Panicf("verify magic string, invalid file format. bytes=[%v]", header[:5])
	}
	version, err := strconv.Atoi(string(header[5:]))
	if err != nil {
		log.Panicf("%v", err)
	}
	log.Debugf("[%s] RDB version: %d", ld.name, version)

	// read entries
	ld.parseRDBEntry(rd)

	return ld.replStreamDbId
}

// parseRDBEntry reads records from rd until the kEOF opcode. Metadata records
// (idle, freq, expire, select) update loader state that applies to the next
// key; key/value records are converted to entries and sent to ld.ch.
func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
	// for stat: report the current offset of the underlying file. rd buffers
	// reads from that file, so the offset may be ahead of what has actually
	// been parsed.
	updateProcessSize := func() {
		if ld.updateFunc == nil {
			return
		}
		offset, err := ld.fp.Seek(0, io.SeekCurrent)
		if err != nil {
			log.Panicf("%v", err)
		}
		ld.updateFunc(offset)
	}
	defer updateProcessSize()

	// remainingMs converts an absolute deadline (Unix ms) into a relative
	// TTL. A TTL of 0 is used below to mean "no expiry", so a deadline that
	// is already reached (including exactly now) is clamped to 1ms instead
	// of silently making the key persistent.
	remainingMs := func(deadlineMs int64) int64 {
		ttl := deadlineMs - time.Now().UnixMilli()
		if ttl <= 0 {
			return 1
		}
		return ttl
	}

	// A Ticker (rather than time.Tick) so it can be released on return.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// read one entry per iteration
	for {
		typeByte := structure.ReadByte(rd)
		switch typeByte {
		case kFlagIdle:
			ld.idle = int64(structure.ReadLength(rd))
		case kFlagFreq:
			ld.freq = int64(structure.ReadByte(rd))
		case kFlagAUX:
			key := structure.ReadString(rd)
			value := structure.ReadString(rd)
			switch key {
			case "repl-stream-db":
				var err error
				ld.replStreamDbId, err = strconv.Atoi(value)
				if err != nil {
					log.Panicf("%v", err)
				}
				log.Debugf("[%s] RDB repl-stream-db: [%s]", ld.name, value)
			case "lua":
				e := entry.NewEntry()
				e.Argv = []string{"script", "load", value}
				ld.ch <- e
				log.Debugf("[%s] LUA script: [%s]", ld.name, value)
			default:
				log.Debugf("[%s] RDB AUX: key=[%s], value=[%s]", ld.name, key, value)
			}
		case kFlagResizeDB:
			dbSize := structure.ReadLength(rd)
			expireSize := structure.ReadLength(rd)
			log.Debugf("[%s] RDB resize db: db_size=[%d], expire_size=[%d]", ld.name, dbSize, expireSize)
		case kFlagExpireMs:
			ld.expireMs = remainingMs(int64(structure.ReadUint64(rd)))
		case kFlagExpire:
			// Old format stores seconds; convert to milliseconds first.
			ld.expireMs = remainingMs(int64(structure.ReadUint32(rd)) * 1000)
		case kFlagSelect:
			ld.nowDBId = int(structure.ReadLength(rd))
		case kEOF:
			return
		default:
			key := structure.ReadString(rd)
			// Tee the raw bytes consumed while parsing the object so they
			// can be replayed verbatim in a restore command.
			var value bytes.Buffer
			anotherReader := io.TeeReader(rd, &value)
			o := types.ParseObject(anotherReader, typeByte, key)
			if uint64(value.Len()) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen {
				// Serialized value exceeds the configured bulk limit: send
				// the commands produced by Rewrite instead of one restore.
				for _, cmd := range o.Rewrite() {
					e := entry.NewEntry()
					e.DbId = ld.nowDBId
					e.Argv = cmd
					ld.ch <- e
				}
				if ld.expireMs != 0 {
					e := entry.NewEntry()
					e.DbId = ld.nowDBId
					e.Argv = []string{"PEXPIRE", key, strconv.FormatInt(ld.expireMs, 10)}
					ld.ch <- e
				}
			} else {
				e := entry.NewEntry()
				e.DbId = ld.nowDBId
				v := ld.createValueDump(typeByte, value.Bytes())
				e.Argv = []string{"restore", key, strconv.FormatInt(ld.expireMs, 10), v}
				if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" {
					//if config.Opt.Target.Version < 3.0 {
					//	log.Panicf("RDB restore command behavior is rewrite, but target redis version is %f, not support REPLACE modifier", config.Config.Target.Version)
					//}
					e.Argv = append(e.Argv, "replace")
				}
				//if ld.idle != 0 && config.Config.Target.Version >= 5.0 {
				//	e.Argv = append(e.Argv, "idletime", strconv.FormatInt(ld.idle, 10))
				//}
				//if ld.freq != 0 && config.Config.Target.Version >= 5.0 {
				//	e.Argv = append(e.Argv, "freq", strconv.FormatInt(ld.freq, 10))
				//}
				ld.ch <- e
			}
			// Per-key metadata applies to a single key only.
			ld.expireMs = 0
			ld.idle = 0
			ld.freq = 0
		}

		// Non-blocking check: report progress at most once per tick.
		select {
		case <-ticker.C:
			updateProcessSize()
		default:
		}
	}
}

// createValueDump builds the serialized-value argument used in the restore
// command: the type byte, the raw value bytes, a little-endian uint16 with
// value 6 (presumably the RDB version of the payload; confirm against the
// target's RESTORE requirements), and finally the little-endian CRC64 of
// everything written before it. The returned string is a copy, so the
// internal buffer can be reused by the next call.
func (ld *Loader) createValueDump(typeByte byte, val []byte) string {
	ld.dumpBuffer.Reset()
	// Writes to a bytes.Buffer always return a nil error.
	_ = ld.dumpBuffer.WriteByte(typeByte)
	_, _ = ld.dumpBuffer.Write(val)
	_ = binary.Write(&ld.dumpBuffer, binary.LittleEndian, uint16(6))
	// calc crc over type byte + value + version
	sum64 := utils.CalcCRC64(ld.dumpBuffer.Bytes())
	_ = binary.Write(&ld.dumpBuffer, binary.LittleEndian, sum64)
	return ld.dumpBuffer.String()
}