fix bug of 'rump' mode only syncing db 0 data

v4
vinllen 6 years ago
commit 3222891ec5
  1. 3
      ChangeLog
  2. 3
      README.md
  3. 37
      src/redis-shake/common/command.go
  4. 127
      src/redis-shake/rump.go
  5. 2
      src/redis-shake/sync.go

@ -1,3 +1,6 @@
2019-05-16 Alibaba Cloud.
* VERSION: 1.6.2
* BUGFIX: fix bug of `rump` mode only syncing db 0 data.
2019-05-14 Alibaba Cloud.
* VERSION: 1.6.1
* IMPROVE: support fetching db address from sentinel, the failover

@ -62,7 +62,8 @@ Version rules: a.b.c.
| improve-\* | improvement branch. forked from develop branch and then merge back after finish developing, testing, and code review. |
Tag rules:<br>
Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628"
Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628"<br>
User can use `-version` to print the version.
# Usage
---

@ -0,0 +1,37 @@
package utils
import (
"bytes"
"fmt"
"strconv"
)
func ParseKeyspace(content []byte) (map[int32]int64, error) {
if bytes.HasPrefix(content, []byte("# Keyspace")) == false {
return nil, fmt.Errorf("invalid info Keyspace: %s", string(content))
}
lines := bytes.Split(content, []byte("\n"))
reply := make(map[int32]int64)
for _, line := range lines {
line = bytes.TrimSpace(line)
if bytes.HasPrefix(line, []byte("db")) == true {
// line "db0:keys=18,expires=0,avg_ttl=0"
items := bytes.Split(line, []byte(":"))
db, err := strconv.Atoi(string(items[0][2:]))
if err != nil {
return nil, err
}
nums := bytes.Split(items[1], []byte(","))
if bytes.HasPrefix(nums[0], []byte("keys=")) == false {
return nil, fmt.Errorf("invalid info Keyspace: %s", string(content))
}
keysNum, err := strconv.ParseInt(string(nums[0][5:]), 10, 0)
if err != nil {
return nil, err
}
reply[int32(db)] = int64(keysNum)
} // end true
} // end for
return reply, nil
}

@ -4,6 +4,7 @@ import (
"pkg/libs/log"
"strconv"
"sync"
"fmt"
"redis-shake/common"
"redis-shake/configure"
@ -87,47 +88,22 @@ func (cr *CmdRump) fetcher(idx int) {
log.Panicf("fetch db node failed: length[%v], error[%v]", length, err)
}
log.Infof("start fetcher with special-cloud[%v], length[%v]", conf.Options.ScanSpecialCloud, length)
log.Infof("start fetcher with special-cloud[%v], nodes[%v]", conf.Options.ScanSpecialCloud, length)
// iterate all source nodes
for i := 0; i < length; i++ {
// fetch data from on node
for {
keys, err := cr.scanners[idx].ScanKey(i)
if err != nil {
log.Panic(err)
}
log.Info("scaned keys: ", len(keys))
if len(keys) != 0 {
// pipeline dump
for _, key := range keys {
log.Debug("scan key: ", key)
cr.sourceConn[idx].Send("DUMP", key)
}
dumps, err := redis.Strings(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do dump with failed[%v]", err)
}
// pipeline ttl
for _, key := range keys {
cr.sourceConn[idx].Send("PTTL", key)
}
pttls, err := redis.Int64s(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do ttl with failed[%v]", err)
}
for i, k := range keys {
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
}
}
// fetch db number from 'info Keyspace'
dbNumber, err := cr.getSourceDbList(i)
if err != nil {
log.Panic(err)
}
// Last iteration of scan.
if cr.scanners[idx].EndNode() {
break
log.Infof("fetch node[%v] with db list: %v", i, dbNumber)
// iterate all db
for _, db := range dbNumber {
log.Infof("fetch node[%v] db[%v]", i, db)
if err := cr.doFetch(int(db), i); err != nil {
log.Panic(err)
}
}
}
@ -174,3 +150,80 @@ func (cr *CmdRump) receiver() {
}
}
}
func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) {
conn := cr.sourceConn[id]
if ret, err := conn.Do("info", "Keyspace"); err != nil {
return nil, err
} else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil {
return nil, err
} else {
list := make([]int32, 0, len(mp))
for key, val := range mp {
if val > 0 {
list = append(list, key)
}
}
return list, nil
}
}
func (cr *CmdRump) doFetch(db, idx int) error {
// send 'select' command to both source and target
log.Infof("send source select db")
if _, err := cr.sourceConn[idx].Do("select", db); err != nil {
return err
}
log.Infof("send target select db")
cr.targetConn.Flush()
if err := cr.targetConn.Send("select", db); err != nil {
return err
}
cr.targetConn.Flush()
log.Infof("finish select db, start fetching node[%v] db[%v]", idx, db)
for {
keys, err := cr.scanners[idx].ScanKey(idx)
if err != nil {
return err
}
log.Info("scanned keys: ", len(keys))
if len(keys) != 0 {
// pipeline dump
for _, key := range keys {
log.Debug("scan key: ", key)
cr.sourceConn[idx].Send("DUMP", key)
}
dumps, err := redis.Strings(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do dump with failed[%v]", err)
}
// pipeline ttl
for _, key := range keys {
cr.sourceConn[idx].Send("PTTL", key)
}
pttls, err := redis.Int64s(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do ttl with failed[%v]", err)
}
for i, k := range keys {
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
}
}
// Last iteration of scan.
if cr.scanners[idx].EndNode() {
break
}
}
log.Infof("finish fetching node[%v] db[%v]", idx, db)
return nil
}

@ -95,7 +95,7 @@ func (cmd *CmdSync) Main() {
}
var wg sync.WaitGroup
wg.Add(len(conf.Options.SourceAddress))
wg.Add(len(conf.Options.SourceAddressList))
for i := 0; i < int(conf.Options.SourceParallel); i++ {
go func() {

Loading…
Cancel
Save