From 145fcf336b0e5ea83b8c979be7143fbea148faa4 Mon Sep 17 00:00:00 2001 From: Zheng Dayu Date: Mon, 17 Jun 2019 18:56:33 +0800 Subject: [PATCH 1/3] support filter by key in restore and rump mode --- src/redis-shake/restore.go | 6 ++++-- src/redis-shake/rump.go | 19 ++++++++++------- src/redis-shake/sync.go | 11 ++++------ src/redis-shake/utils.go | 13 ++++++++++++ src/redis-shake/utils_test.go | 40 +++++++++++++++++++++++++++++++++++ 5 files changed, 72 insertions(+), 17 deletions(-) create mode 100644 src/redis-shake/utils.go create mode 100644 src/redis-shake/utils_test.go diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index e9c3b7f..5eb39a7 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -137,9 +137,8 @@ func (dr *dbRestorer) restore() { } } - func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64, - tlsEnable bool) { + tlsEnable bool) { pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize) wait := make(chan struct{}) go func() { @@ -168,6 +167,9 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth utils.SelectDB(c, lastdb) } } + if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { + continue + } utils.RestoreRdbEntry(c, e) } } diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 5caf0cf..4fb6679 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -1,18 +1,18 @@ package run import ( - "strconv" - "sync" "fmt" - "reflect" "math" + "reflect" + "strconv" + "sync" - "pkg/libs/log" "pkg/libs/atomic2" + "pkg/libs/log" "redis-shake/common" "redis-shake/configure" - "redis-shake/scanner" "redis-shake/metric" + "redis-shake/scanner" "github.com/garyburd/redigo/redis" ) @@ -33,7 +33,7 @@ func (cr *CmdRump) GetDetailedInfo() interface{} { // TODO, better to move to the next level metric.AddMetric(0) - return []map[string]interface{} { + return []map[string]interface{}{ { "Details": ret, }, @@ -94,7 +94,7 @@ func (dr *dbRumper) getStats() map[string]interface{} { func (dr *dbRumper) run() { // single connection dr.client = utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType, - conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable) + conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable) // some clouds may have several db under proxy count, err := dr.getNode() @@ -320,6 +320,9 @@ func (dre *dbRumperExecutor) writer() { bucket := utils.StartQoS(conf.Options.Qps) preDb := 0 for ele := range dre.keyChan { + if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(ele.key, conf.Options.FilterKey) { + continue + } // QoS, limit the qps <-bucket @@ -495,4 +498,4 @@ func (dre *dbRumperExecutor) doFetch(db int) error { log.Infof("dbRumper[%v] executor[%v] finish fetching db[%v]", dre.rumperId, dre.executorId, db) return nil -} \ No newline at end of file +} diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 3a95b06..78aac48 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -426,11 +426,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type } if len(conf.Options.FilterKey) != 0 { - for i := 0; i < len(conf.Options.FilterKey); i++ { - if strings.HasPrefix(string(e.Key), conf.Options.FilterKey[i]) { - utils.RestoreRdbEntry(c, e) - break - } + if hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { + utils.RestoreRdbEntry(c, e) } } else if len(conf.Options.FilterSlot) > 0 { for _, slot := range conf.Options.FilterSlot { @@ -473,8 +470,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type } func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string, tlsEnable bool) { - readeTimeout := time.Duration(10)*time.Minute - writeTimeout := time.Duration(10)*time.Minute + readeTimeout := time.Duration(10) * time.Minute + writeTimeout := time.Duration(10) * time.Minute isCluster := conf.Options.TargetType == conf.RedisTypeCluster c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster, tlsEnable) defer c.Close() diff --git a/src/redis-shake/utils.go b/src/redis-shake/utils.go new file mode 100644 index 0000000..0f21279 --- /dev/null +++ b/src/redis-shake/utils.go @@ -0,0 +1,13 @@ +package run + +import "strings" + +// hasAtLeastOnePrefix checks whether the key has begins with at least one of prefixes. +func hasAtLeastOnePrefix(key string, prefixes []string) bool { + for _, prefix := range prefixes { + if strings.HasPrefix(key, prefix) { + return true + } + } + return false +} diff --git a/src/redis-shake/utils_test.go b/src/redis-shake/utils_test.go new file mode 100644 index 0000000..77a3353 --- /dev/null +++ b/src/redis-shake/utils_test.go @@ -0,0 +1,40 @@ +package run + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHasAtLeastOnePrefix(t *testing.T) { + + cases := []struct { + key string + prefixes []string + expectResult bool + }{ + { + // no prefix provided + "a", + []string{}, + false, + }, + { + // has prefix + "abc", + []string{"ab"}, + true, + }, + { + // does NOT have prefix + "abc", + []string{"edf", "wab"}, + false, + }, + } + + for _, c := range cases { + result := hasAtLeastOnePrefix(c.key, c.prefixes) + assert.Equal(t, c.expectResult, result) + } +} From 8a22221f9cf22671ec52baedb63d8f291cc5e403 Mon Sep 17 00:00:00 2001 From: Zheng Dayu Date: Tue, 18 Jun 2019 06:42:05 +0800 Subject: [PATCH 2/3] update redis-shake.conf --- conf/redis-shake.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index d5944f9..d8c6f71 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -111,7 +111,7 @@ filter.db = # filter key with prefix string. multiple keys are separated by ';'. # e.g., a;b;c # default is all. -# used in `restore` and `sync`. +# used in `restore`, `sync` and `rump`. # 支持过滤key,只让指定的key通过,分号分隔 filter.key = # filter given slot, multiple slots are separated by ';'. From 6a02fd2641b9a5b616185902886b0ea9b34dc372 Mon Sep 17 00:00:00 2001 From: Zheng Dayu Date: Thu, 20 Jun 2019 11:44:56 +0800 Subject: [PATCH 3/3] move function HasAtLeastOnePrefix into redis-shake/common --- src/redis-shake/common/utils.go | 12 ++++++- src/redis-shake/common/utils_test.go | 48 +++++++++++++++++++++++----- src/redis-shake/restore.go | 2 +- src/redis-shake/rump.go | 2 +- src/redis-shake/sync.go | 2 +- src/redis-shake/utils.go | 13 -------- src/redis-shake/utils_test.go | 40 ----------------------- 7 files changed, 54 insertions(+), 65 deletions(-) delete mode 100644 src/redis-shake/utils.go delete mode 100644 src/redis-shake/utils_test.go diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 1d5712e..8314cd5 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -35,7 +35,7 @@ func OpenRedisConn(target []string, auth_type, passwd string, isCluster bool, tl } func OpenRedisConnWithTimeout(target []string, auth_type, passwd string, readTimeout, writeTimeout time.Duration, - isCluster bool, tlsEnable bool) redigo.Conn { + isCluster bool, tlsEnable bool) redigo.Conn { // return redigo.NewConn(OpenNetConn(target, auth_type, passwd), readTimeout, writeTimeout) if isCluster { cluster, err := redigoCluster.NewCluster( @@ -1003,3 +1003,13 @@ var defaultDialFunction = func(addr string) (redigo.Conn, error) { } return c, nil } + +// HasAtLeastOnePrefix checks whether the key begins with at least one of prefixes. +func HasAtLeastOnePrefix(key string, prefixes []string) bool { + for _, prefix := range prefixes { + if strings.HasPrefix(key, prefix) { + return true + } + } + return false +} diff --git a/src/redis-shake/common/utils_test.go b/src/redis-shake/common/utils_test.go index e1a571c..88986e5 100644 --- a/src/redis-shake/common/utils_test.go +++ b/src/redis-shake/common/utils_test.go @@ -1,9 +1,9 @@ package utils import ( - "testing" "fmt" "sort" + "testing" "github.com/stretchr/testify/assert" ) @@ -14,8 +14,8 @@ func TestGetAllClusterNode(t *testing.T) { fmt.Printf("TestGetAllClusterNode case %d.\n", nr) nr++ - client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false) - ret, err := GetAllClusterNode(client, "master") + client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false, false) + ret, err := GetAllClusterNode(client, "master", "") sort.Strings(ret) assert.Equal(t, nil, err, "should be equal") assert.Equal(t, 3, len(ret), "should be equal") @@ -28,8 +28,8 @@ func TestGetAllClusterNode(t *testing.T) { fmt.Printf("TestGetAllClusterNode case %d.\n", nr) nr++ - client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false) - ret, err := GetAllClusterNode(client, "slave") + client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false, false) + ret, err := GetAllClusterNode(client, "slave", "") sort.Strings(ret) assert.Equal(t, nil, err, "should be equal") assert.Equal(t, 3, len(ret), "should be equal") @@ -42,8 +42,8 @@ func TestGetAllClusterNode(t *testing.T) { fmt.Printf("TestGetAllClusterNode case %d.\n", nr) nr++ - client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false) - ret, err := GetAllClusterNode(client, "all") + client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false, false) + ret, err := GetAllClusterNode(client, "all", "") sort.Strings(ret) assert.Equal(t, nil, err, "should be equal") assert.Equal(t, 6, len(ret), "should be equal") @@ -54,4 +54,36 @@ func TestGetAllClusterNode(t *testing.T) { assert.Equal(t, "10.1.1.1:21335", ret[4], "should be equal") assert.Equal(t, "10.1.1.1:21336", ret[5], "should be equal") } -} \ No newline at end of file +} + +func TestHasAtLeastOnePrefix(t *testing.T) { + cases := []struct { + key string + prefixes []string + expectResult bool + }{ + { + // no prefix provided + "a", + []string{}, + false, + }, + { + // has prefix + "abc", + []string{"ab"}, + true, + }, + { + // does NOT have prefix + "abc", + []string{"edf", "wab"}, + false, + }, + } + + for _, c := range cases { + result := HasAtLeastOnePrefix(c.key, c.prefixes) + assert.Equal(t, c.expectResult, result) + } +} diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index 5eb39a7..3b63c3a 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -167,7 +167,7 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth utils.SelectDB(c, lastdb) } } - if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { + if len(conf.Options.FilterKey) != 0 && !utils.HasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { continue } utils.RestoreRdbEntry(c, e) diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 4fb6679..e22b0a4 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -320,7 +320,7 @@ func (dre *dbRumperExecutor) writer() { bucket := utils.StartQoS(conf.Options.Qps) preDb := 0 for ele := range dre.keyChan { - if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(ele.key, conf.Options.FilterKey) { + if len(conf.Options.FilterKey) != 0 && !utils.HasAtLeastOnePrefix(ele.key, conf.Options.FilterKey) { continue } // QoS, limit the qps diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 78aac48..90f51ab 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -426,7 +426,7 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type } if len(conf.Options.FilterKey) != 0 { - if hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { + if utils.HasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) { utils.RestoreRdbEntry(c, e) } } else if len(conf.Options.FilterSlot) > 0 { diff --git a/src/redis-shake/utils.go b/src/redis-shake/utils.go deleted file mode 100644 index 0f21279..0000000 --- a/src/redis-shake/utils.go +++ /dev/null @@ -1,13 +0,0 @@ -package run - -import "strings" - -// hasAtLeastOnePrefix checks whether the key has begins with at least one of prefixes. -func hasAtLeastOnePrefix(key string, prefixes []string) bool { - for _, prefix := range prefixes { - if strings.HasPrefix(key, prefix) { - return true - } - } - return false -} diff --git a/src/redis-shake/utils_test.go b/src/redis-shake/utils_test.go deleted file mode 100644 index 77a3353..0000000 --- a/src/redis-shake/utils_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package run - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestHasAtLeastOnePrefix(t *testing.T) { - - cases := []struct { - key string - prefixes []string - expectResult bool - }{ - { - // no prefix provided - "a", - []string{}, - false, - }, - { - // has prefix - "abc", - []string{"ab"}, - true, - }, - { - // does NOT have prefix - "abc", - []string{"edf", "wab"}, - false, - }, - } - - for _, c := range cases { - result := hasAtLeastOnePrefix(c.key, c.prefixes) - assert.Equal(t, c.expectResult, result) - } -}