Merge pull request #15 from alibaba/develop

update
v4
翊仰 6 years ago committed by GitHub
commit b9a545a3ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      ChangeLog
  2. 3
      README.md
  3. 37
      src/redis-shake/common/command.go
  4. 121
      src/redis-shake/rump.go
  5. 2
      src/redis-shake/sync.go

@ -1,3 +1,6 @@
2019-05-16 Alibaba Cloud.
* VERSION: 1.6.2
* BUGFIX: fix bug of `rump` mode only syncing db 0 data.
2019-05-14 Alibaba Cloud. 2019-05-14 Alibaba Cloud.
* VERSION: 1.6.1 * VERSION: 1.6.1
* IMPROVE: support fetching db address from sentinel, the failover * IMPROVE: support fetching db address from sentinel, the failover

@ -62,7 +62,8 @@ Version rules: a.b.c.
| improve-\* | improvement branch. forked from develop branch and then merge back after finish developing, testing, and code review. | | improve-\* | improvement branch. forked from develop branch and then merge back after finish developing, testing, and code review. |
Tag rules:<br> Tag rules:<br>
Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628" Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628"<br>
User can use `-version` to print the version.
# Usage # Usage
--- ---

@ -0,0 +1,37 @@
package utils
import (
"bytes"
"fmt"
"strconv"
)
func ParseKeyspace(content []byte) (map[int32]int64, error) {
if bytes.HasPrefix(content, []byte("# Keyspace")) == false {
return nil, fmt.Errorf("invalid info Keyspace: %s", string(content))
}
lines := bytes.Split(content, []byte("\n"))
reply := make(map[int32]int64)
for _, line := range lines {
line = bytes.TrimSpace(line)
if bytes.HasPrefix(line, []byte("db")) == true {
// line "db0:keys=18,expires=0,avg_ttl=0"
items := bytes.Split(line, []byte(":"))
db, err := strconv.Atoi(string(items[0][2:]))
if err != nil {
return nil, err
}
nums := bytes.Split(items[1], []byte(","))
if bytes.HasPrefix(nums[0], []byte("keys=")) == false {
return nil, fmt.Errorf("invalid info Keyspace: %s", string(content))
}
keysNum, err := strconv.ParseInt(string(nums[0][5:]), 10, 0)
if err != nil {
return nil, err
}
reply[int32(db)] = int64(keysNum)
} // end true
} // end for
return reply, nil
}

@ -4,6 +4,7 @@ import (
"pkg/libs/log" "pkg/libs/log"
"strconv" "strconv"
"sync" "sync"
"fmt"
"redis-shake/common" "redis-shake/common"
"redis-shake/configure" "redis-shake/configure"
@ -87,47 +88,22 @@ func (cr *CmdRump) fetcher(idx int) {
log.Panicf("fetch db node failed: length[%v], error[%v]", length, err) log.Panicf("fetch db node failed: length[%v], error[%v]", length, err)
} }
log.Infof("start fetcher with special-cloud[%v], length[%v]", conf.Options.ScanSpecialCloud, length) log.Infof("start fetcher with special-cloud[%v], nodes[%v]", conf.Options.ScanSpecialCloud, length)
// iterate all source nodes // iterate all source nodes
for i := 0; i < length; i++ { for i := 0; i < length; i++ {
// fetch data from on node // fetch db number from 'info Keyspace'
for { dbNumber, err := cr.getSourceDbList(i)
keys, err := cr.scanners[idx].ScanKey(i)
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
log.Info("scaned keys: ", len(keys)) log.Infof("fetch node[%v] with db list: %v", i, dbNumber)
// iterate all db
if len(keys) != 0 { for _, db := range dbNumber {
// pipeline dump log.Infof("fetch node[%v] db[%v]", i, db)
for _, key := range keys { if err := cr.doFetch(int(db), i); err != nil {
log.Debug("scan key: ", key) log.Panic(err)
cr.sourceConn[idx].Send("DUMP", key)
}
dumps, err := redis.Strings(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do dump with failed[%v]", err)
}
// pipeline ttl
for _, key := range keys {
cr.sourceConn[idx].Send("PTTL", key)
}
pttls, err := redis.Int64s(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
log.Panicf("do ttl with failed[%v]", err)
}
for i, k := range keys {
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
}
}
// Last iteration of scan.
if cr.scanners[idx].EndNode() {
break
} }
} }
} }
@ -174,3 +150,80 @@ func (cr *CmdRump) receiver() {
} }
} }
} }
func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) {
conn := cr.sourceConn[id]
if ret, err := conn.Do("info", "Keyspace"); err != nil {
return nil, err
} else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil {
return nil, err
} else {
list := make([]int32, 0, len(mp))
for key, val := range mp {
if val > 0 {
list = append(list, key)
}
}
return list, nil
}
}
func (cr *CmdRump) doFetch(db, idx int) error {
// send 'select' command to both source and target
log.Infof("send source select db")
if _, err := cr.sourceConn[idx].Do("select", db); err != nil {
return err
}
log.Infof("send target select db")
cr.targetConn.Flush()
if err := cr.targetConn.Send("select", db); err != nil {
return err
}
cr.targetConn.Flush()
log.Infof("finish select db, start fetching node[%v] db[%v]", idx, db)
for {
keys, err := cr.scanners[idx].ScanKey(idx)
if err != nil {
return err
}
log.Info("scanned keys: ", len(keys))
if len(keys) != 0 {
// pipeline dump
for _, key := range keys {
log.Debug("scan key: ", key)
cr.sourceConn[idx].Send("DUMP", key)
}
dumps, err := redis.Strings(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do dump with failed[%v]", err)
}
// pipeline ttl
for _, key := range keys {
cr.sourceConn[idx].Send("PTTL", key)
}
pttls, err := redis.Int64s(cr.sourceConn[idx].Do(""))
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do ttl with failed[%v]", err)
}
for i, k := range keys {
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]}
}
}
// Last iteration of scan.
if cr.scanners[idx].EndNode() {
break
}
}
log.Infof("finish fetching node[%v] db[%v]", idx, db)
return nil
}

@ -95,7 +95,7 @@ func (cmd *CmdSync) Main() {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(conf.Options.SourceAddress)) wg.Add(len(conf.Options.SourceAddressList))
for i := 0; i < int(conf.Options.SourceParallel); i++ { for i := 0; i < int(conf.Options.SourceParallel); i++ {
go func() { go func() {

Loading…
Cancel
Save