Merge pull request #101 from ceshihao/rump_filter_key

support filter by key in restore and rump mode
v4
Vinllen Chen committed 6 years ago (via GitHub)
commit caec047d10
Files changed:
  1. conf/redis-shake.conf (2)
  2. src/redis-shake/common/utils.go (12)
  3. src/redis-shake/common/utils_test.go (48)
  4. src/redis-shake/restore.go (6)
  5. src/redis-shake/rump.go (19)
  6. src/redis-shake/sync.go (11)

conf/redis-shake.conf
@@ -111,7 +111,7 @@ filter.db =
 # filter key with prefix string. multiple keys are separated by ';'.
 # e.g., a;b;c
 # default is all.
-# used in `restore` and `sync`.
+# used in `restore`, `sync` and `rump`.
 # 支持过滤key,只让指定的key通过,分号分隔
 filter.key =
 # filter given slot, multiple slots are separated by ';'.
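The Chinese comment in the hunk says the same thing as the English one above it: only keys that start with one of the listed prefixes pass, and prefixes are separated by semicolons. For illustration, a hypothetical setting that limits a restore/sync/rump run to keys under the user: and session: prefixes (the prefix values are made up for this example) would look like:

# hypothetical example, not part of the patch
filter.key = user:;session: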

src/redis-shake/common/utils.go
@@ -35,7 +35,7 @@ func OpenRedisConn(target []string, auth_type, passwd string, isCluster bool, tl
 }

 func OpenRedisConnWithTimeout(target []string, auth_type, passwd string, readTimeout, writeTimeout time.Duration,
     isCluster bool, tlsEnable bool) redigo.Conn {
     // return redigo.NewConn(OpenNetConn(target, auth_type, passwd), readTimeout, writeTimeout)
     if isCluster {
         cluster, err := redigoCluster.NewCluster(
@@ -1003,3 +1003,13 @@ var defaultDialFunction = func(addr string) (redigo.Conn, error) {
     }
     return c, nil
 }
+
+// HasAtLeastOnePrefix checks whether the key begins with at least one of prefixes.
+func HasAtLeastOnePrefix(key string, prefixes []string) bool {
+    for _, prefix := range prefixes {
+        if strings.HasPrefix(key, prefix) {
+            return true
+        }
+    }
+    return false
+}
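For readers skimming the patch, here is a minimal standalone sketch of the rule HasAtLeastOnePrefix implements: a key passes when it starts with at least one configured prefix. The prefix values and sample keys below are made up for illustration and are not part of the patch.

package main

import (
	"fmt"
	"strings"
)

// hasAtLeastOnePrefix mirrors the helper added in utils.go:
// it reports whether key starts with any of the given prefixes.
func hasAtLeastOnePrefix(key string, prefixes []string) bool {
	for _, prefix := range prefixes {
		if strings.HasPrefix(key, prefix) {
			return true
		}
	}
	return false
}

func main() {
	filterKey := []string{"user:", "session:"} // hypothetical filter.key = user:;session:
	for _, key := range []string{"user:1001", "session:abc", "cache:tmp"} {
		fmt.Printf("%-12s pass=%v\n", key, hasAtLeastOnePrefix(key, filterKey))
	}
	// Prints pass=true for user:1001 and session:abc, pass=false for cache:tmp.
}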

src/redis-shake/common/utils_test.go
@@ -1,9 +1,9 @@
 package utils

 import (
-    "testing"
     "fmt"
     "sort"
+    "testing"

     "github.com/stretchr/testify/assert"
 )
@@ -14,8 +14,8 @@ func TestGetAllClusterNode(t *testing.T) {
         fmt.Printf("TestGetAllClusterNode case %d.\n", nr)
         nr++

-        client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false)
-        ret, err := GetAllClusterNode(client, "master")
+        client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false, false)
+        ret, err := GetAllClusterNode(client, "master", "")
         sort.Strings(ret)
         assert.Equal(t, nil, err, "should be equal")
         assert.Equal(t, 3, len(ret), "should be equal")
@@ -28,8 +28,8 @@ func TestGetAllClusterNode(t *testing.T) {
         fmt.Printf("TestGetAllClusterNode case %d.\n", nr)
         nr++

-        client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false)
-        ret, err := GetAllClusterNode(client, "slave")
+        client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false, false)
+        ret, err := GetAllClusterNode(client, "slave", "")
         sort.Strings(ret)
         assert.Equal(t, nil, err, "should be equal")
         assert.Equal(t, 3, len(ret), "should be equal")
@@ -42,8 +42,8 @@ func TestGetAllClusterNode(t *testing.T) {
         fmt.Printf("TestGetAllClusterNode case %d.\n", nr)
         nr++

-        client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false)
-        ret, err := GetAllClusterNode(client, "all")
+        client := OpenRedisConn([]string{"10.1.1.1:21333"}, "auth", "123456", false, false)
+        ret, err := GetAllClusterNode(client, "all", "")
         sort.Strings(ret)
         assert.Equal(t, nil, err, "should be equal")
         assert.Equal(t, 6, len(ret), "should be equal")
@@ -54,4 +54,36 @@ func TestGetAllClusterNode(t *testing.T) {
         assert.Equal(t, "10.1.1.1:21335", ret[4], "should be equal")
         assert.Equal(t, "10.1.1.1:21336", ret[5], "should be equal")
     }
 }
+
+func TestHasAtLeastOnePrefix(t *testing.T) {
+    cases := []struct {
+        key          string
+        prefixes     []string
+        expectResult bool
+    }{
+        {
+            // no prefix provided
+            "a",
+            []string{},
+            false,
+        },
+        {
+            // has prefix
+            "abc",
+            []string{"ab"},
+            true,
+        },
+        {
+            // does NOT have prefix
+            "abc",
+            []string{"edf", "wab"},
+            false,
+        },
+    }
+
+    for _, c := range cases {
+        result := HasAtLeastOnePrefix(c.key, c.prefixes)
+        assert.Equal(t, c.expectResult, result)
+    }
+}

src/redis-shake/restore.go
@@ -137,9 +137,8 @@ func (dr *dbRestorer) restore() {
     }
 }

 func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64,
     tlsEnable bool) {
     pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize)
     wait := make(chan struct{})
     go func() {
@@ -168,6 +167,9 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth
                 utils.SelectDB(c, lastdb)
             }
         }
+        if len(conf.Options.FilterKey) != 0 && !utils.HasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) {
+            continue
+        }
         utils.RestoreRdbEntry(c, e)
     }
 }
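The effect of the new guard in the restore loop, sketched standalone below; the entry type and the filterKey slice are hypothetical stand-ins for the real RDB entry and conf.Options.FilterKey.

package main

import (
	"fmt"
	"strings"
)

// entry is a hypothetical stand-in for the RDB entry type; only the Key field matters here.
type entry struct{ Key []byte }

func hasAtLeastOnePrefix(key string, prefixes []string) bool {
	for _, p := range prefixes {
		if strings.HasPrefix(key, p) {
			return true
		}
	}
	return false
}

func main() {
	filterKey := []string{"user:"} // stand-in for conf.Options.FilterKey
	for _, e := range []entry{{Key: []byte("user:1")}, {Key: []byte("cache:tmp")}} {
		// Same guard as the patch: when a filter is configured, entries whose key
		// matches no prefix are skipped before the restore call would run.
		if len(filterKey) != 0 && !hasAtLeastOnePrefix(string(e.Key), filterKey) {
			continue
		}
		fmt.Println("would restore:", string(e.Key))
	}
}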

src/redis-shake/rump.go
@@ -1,18 +1,18 @@
 package run

 import (
-    "strconv"
-    "sync"
     "fmt"
-    "reflect"
     "math"
+    "reflect"
+    "strconv"
+    "sync"

-    "pkg/libs/log"
     "pkg/libs/atomic2"
+    "pkg/libs/log"

     "redis-shake/common"
     "redis-shake/configure"
-    "redis-shake/scanner"
     "redis-shake/metric"
+    "redis-shake/scanner"

     "github.com/garyburd/redigo/redis"
 )
@@ -33,7 +33,7 @@ func (cr *CmdRump) GetDetailedInfo() interface{} {
     // TODO, better to move to the next level
     metric.AddMetric(0)

-    return []map[string]interface{} {
+    return []map[string]interface{}{
         {
             "Details": ret,
         },
@@ -94,7 +94,7 @@ func (dr *dbRumper) getStats() map[string]interface{} {
 func (dr *dbRumper) run() {
     // single connection
     dr.client = utils.OpenRedisConn([]string{dr.address}, conf.Options.SourceAuthType,
         conf.Options.SourcePasswordRaw, false, conf.Options.SourceTLSEnable)

     // some clouds may have several db under proxy
     count, err := dr.getNode()
@@ -320,6 +320,9 @@ func (dre *dbRumperExecutor) writer() {
     bucket := utils.StartQoS(conf.Options.Qps)

     preDb := 0
     for ele := range dre.keyChan {
+        if len(conf.Options.FilterKey) != 0 && !utils.HasAtLeastOnePrefix(ele.key, conf.Options.FilterKey) {
+            continue
+        }
         // QoS, limit the qps
         <-bucket
@@ -495,4 +498,4 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
     log.Infof("dbRumper[%v] executor[%v] finish fetching db[%v]", dre.rumperId, dre.executorId, db)

     return nil
 }
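One detail worth noting in the writer loop: the patch places the key filter before the QoS token read, so filtered-out keys do not consume rate-limit budget. Below is a small standalone sketch of that ordering; the channel-based keyChan and bucket are hypothetical stand-ins for dre.keyChan and the utils.StartQoS bucket, not the real types.

package main

import (
	"fmt"
	"strings"
)

func hasAtLeastOnePrefix(key string, prefixes []string) bool {
	for _, p := range prefixes {
		if strings.HasPrefix(key, p) {
			return true
		}
	}
	return false
}

func main() {
	filterKey := []string{"user:"} // stand-in for conf.Options.FilterKey

	keyChan := make(chan string, 4) // stand-in for dre.keyChan
	for _, k := range []string{"user:1", "tmp:1", "user:2", "tmp:2"} {
		keyChan <- k
	}
	close(keyChan)

	bucket := make(chan struct{}, 4) // stand-in for the QoS token bucket
	for i := 0; i < 4; i++ {
		bucket <- struct{}{}
	}

	written := 0
	for key := range keyChan {
		// Filter first: skipped keys never take a token from the bucket.
		if len(filterKey) != 0 && !hasAtLeastOnePrefix(key, filterKey) {
			continue
		}
		<-bucket
		written++
		fmt.Println("write", key)
	}
	fmt.Println("written:", written, "tokens left:", len(bucket)) // written: 2 tokens left: 2
}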

src/redis-shake/sync.go
@@ -426,11 +426,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type
             }

             if len(conf.Options.FilterKey) != 0 {
-                for i := 0; i < len(conf.Options.FilterKey); i++ {
-                    if strings.HasPrefix(string(e.Key), conf.Options.FilterKey[i]) {
-                        utils.RestoreRdbEntry(c, e)
-                        break
-                    }
-                }
+                if utils.HasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) {
+                    utils.RestoreRdbEntry(c, e)
+                }
             } else if len(conf.Options.FilterSlot) > 0 {
                 for _, slot := range conf.Options.FilterSlot {
@@ -473,8 +470,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type
 }

 func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type, passwd string, tlsEnable bool) {
-    readeTimeout := time.Duration(10)*time.Minute
-    writeTimeout := time.Duration(10)*time.Minute
+    readeTimeout := time.Duration(10) * time.Minute
+    writeTimeout := time.Duration(10) * time.Minute
     isCluster := conf.Options.TargetType == conf.RedisTypeCluster
     c := utils.OpenRedisConnWithTimeout(target, auth_type, passwd, readeTimeout, writeTimeout, isCluster, tlsEnable)
     defer c.Close()
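The sync.go change is intended as a pure refactor: the removed index loop with break and the new HasAtLeastOnePrefix call should make the same keep-or-skip decision for every key. A quick illustrative check of that equivalence; the prefixes and keys below are made up, and both functions are local re-statements rather than the project's own code.

package main

import (
	"fmt"
	"strings"
)

// oldDecision mirrors the removed loop: scan prefixes by index, stop at the first match.
func oldDecision(key string, prefixes []string) bool {
	for i := 0; i < len(prefixes); i++ {
		if strings.HasPrefix(key, prefixes[i]) {
			return true
		}
	}
	return false
}

// newDecision mirrors the HasAtLeastOnePrefix helper the patch switches to.
func newDecision(key string, prefixes []string) bool {
	for _, prefix := range prefixes {
		if strings.HasPrefix(key, prefix) {
			return true
		}
	}
	return false
}

func main() {
	prefixes := []string{"user:", "session:"} // hypothetical filter.key values
	for _, key := range []string{"user:1", "session:x", "cache:y", ""} {
		fmt.Println(key, oldDecision(key, prefixes) == newDecision(key, prefixes)) // prints true for every key
	}
}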
