add limit judge when target type is cluster

v4
vinllen 5 years ago
parent 71f7d7c506
commit 2b966397ed
  1. 8
      ChangeLog
  2. 5
      src/redis-shake/common/split.go
  3. 4
      src/redis-shake/main/main.go
  4. 4
      src/redis-shake/rump.go

@ -1,3 +1,11 @@
2019-08-09 Alibaba Cloud.
* VERSION: 1.6.15
* IMPROVE: add `target.version` to support some proxy like twemproxy.
* BUGFIX: filter `select` command in `rump` when only db0 supports in some
redis version.
* IMPROVE: remove the cluster limit when target type is rump.
* IMPROVE: add `scan.key_number` limit judge when target type is cluster
in type `rump`. see #136.
2019-08-01 Alibaba Cloud. 2019-08-01 Alibaba Cloud.
* VERSION: 1.6.14 * VERSION: 1.6.14
* BUGFIX: the `rdb.parallel` parameter limits concurrency without effect. * BUGFIX: the `rdb.parallel` parameter limits concurrency without effect.

@ -8,10 +8,13 @@ import (
redigo "github.com/garyburd/redigo/redis" redigo "github.com/garyburd/redigo/redis"
) )
func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int) { func RestoreBigkey(client redigo.Conn, key string, value string, pttl int64, db int, preDb *int) {
if db != *preDb {
if _, err := client.Do("select", db); err != nil { if _, err := client.Do("select", db); err != nil {
log.Panicf("send select db[%v] failed[%v]", db, err) log.Panicf("send select db[%v] failed[%v]", db, err)
} }
*preDb = db
}
entry := rdb.BinEntry{ entry := rdb.BinEntry{
DB: uint32(db), DB: uint32(db),

@ -448,6 +448,10 @@ func sanitizeOptions(tp string) error {
conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile) conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
} }
if conf.Options.ScanKeyNumber > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster {
return fmt.Errorf("scan.key_number should less than [%v] when target type is cluster", utils.RecvChanSize)
}
//if len(conf.Options.SourceAddressList) == 1 { //if len(conf.Options.SourceAddressList) == 1 {
// return fmt.Errorf("source address length should == 1 when type is 'rump'") // return fmt.Errorf("source address length should == 1 when type is 'rump'")
//} //}

@ -327,6 +327,7 @@ func (dre *dbRumperExecutor) writer() {
// used in QoS // used in QoS
bucket := utils.StartQoS(conf.Options.Qps) bucket := utils.StartQoS(conf.Options.Qps)
preDb := 0 preDb := 0
preBigKeyDb := 0
for ele := range dre.keyChan { for ele := range dre.keyChan {
if filter.FilterKey(ele.key) { if filter.FilterKey(ele.key) {
continue continue
@ -354,8 +355,7 @@ func (dre *dbRumperExecutor) writer() {
batch = dre.writeSend(batch, &count, &wBytes) batch = dre.writeSend(batch, &count, &wBytes)
// handle big key // handle big key
utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db) utils.RestoreBigkey(dre.targetBigKeyClient, ele.key, ele.value, ele.pttl, ele.db, &preBigKeyDb)
// all the reply has been handled in RestoreBigkey // all the reply has been handled in RestoreBigkey
// dre.resultChan <- ele // dre.resultChan <- ele
continue continue

Loading…
Cancel
Save