From 145fcf336b0e5ea83b8c979be7143fbea148faa4 Mon Sep 17 00:00:00 2001 From: Zheng Dayu Date: Mon, 17 Jun 2019 18:56:33 +0800 Subject: [PATCH] support filter by key in restore and rump mode --- src/redis-shake/restore.go | 6 ++++-- src/redis-shake/rump.go | 19 ++++++++++------- src/redis-shake/sync.go | 11 ++++------ src/redis-shake/utils.go | 13 ++++++++++++ src/redis-shake/utils_test.go | 40 +++++++++++++++++++++++++++++++++++ 5 files changed, 72 insertions(+), 17 deletions(-) create mode 100644 src/redis-shake/utils.go create mode 100644 src/redis-shake/utils_test.go diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index e9c3b7f..5eb39a7 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -137,9 +137,8 @@ func (dr *dbRestorer) restore() { } } - func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64, - tlsEnable bool) { + tlsEnable bool) { pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { @@ -168,6 +167,9 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth utils.SelectDB(c, lastdb) } } + if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { + continue + } utils.RestoreRdbEntry(c, e) } } diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 5caf0cf..4fb6679 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -1,18 +1,18 @@ package run import ( - "strconv" - "sync" "fmt" - "reflect" "math" + "reflect" + "strconv" + "sync" - "pkg/libs/log" "pkg/libs/atomic2" + "pkg/libs/log" "redis-shake/common" "redis-shake/configure" - "redis-shake/scanner" "redis-shake/metric" + "redis-shake/scanner" "github.com/garyburd/redigo/redis" ) @@ -33,7 +33,7 @@ func (cr *CmdRump) GetDetailedInfo() interface{} { // TODO, better to move to the next level metric.AddMetric(0) - return []map[string]interface{} { + return []map[string]interface{}{ { "Details": ret, }, @@ -94,7 +94,7 @@ func (dr *dbRumper) getStats() map[string]interface{} { func (dr *dbRumper) run() { // single connection dr.client = utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable) + conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable) // some clouds may have several db under proxy count, err := dr.getNode() @@ -320,6 +320,9 @@ func (dre *dbRumperExecutor) writer() { bucket := utils.StartQoS(conf.Options.Qps) preDb := 0 for ele := range dre.keyChan { + if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(ele.key, conf.Options.FilterKey) { + continue + } // QoS, limit the qps <-bucket @@ -495,4 +498,4 @@ func (dre *dbRumperExecutor) doFetch(db int) error { log.Infof("dbRumper[%v] executor[%v] finish fetching db[%v]", dre.rumperId, dre.executorId, db) return nil -} \ No newline at end of file +} diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 3a95b06..78aac48 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -426,11 +426,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type } if len(conf.Options.FilterKey) != 0 { - for i := 0; i < len(conf.Options.FilterKey); i++ { - if strings.HasPrefix(string(e.Key), conf.Options.FilterKey[i]) { - utils.RestoreRdbEntry(c, e) - break - } + if hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { + utils.RestoreRdbEntry(c, e) } } else if len(conf.Options.FilterSlot) > 0 { for _, slot := range conf.Options.FilterSlot { @@ -473,8 +470,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type } func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string, tlsEnable bool) { - readeTimeout := time.Duration(10)*time.Minute - writeTimeout := time.Duration(10)*time.Minute + readeTimeout := time.Duration(10) * time.Minute + writeTimeout := time.Duration(10) * time.Minute isCluster := conf.Options.TargetType == conf.RedisTypeCluster c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster, tlsEnable) defer c.Close() diff --git a/src/redis-shake/utils.go b/src/redis-shake/utils.go new file mode 100644 index 0000000..0f21279 --- /dev/null +++ b/src/redis-shake/utils.go @@ -0,0 +1,13 @@ +package run + +import "strings" + +// hasAtLeastOnePrefix checks whether the key has begins with at least one of prefixes. +func hasAtLeastOnePrefix(key string, prefixes []string) bool { + for _, prefix := range prefixes { + if strings.HasPrefix(key, prefix) { + return true + } + } + return false +} diff --git a/src/redis-shake/utils_test.go b/src/redis-shake/utils_test.go new file mode 100644 index 0000000..77a3353 --- /dev/null +++ b/src/redis-shake/utils_test.go @@ -0,0 +1,40 @@ +package run + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHasAtLeastOnePrefix(t *testing.T) { + + cases := []struct { + key string + prefixes []string + expectResult bool + }{ + { + // no prefix provided + "a", + []string{}, + false, + }, + { + // has prefix + "abc", + []string{"ab"}, + true, + }, + { + // does NOT have prefix + "abc", + []string{"edf", "wab"}, + false, + }, + } + + for _, c := range cases { + result := hasAtLeastOnePrefix(c.key, c.prefixes) + assert.Equal(t, c.expectResult, result) + } +}