Merge pull request #38 from alibaba/feature-1.4
release v1.4: support rump type to using dump+restore command to sync…v4
commit
5596b5d4f1
8 changed files with 303 additions and 57 deletions
@ -1,16 +1,20 @@ |
||||
2019-04-21 Alibaba Cloud. |
||||
* VERSION: 1.4.0 |
||||
* FEATURE: support "rump" type to syncing data when `sync` and `psync` |
||||
commands are not supported. |
||||
2019-04-13 Alibaba Cloud. |
||||
* version: 1.2.3 |
||||
* VERSION: 1.2.3 |
||||
* IMPROVE: polish log print to print more error info. |
||||
2019-04-03 Alibaba Cloud. |
||||
* version: 1.2.2 |
||||
* VERSION: 1.2.2 |
||||
* BUGFIX: support 5.0 rdb RDB_OPCODE_MODULE_AUX, RDB_OPCODE_IDLE and |
||||
RDB_OPCODE_FREQ type. |
||||
2019-03-27 Alibaba Cloud. |
||||
* version: 1.2.1 |
||||
* VERSION: 1.2.1 |
||||
* IMPROVE: support syncing lua script in RDB syncing. |
||||
2019-03-11 Alibaba Cloud. |
||||
* version: 1.2.0 |
||||
* IMPROVE: support 5.0. |
||||
* VERSION: 1.2.0 |
||||
* FEATURE: support 5.0. |
||||
2019-02-21 Alibaba Cloud. |
||||
* version: 1.0.0 |
||||
* redis-shake: initial release. |
||||
* VERSION: 1.0.0 |
||||
* REDIS-SHAKE: initial release. |
||||
|
@ -0,0 +1,173 @@ |
||||
package run |
||||
|
||||
import ( |
||||
"pkg/libs/log" |
||||
"strconv" |
||||
|
||||
"redis-shake/common" |
||||
"redis-shake/configure" |
||||
|
||||
"github.com/garyburd/redigo/redis" |
||||
) |
||||
|
||||
const ( |
||||
TencentCluster = "tencent_cluster" |
||||
AliyunCluster = "aliyun_cluster" |
||||
) |
||||
|
||||
type CmdRump struct { |
||||
sourceConn redis.Conn |
||||
targetConn redis.Conn |
||||
|
||||
keyChan chan *KeyNode // keyChan is used to communicated between routine1 and routine2
|
||||
resultChan chan *KeyNode // resultChan is used to communicated between routine2 and routine3
|
||||
} |
||||
|
||||
type KeyNode struct { |
||||
key string |
||||
value string |
||||
pttl int64 |
||||
} |
||||
|
||||
func (cr *CmdRump) GetDetailedInfo() []interface{} { |
||||
return nil |
||||
} |
||||
|
||||
func (cr *CmdRump) Main() { |
||||
// build connection
|
||||
cr.sourceConn = utils.OpenRedisConn(conf.Options.SourceAddress, conf.Options.SourceAuthType, |
||||
conf.Options.SourcePasswordRaw) |
||||
cr.targetConn = utils.OpenRedisConn(conf.Options.TargetAddress, conf.Options.TargetAuthType, |
||||
conf.Options.TargetPasswordRaw) |
||||
|
||||
// init two channels
|
||||
cr.keyChan = make(chan *KeyNode, conf.Options.ScanKeyNumber) |
||||
cr.resultChan = make(chan *KeyNode, conf.Options.ScanKeyNumber) |
||||
|
||||
/* |
||||
* we start 3 routines to run: |
||||
* 1. fetch keys from the source redis |
||||
* 2. write keys into the target redis |
||||
* 3. read result from the target redis |
||||
*/ |
||||
// routine1
|
||||
go cr.fetcher() |
||||
// routine2
|
||||
go cr.writer() |
||||
// routine3
|
||||
cr.receiver() |
||||
} |
||||
|
||||
func (cr *CmdRump) fetcher() { |
||||
length := 1 |
||||
if conf.Options.ScanSpecialCloud == TencentCluster { |
||||
length = len(conf.Options.ScanSpecialCloudTencentUrls) |
||||
} else if conf.Options.ScanSpecialCloud == AliyunCluster { |
||||
length = int(conf.Options.ScanSpecialCloudAliyunNodeNumber) |
||||
} |
||||
|
||||
// iterate all source nodes
|
||||
for i := 0; i < length; i++ { |
||||
var ( |
||||
cursor int64 |
||||
keys []string |
||||
values []interface{} |
||||
err error |
||||
) |
||||
|
||||
// fetch data from on node
|
||||
for { |
||||
switch conf.Options.ScanSpecialCloud { |
||||
case "": |
||||
values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT", |
||||
conf.Options.ScanKeyNumber)) |
||||
case TencentCluster: |
||||
values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT", |
||||
conf.Options.ScanKeyNumber, conf.Options.ScanSpecialCloudTencentUrls[i])) |
||||
case AliyunCluster: |
||||
values, err = redis.Values(cr.sourceConn.Do("ISCAN", i, cursor, "COUNT", |
||||
conf.Options.ScanKeyNumber)) |
||||
} |
||||
if err != nil && err != redis.ErrNil { |
||||
log.Panicf("scan with cursor[%v] failed[%v]", cursor, err) |
||||
} |
||||
|
||||
values, err = redis.Scan(values, &cursor, &keys) |
||||
if err != nil && err != redis.ErrNil { |
||||
log.Panicf("do scan with cursor[%v] failed[%v]", cursor, err) |
||||
} |
||||
|
||||
log.Info("scaned keys: ", len(keys)) |
||||
|
||||
// pipeline dump
|
||||
for _, key := range keys { |
||||
log.Debug("scan key: ", key) |
||||
cr.sourceConn.Send("DUMP", key) |
||||
} |
||||
dumps, err := redis.Strings(cr.sourceConn.Do("")) |
||||
if err != nil && err != redis.ErrNil { |
||||
log.Panicf("do dump with cursor[%v] failed[%v]", cursor, err) |
||||
} |
||||
|
||||
// pipeline ttl
|
||||
for _, key := range keys { |
||||
cr.sourceConn.Send("PTTL", key) |
||||
} |
||||
pttls, err := redis.Int64s(cr.sourceConn.Do("")) |
||||
if err != nil && err != redis.ErrNil { |
||||
log.Panicf("do ttl with cursor[%v] failed[%v]", cursor, err) |
||||
} |
||||
|
||||
for i, k := range keys { |
||||
cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]} |
||||
} |
||||
|
||||
// Last iteration of scan.
|
||||
if cursor == 0 { |
||||
break |
||||
} |
||||
} |
||||
} |
||||
|
||||
close(cr.keyChan) |
||||
} |
||||
|
||||
func (cr *CmdRump) writer() { |
||||
var count uint32 |
||||
for ele := range cr.keyChan { |
||||
if ele.pttl == -1 { // not set ttl
|
||||
ele.pttl = 0 |
||||
} |
||||
if ele.pttl == -2 { |
||||
log.Debugf("skip key %s for expired", ele.key) |
||||
continue |
||||
} |
||||
|
||||
log.Debugf("restore %s", ele.key) |
||||
if conf.Options.Rewrite { |
||||
cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value, "REPLACE") |
||||
} else { |
||||
cr.targetConn.Send("RESTORE", ele.key, ele.pttl, ele.value) |
||||
} |
||||
|
||||
cr.resultChan <- ele |
||||
count++ |
||||
if count == conf.Options.ScanKeyNumber { |
||||
// batch
|
||||
log.Debugf("send keys %d\n", count) |
||||
cr.targetConn.Flush() |
||||
count = 0 |
||||
} |
||||
} |
||||
cr.targetConn.Flush() |
||||
close(cr.resultChan) |
||||
} |
||||
|
||||
func (cr *CmdRump) receiver() { |
||||
for ele := range cr.resultChan { |
||||
if _, err := cr.targetConn.Receive(); err != nil && err != redis.ErrNil { |
||||
log.Panicf("restore key[%v] with pttl[%v] error[%v]", ele.key, strconv.FormatInt(ele.pttl, 10), |
||||
err) |
||||
} |
||||
} |
||||
} |
Loading…
Reference in new issue