|
|
@ -349,9 +349,9 @@ func (dre *dbRumperExecutor) writer() { |
|
|
|
preDb := 0 |
|
|
|
preDb := 0 |
|
|
|
preBigKeyDb := 0 |
|
|
|
preBigKeyDb := 0 |
|
|
|
for ele := range dre.keyChan { |
|
|
|
for ele := range dre.keyChan { |
|
|
|
if filter.FilterKey(ele.key) { |
|
|
|
/*if filter.FilterKey(ele.key) { |
|
|
|
continue |
|
|
|
continue |
|
|
|
} |
|
|
|
}*/ |
|
|
|
// QoS, limit the qps
|
|
|
|
// QoS, limit the qps
|
|
|
|
<-bucket |
|
|
|
<-bucket |
|
|
|
|
|
|
|
|
|
|
@ -494,11 +494,26 @@ func (dre *dbRumperExecutor) doFetch(db int) error { |
|
|
|
log.Infof("dbRumper[%v] executor[%v] start fetching node db[%v]", dre.rumperId, dre.executorId, db) |
|
|
|
log.Infof("dbRumper[%v] executor[%v] start fetching node db[%v]", dre.rumperId, dre.executorId, db) |
|
|
|
|
|
|
|
|
|
|
|
for { |
|
|
|
for { |
|
|
|
keys, err := dre.scanner.ScanKey() |
|
|
|
rawKeys, err := dre.scanner.ScanKey() |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var keys []string |
|
|
|
|
|
|
|
if len(conf.Options.FilterKeyBlacklist) != 0 || len(conf.Options.FilterKeyWhitelist) != 0 { |
|
|
|
|
|
|
|
// filter keys
|
|
|
|
|
|
|
|
keys = make([]string, 0, len(rawKeys)) |
|
|
|
|
|
|
|
for _, key := range rawKeys { |
|
|
|
|
|
|
|
if filter.FilterKey(key) { |
|
|
|
|
|
|
|
log.Infof("dbRumper[%v] executor[%v] key[%v] filter", dre.rumperId, dre.executorId, key) |
|
|
|
|
|
|
|
continue |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
keys = append(keys, key) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
keys = rawKeys |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
log.Debugf("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys)) |
|
|
|
log.Debugf("dbRumper[%v] executor[%v] scanned keys number: %v", dre.rumperId, dre.executorId, len(keys)) |
|
|
|
|
|
|
|
|
|
|
|
if len(keys) != 0 { |
|
|
|
if len(keys) != 0 { |
|
|
|