From 2f548e11acbdb8116cb62988b9b4660e4c39723a Mon Sep 17 00:00:00 2001 From: suxb201 Date: Wed, 31 May 2023 11:48:14 +0800 Subject: [PATCH] bugfix: handle the reply of select command --- internal/writer/redis.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/writer/redis.go b/internal/writer/redis.go index 996f835..c350245 100644 --- a/internal/writer/redis.go +++ b/internal/writer/redis.go @@ -9,6 +9,7 @@ import ( "github.com/alibaba/RedisShake/internal/log" "github.com/alibaba/RedisShake/internal/statistics" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -57,6 +58,10 @@ func (w *redisWriter) Write(e *entry.Entry) { func (w *redisWriter) switchDbTo(newDbId int) { w.client.Send("select", strconv.Itoa(newDbId)) w.DbId = newDbId + w.chWaitReply <- &entry.Entry{ + Argv: []string{"select", strconv.Itoa(newDbId)}, + CmdName: "select", + } } func (w *redisWriter) flushInterval() { @@ -75,6 +80,9 @@ func (w *redisWriter) flushInterval() { log.Panicf("redisWriter received error. error=[%v], argv=%v, slots=%v, reply=[%v]", err, e.Argv, e.Slots, reply) } } + if strings.EqualFold(e.CmdName, "select") { // skip select command + continue + } atomic.AddUint64(&w.UpdateUnansweredBytesCount, ^(e.EncodedSize - 1)) statistics.UpdateAOFAppliedOffset(uint64(e.Offset)) statistics.UpdateUnansweredBytesCount(atomic.LoadUint64(&w.UpdateUnansweredBytesCount))