You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
3.8 KiB
155 lines
3.8 KiB
package aof
|
|
|
|
import (
|
|
"bufio"
|
|
"io"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"RedisShake/internal/entry"
|
|
"RedisShake/internal/log"
|
|
)
|
|
|
|
const (
|
|
AOFNotExist = 1
|
|
AOFOpenErr = 3
|
|
AOFOK = 0
|
|
AOFEmpty = 2
|
|
AOFFailed = 4
|
|
AOFTruncated = 5
|
|
SizeMax = 128
|
|
)
|
|
|
|
type Loader struct {
|
|
filPath string
|
|
ch chan *entry.Entry
|
|
}
|
|
|
|
func NewLoader(filPath string, ch chan *entry.Entry) *Loader {
|
|
ld := new(Loader)
|
|
ld.ch = ch
|
|
ld.filPath = filPath
|
|
return ld
|
|
}
|
|
|
|
func ReadCompleteLine(reader *bufio.Reader) ([]byte, error) {
|
|
line, isPrefix, err := reader.ReadLine()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for isPrefix {
|
|
var additional []byte
|
|
additional, isPrefix, err = reader.ReadLine()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
line = append(line, additional...)
|
|
}
|
|
|
|
return line, err
|
|
}
|
|
|
|
func (ld *Loader) LoadSingleAppendOnlyFile(AOFTimeStamp int64) int {
|
|
ret := AOFOK
|
|
AOFFilepath := ld.filPath
|
|
fp, err := os.Open(AOFFilepath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
if _, err := os.Stat(AOFFilepath); err == nil || !os.IsNotExist(err) {
|
|
log.Infof("Fatal error: can't open the append log File %v for reading: %v", AOFFilepath, err.Error())
|
|
return AOFOpenErr
|
|
} else {
|
|
log.Infof("The append log File %v doesn't exist: %v", AOFFilepath, err.Error())
|
|
return AOFNotExist
|
|
}
|
|
|
|
}
|
|
defer fp.Close()
|
|
|
|
stat, _ := fp.Stat()
|
|
if stat.Size() == 0 {
|
|
return AOFEmpty
|
|
}
|
|
}
|
|
reader := bufio.NewReader(fp)
|
|
for {
|
|
|
|
line, err := ReadCompleteLine(reader)
|
|
{
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
} else {
|
|
log.Infof("Unrecoverable error reading the append only File %v: %v", AOFFilepath, err)
|
|
ret = AOFFailed
|
|
return ret
|
|
}
|
|
} else {
|
|
_, errs := fp.Seek(0, io.SeekCurrent)
|
|
if errs != nil {
|
|
log.Infof("Unrecoverable error reading the append only File %v: %v", AOFFilepath, errs)
|
|
ret = AOFFailed
|
|
return ret
|
|
}
|
|
}
|
|
|
|
if line[0] == '#' {
|
|
if AOFTimeStamp != 0 && strings.HasPrefix(string(line), "#TS:") {
|
|
var ts int64
|
|
ts, err = strconv.ParseInt(strings.TrimPrefix(string(line), "#TS:"), 10, 64)
|
|
if err != nil {
|
|
log.Panicf("Invalid timestamp annotation")
|
|
}
|
|
|
|
if ts > AOFTimeStamp {
|
|
ret = AOFTruncated
|
|
log.Infof("Reached recovery timestamp: %s, subsequent data will no longer be read.", line)
|
|
return ret
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
if line[0] != '*' {
|
|
log.Panicf("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
|
|
}
|
|
argc, _ := strconv.ParseInt(string(line[1:]), 10, 64)
|
|
if argc < 1 {
|
|
log.Panicf("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
|
|
}
|
|
if argc > int64(SizeMax) {
|
|
log.Panicf("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
|
|
}
|
|
e := entry.NewEntry()
|
|
var argv []string
|
|
|
|
for j := 0; j < int(argc); j++ {
|
|
line, err := ReadCompleteLine(reader)
|
|
if err != nil || line[0] != '$' {
|
|
log.Infof("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
|
|
ret = AOFFailed
|
|
return ret
|
|
}
|
|
v64, _ := strconv.ParseInt(string(line[1:]), 10, 64)
|
|
argString := make([]byte, v64+2)
|
|
argString, err = ReadCompleteLine(reader)
|
|
if err != nil {
|
|
log.Infof("Unrecoverable error reading the append only File %v: %v", AOFFilepath, err)
|
|
ret = AOFFailed
|
|
return ret
|
|
}
|
|
argString = argString[:v64]
|
|
argv = append(argv, string(argString))
|
|
}
|
|
|
|
for _, value := range argv {
|
|
e.Argv = append(e.Argv, value)
|
|
}
|
|
ld.ch <- e
|
|
|
|
}
|
|
|
|
}
|
|
return ret
|
|
}
|
|
|