|
|
|
@ -9,6 +9,7 @@ import ( |
|
|
|
|
"github.com/alibaba/RedisShake/internal/log" |
|
|
|
|
"github.com/alibaba/RedisShake/internal/statistics" |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
"sync" |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
@ -57,6 +58,10 @@ func (w *redisWriter) Write(e *entry.Entry) { |
|
|
|
|
func (w *redisWriter) switchDbTo(newDbId int) { |
|
|
|
|
w.client.Send("select", strconv.Itoa(newDbId)) |
|
|
|
|
w.DbId = newDbId |
|
|
|
|
w.chWaitReply <- &entry.Entry{ |
|
|
|
|
Argv: []string{"select", strconv.Itoa(newDbId)}, |
|
|
|
|
CmdName: "select", |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *redisWriter) flushInterval() { |
|
|
|
@ -75,6 +80,9 @@ func (w *redisWriter) flushInterval() { |
|
|
|
|
log.Panicf("redisWriter received error. error=[%v], argv=%v, slots=%v, reply=[%v]", err, e.Argv, e.Slots, reply) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if strings.EqualFold(e.CmdName, "select") { // skip select command
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
atomic.AddUint64(&w.UpdateUnansweredBytesCount, ^(e.EncodedSize - 1)) |
|
|
|
|
statistics.UpdateAOFAppliedOffset(uint64(e.Offset)) |
|
|
|
|
statistics.UpdateUnansweredBytesCount(atomic.LoadUint64(&w.UpdateUnansweredBytesCount)) |
|
|
|
|