From 2b966397ed2880e629184643054870acfaa531f5 Mon Sep 17 00:00:00 2001 From: vinllen Date: Fri, 9 Aug 2019 17:20:33 +0800 Subject: [PATCH] add limit judge when target type is cluster --- ChangeLog | 8 ++++++++ src/redis-shake/common/split.go | 9 ++++++--- src/redis-shake/main/main.go | 4 ++++ src/redis-shake/rump.go | 4 ++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/ChangeLog b/ChangeLog index 82e13b6..ca1fd63 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +2019-08-09 Alibaba Cloud. + * VERSION: 1.6.15 + * IMPROVE: add `target.version` to support some proxy like twemproxy. + * BUGFIX: filter `select` command in `rump` when only db0 supports in some + redis version. + * IMPROVE: remove the cluster limit when target type is rump. + * IMPROVE: add `scan.key_number` limit judge when target type is cluster + in type `rump`. see #136. 2019-08-01 Alibaba Cloud. * VERSION: 1.6.14 * BUGFIX: the `rdb.parallel` parameter limits concurrency without effect. diff --git a/src/redis-shake/common/split.go b/src/redis-shake/common/split.go index bbf53b2..10ffdd6 100644 --- a/src/redis-shake/common/split.go +++ b/src/redis-shake/common/split.go @@ -8,9 +8,12 @@ import ( redigo "github.com/garyburd/redigo/redis" ) -func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int) { - if _, err := client.Do("select", db); err != nil { - log.Panicf("send select db[%v] failed[%v]", db, err) +func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int, preDb *int) { + if db != *preDb { + if _, err := client.Do("select", db); err != nil { + log.Panicf("send select db[%v] failed[%v]", db, err) + } + *preDb = db } entry := rdb.BinEntry{ diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index d0a3824..3ee087c 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -448,6 +448,10 @@ func sanitizeOptions(tp string) error { conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) } + if conf.Options.ScanKeyNumber > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster { + return fmt.Errorf("scan.key_number should less than [%v] when target type is cluster", utils.RecvChanSize) + } + //if len(conf.Options.SourceAddressList) == 1 { // return fmt.Errorf("source address length should == 1 when type is 'rump'") //} diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 87feac8..aaf1b25 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -327,6 +327,7 @@ func (dre *dbRumperExecutor) writer() { // used in QoS bucket := utils.StartQoS(conf.Options.Qps) preDb := 0 + preBigKeyDb := 0 for ele := range dre.keyChan { if filter.FilterKey(ele.key) { continue @@ -354,8 +355,7 @@ func (dre *dbRumperExecutor) writer() { batch = dre.writeSend(batch, &count, &wBytes) // handle big key - utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) - + utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db, &preBigKeyDb) // all the reply has been handled in RestoreBigkey // dre.resultChan <- ele continue