Merge pull request #143 from alibaba/feature-1.6

Feature 1.6
v4
Vinllen Chen 5 years ago committed by GitHub
commit a1c65269a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      ChangeLog
  2. 3
      src/redis-shake/common/split.go
  3. 36
      src/redis-shake/common/utils.go
  4. 4
      src/redis-shake/rump.go

@ -1,3 +1,7 @@
2019-08-09 Alibaba Cloud.
* VERSION: 1.6.16
* BUGFIX: big key in `rump` mode all expired.
* BUGFIX: `rump` mode restores quick list failed. see #141.
2019-08-09 Alibaba Cloud. 2019-08-09 Alibaba Cloud.
* VERSION: 1.6.15 * VERSION: 1.6.15
* IMPROVE: add `target.version` to support some proxy like twemproxy. * IMPROVE: add `target.version` to support some proxy like twemproxy.

@ -10,6 +10,7 @@ import (
func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int, preDb *int) { func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int, preDb *int) {
if db != *preDb { if db != *preDb {
log.Infof("RestoreBigkey select db[%v]", db)
if _, err := client.Do("select", db); err != nil { if _, err := client.Do("select", db); err != nil {
log.Panicf("send select db[%v] failed[%v]", db, err) log.Panicf("send select db[%v] failed[%v]", db, err)
} }
@ -30,8 +31,10 @@ func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db
restoreBigRdbEntry(client, &entry) restoreBigRdbEntry(client, &entry)
if pttl > 0 {
// pttl // pttl
if _, err := client.Do("pexpire", key, pttl); err != nil { if _, err := client.Do("pexpire", key, pttl); err != nil {
log.Panicf("send key[%v] pexpire failed[%v]", key, err) log.Panicf("send key[%v] pexpire failed[%v]", key, err)
} }
} }
}

@ -678,8 +678,42 @@ func restoreBigRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
} }
} }
log.Info("complete restore big hash key: ", string(e.Key), " field:", n) log.Info("complete restore big hash key: ", string(e.Key), " field:", n)
case rdb.RdbTypeQuicklist:
if n, err := r.ReadLength(); err != nil {
log.PanicError(err, "read rdb ")
} else {
log.Info("quicklist item size: ", int(n))
for i := 0; i < int(n); i++ {
ziplist, err := r.ReadString()
log.Info("zipList: ", ziplist)
if err != nil {
log.PanicError(err, "read rdb ")
}
buf := rdb.NewSliceBuffer(ziplist)
if zln, err := r.ReadZiplistLength(buf); err != nil {
log.PanicError(err, "read rdb")
} else {
log.Info("ziplist one of quicklist, size: ", int(zln))
for i := int64(0); i < zln; i++ {
entry, err := r.ReadZiplistEntry(buf)
if err != nil {
log.PanicError(err, "read rdb ")
}
log.Info("rpush key: ", e.Key, " value: ", entry)
count++
c.Send("RPUSH", e.Key, entry)
if count == 100 {
flushAndCheckReply(c, count)
count = 0
}
}
flushAndCheckReply(c, count)
count = 0
}
}
}
default: default:
log.PanicError(fmt.Errorf("cann't deal rdb type:%d", t), "restore big key fail") log.PanicError(fmt.Errorf("can't deal rdb type:%d", t), "restore big key fail")
} }
} }

@ -349,8 +349,8 @@ func (dre *dbRumperExecutor) writer() {
log.Debugf("dbRumper[%v] executor[%v] restore[%s], length[%v]", dre.rumperId, dre.executorId, ele.key, log.Debugf("dbRumper[%v] executor[%v] restore[%s], length[%v]", dre.rumperId, dre.executorId, ele.key,
len(ele.value)) len(ele.value))
if uint64(len(ele.value)) >= conf.Options.BigKeyThreshold { if uint64(len(ele.value)) >= conf.Options.BigKeyThreshold {
log.Infof("dbRumper[%v] executor[%v] restore big key[%v] with length[%v]", dre.rumperId, log.Infof("dbRumper[%v] executor[%v] restore big key[%v] with length[%v], pttl[%v], db[%v]",
dre.executorId, ele.key, len(ele.value)) dre.rumperId, dre.executorId, ele.key, len(ele.value), ele.pttl, ele.db)
// flush previous cache // flush previous cache
batch = dre.writeSend(batch, &count, &wBytes) batch = dre.writeSend(batch, &count, &wBytes)

Loading…
Cancel
Save