support filter by key in restore and rump mode

v4
Zheng Dayu 6 years ago
parent 3fc9515c26
commit 145fcf336b
  1. 4
      src/redis-shake/restore.go
  2. 13
      src/redis-shake/rump.go
  3. 5
      src/redis-shake/sync.go
  4. 13
      src/redis-shake/utils.go
  5. 40
      src/redis-shake/utils_test.go

@ -137,7 +137,6 @@ func (dr *dbRestorer) restore() {
} }
} }
func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64, func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth_type, passwd string, nsize int64,
tlsEnable bool) { tlsEnable bool) {
pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize) pipe := utils.NewRDBLoader(reader, &dr.rbytes, base.RDBPipeSize)
@ -168,6 +167,9 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth
utils.SelectDB(c, lastdb) utils.SelectDB(c, lastdb)
} }
} }
if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) {
continue
}
utils.RestoreRdbEntry(c, e) utils.RestoreRdbEntry(c, e)
} }
} }

@ -1,18 +1,18 @@
package run package run
import ( import (
"strconv"
"sync"
"fmt" "fmt"
"reflect"
"math" "math"
"reflect"
"strconv"
"sync"
"pkg/libs/log"
"pkg/libs/atomic2" "pkg/libs/atomic2"
"pkg/libs/log"
"redis-shake/common" "redis-shake/common"
"redis-shake/configure" "redis-shake/configure"
"redis-shake/scanner"
"redis-shake/metric" "redis-shake/metric"
"redis-shake/scanner"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
) )
@ -320,6 +320,9 @@ func (dre *dbRumperExecutor) writer() {
bucket := utils.StartQoS(conf.Options.Qps) bucket := utils.StartQoS(conf.Options.Qps)
preDb := 0 preDb := 0
for ele := range dre.keyChan { for ele := range dre.keyChan {
if len(conf.Options.FilterKey) != 0 && !hasAtLeastOnePrefix(ele.key, conf.Options.FilterKey) {
continue
}
// QoS, limit the qps // QoS, limit the qps
<-bucket <-bucket

@ -426,11 +426,8 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type
} }
if len(conf.Options.FilterKey) != 0 { if len(conf.Options.FilterKey) != 0 {
for i := 0; i < len(conf.Options.FilterKey); i++ { if hasAtLeastOnePrefix(string(e.Key), conf.Options.FilterKey) {
if strings.HasPrefix(string(e.Key), conf.Options.FilterKey[i]) {
utils.RestoreRdbEntry(c, e) utils.RestoreRdbEntry(c, e)
break
}
} }
} else if len(conf.Options.FilterSlot) > 0 { } else if len(conf.Options.FilterSlot) > 0 {
for _, slot := range conf.Options.FilterSlot { for _, slot := range conf.Options.FilterSlot {

@ -0,0 +1,13 @@
package run
import "strings"
// hasAtLeastOnePrefix checks whether the key has begins with at least one of prefixes.
func hasAtLeastOnePrefix(key string, prefixes []string) bool {
for _, prefix := range prefixes {
if strings.HasPrefix(key, prefix) {
return true
}
}
return false
}

@ -0,0 +1,40 @@
package run
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestHasAtLeastOnePrefix(t *testing.T) {
cases := []struct {
key string
prefixes []string
expectResult bool
}{
{
// no prefix provided
"a",
[]string{},
false,
},
{
// has prefix
"abc",
[]string{"ab"},
true,
},
{
// does NOT have prefix
"abc",
[]string{"edf", "wab"},
false,
},
}
for _, c := range cases {
result := hasAtLeastOnePrefix(c.key, c.prefixes)
assert.Equal(t, c.expectResult, result)
}
}
Loading…
Cancel
Save