Merge pull request #43 from alibaba/improve-1.4.1

improve rump to better fetch data from aliyun_cluster and tencent_clu…
v4
Vinllen Chen 6 years ago committed by GitHub
commit f126baffaf
  1. 4
      ChangeLog
  2. 6
      conf/redis-shake.conf
  3. 14
      src/redis-shake/common/common.go
  4. 68
      src/redis-shake/rump.go

@ -1,3 +1,7 @@
2019-04-24 Alibaba Cloud.
* VERSION: 1.4.1
* IMPROVE: improve rump to better fetch data from aliyun_cluster and
tencent_cluster.
2019-04-21 Alibaba Cloud.
* VERSION: 1.4.0
* FEATURE: support "rump" type to syncing data when `sync` and `psync`

@ -147,12 +147,6 @@ scan.key_number = 50
# 有些版本具有特殊的格式,与普通的scan命令有所不同,我们进行了特殊的适配。目前支持腾讯云的集群版"tencent_cluster"
# 和阿里云的集群版"aliyun_cluster"。
scan.special_cloud =
# 如果源端是腾讯云的集群版,那么需要传入不同子节点的id(通过`cluster nodes`命令),以数组形式表示(分号分割)。
# shake会"串行"进行遍历并抓取。例如:"25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc;da6041781b5d7fe21404811d430cdffea2bf84de"
# 具体请参考:https://cloud.tencent.com/document/product/239/18336 中的"自定义命令"小节。
scan.special_cloud.tencent.urls =
# 如果源端是阿里云的集群版,那么需要传入子节点的个数。例如:16
scan.special_cloud.aliyun.node_number =
# ----------------splitter----------------
# below variables are useless for current open source version so don't set.

@ -4,6 +4,7 @@ import (
"net"
"fmt"
"strings"
"bytes"
"pkg/libs/bytesize"
@ -52,4 +53,17 @@ func RemoveRESPEnd(input string) string {
return input[: length - 2]
}
return input
}
func ParseInfo(content []byte) map[string]string {
result := make(map[string]string, 10)
lines := bytes.Split(content, []byte("\r\n"))
for i := 0; i < len(lines); i++ {
items := bytes.SplitN(lines[i], []byte(":"), 2)
if len(items) != 2 {
continue
}
result[string(items[0])] = string(items[1])
}
return result
}

@ -8,6 +8,8 @@ import (
"redis-shake/configure"
"github.com/garyburd/redigo/redis"
"fmt"
"bytes"
)
const (
@ -58,14 +60,68 @@ func (cr *CmdRump) Main() {
cr.receiver()
}
/*
* get db node info when conf.Options.ScanSpecialCloud != "".
* return:
* int: node number. Used in aliyun_cluster
* []string: node lists. Used in tencent_cluster
*/
func (cr *CmdRump) getDbNode() (int, []string, error) {
switch conf.Options.ScanSpecialCloud {
case AliyunCluster:
info, err := redis.Bytes(cr.sourceConn.Do("info", "Cluster"))
if err != nil {
return -1, nil, err
}
result := utils.ParseInfo(info)
if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil {
return -1, nil, err
} else if count <= 0 {
return -1, nil, fmt.Errorf("source node count[%v] illegal", count)
} else {
return int(count), nil, nil
}
case TencentCluster:
/*
* tencent cluster return:
* 10.1.1.1:2000> cluster nodes
* 25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc 10.0.0.15:6379@11896 slave 36034e645951464098f40d339386e9d51a9d7e77 0 1531471918205 1 connected
* da6041781b5d7fe21404811d430cdffea2bf84de 10.0.0.15:6379@11170 master - 0 1531471916000 2 connected 10923-16383
* 36034e645951464098f40d339386e9d51a9d7e77 10.0.0.15:6379@11541 myself,master - 0 1531471915000 1 connected 0-5460
* 53f552fd8e43112ae68b10dada69d3af77c33649 10.0.0.15:6379@11681 slave da6041781b5d7fe21404811d430cdffea2bf84de 0 1531471917204 3 connected
* 18090a0e57cf359f9f8c8c516aa62a811c0f0f0a 10.0.0.15:6379@11428 slave ef3cf5e20e1a7cf5f9cc259ed488c82c4aa17171 0 1531471917000 2 connected
* ef3cf5e20e1a7cf5f9cc259ed488c82c4aa17171 10.0.0.15:6379@11324 master - 0 1531471916204 0 connected 5461-10922
*/
info, err := redis.Bytes(cr.sourceConn.Do("cluster", "nodes"))
if err != nil {
return -1, nil, err
}
lines := bytes.Split(info, []byte("\r\n"))
ret := make([]string, 0, len(lines) / 2)
master := []byte("master")
for _, row := range lines {
col := bytes.Split(row, []byte(" "))
if len(col) >= 3 && bytes.Contains(col[2], master) {
ret = append(ret, string(col[0]))
}
}
return len(ret), ret, nil
default:
return 1, nil, nil
}
}
func (cr *CmdRump) fetcher() {
length := 1
if conf.Options.ScanSpecialCloud == TencentCluster {
length = len(conf.Options.ScanSpecialCloudTencentUrls)
} else if conf.Options.ScanSpecialCloud == AliyunCluster {
length = int(conf.Options.ScanSpecialCloudAliyunNodeNumber)
length, list, err := cr.getDbNode()
if err != nil || length <= 0 {
log.Panicf("fetch db node failed: length[%v], list[%v], error[%v]", length, list, err)
}
log.Infof("start fetcher with special-cloud[%v], length[%v], list[%v]", conf.Options.ScanSpecialCloud,
length, list)
// iterate all source nodes
for i := 0; i < length; i++ {
var (
@ -83,7 +139,7 @@ func (cr *CmdRump) fetcher() {
conf.Options.ScanKeyNumber))
case TencentCluster:
values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT",
conf.Options.ScanKeyNumber, conf.Options.ScanSpecialCloudTencentUrls[i]))
conf.Options.ScanKeyNumber, list[i]))
case AliyunCluster:
values, err = redis.Values(cr.sourceConn.Do("ISCAN", i, cursor, "COUNT",
conf.Options.ScanKeyNumber))

Loading…
Cancel
Save