From 24c160af6537664164d2c9e051c703b6f71b76c8 Mon Sep 17 00:00:00 2001 From: vinllen Date: Fri, 9 Aug 2019 20:14:11 +0800 Subject: [PATCH] bugfix: big key in rump all expired --- src/redis-shake/common/split.go | 9 ++++++--- src/redis-shake/common/utils.go | 36 ++++++++++++++++++++++++++++++++- src/redis-shake/rump.go | 4 ++-- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go index 10ffdd6..27011e9 100644 --- a/src/redis-shake/common/split.go +++ b/src/redis-shake/common/split.go @@ -10,6 +10,7 @@ import ( func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int, preDb *int) { if db != *preDb { + log.Infof("RestoreBigkey select db[%v]", db) if _, err := client.Do("select", db); err != nil { log.Panicf("send select db[%v] failed[%v]", db, err) } @@ -30,8 +31,10 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db restoreBigRdbEntry(client, &entry) - // pttl - if _, err := client.Do("pexpire", key, pttl); err != nil { - log.Panicf("send key[%v] pexpire failed[%v]", key, err) + if pttl > 0 { + // pttl + if _, err := client.Do("pexpire", key, pttl); err != nil { + log.Panicf("send key[%v] pexpire failed[%v]", key, err) + } } } diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 779a205..b429299 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -678,8 +678,42 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } } log.Info("complete restore big hash key: ", string(e.Key), " field:", n) + case rdb.RdbTypeQuicklist: + if n, err := r.ReadLength(); err != nil { + log.PanicError(err, "read rdb ") + } else { + log.Info("quicklist item size: ", int(n)) + for i := 0; i < int(n); i++ { + ziplist, err := r.ReadString() + log.Info("zipList: ", ziplist) + if err != nil { + log.PanicError(err, "read rdb ") + } + buf := rdb.NewSliceBuffer(ziplist) + if zln, err := r.ReadZiplistLength(buf); err != nil { + log.PanicError(err, "read rdb") + } else { + log.Info("ziplist one of quicklist, size: ", int(zln)) + for i := int64(0); i < zln; i++ { + entry, err := r.ReadZiplistEntry(buf) + if err != nil { + log.PanicError(err, "read rdb ") + } + log.Info("rpush key: ", e.Key, " value: ", entry) + count++ + c.Send("RPUSH", e.Key, entry) + if count == 100 { + flushAndCheckReply(c, count) + count = 0 + } + } + flushAndCheckReply(c, count) + count = 0 + } + } + } default: - log.PanicError(fmt.Errorf("cann't deal rdb type:%d", t), "restore big key fail") + log.PanicError(fmt.Errorf("can't deal rdb type:%d", t), "restore big key fail") } } diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index aaf1b25..247ec64 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -349,8 +349,8 @@ func (dre *dbRumperExecutor) writer() { log.Debugf("dbRumper[%v] executor[%v] restore[%s], length[%v]", dre.rumperId, dre.executorId, ele.key, len(ele.value)) if uint64(len(ele.value)) >= conf.Options.BigKeyThreshold { - log.Infof("dbRumper[%v] executor[%v] restore big key[%v] with length[%v]", dre.rumperId, - dre.executorId, ele.key, len(ele.value)) + log.Infof("dbRumper[%v] executor[%v] restore big key[%v] with length[%v], pttl[%v], db[%v]", + dre.rumperId, dre.executorId, ele.key, len(ele.value), ele.pttl, ele.db) // flush previous cache batch = dre.writeSend(batch, &count, &wBytes)