From 07982e4a91d323c4152de08206fa126d32629e86 Mon Sep 17 00:00:00 2001 From: vinllen Date: Wed, 24 Apr 2019 20:52:24 +0800 Subject: [PATCH] improve rump to better fetch data from aliyun_cluster and tencent_cluster. --- ChangeLog | 4 ++ conf/redis-shake.conf | 6 --- src/redis-shake/common/common.go | 14 +++++++ src/redis-shake/rump.go | 68 +++++++++++++++++++++++++++++--- 4 files changed, 80 insertions(+), 12 deletions(-) diff --git a/ChangeLog b/ChangeLog index ae8086a..c6075fb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +2019-04-24 Alibaba Cloud. + * VERSION: 1.4.1 + * IMPROVE: improve rump to better fetch data from aliyun_cluster and + tencent_cluster. 2019-04-21 Alibaba Cloud. * VERSION: 1.4.0 * FEATURE: support "rump" type to syncing data when `sync` and `psync` diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index 6148365..8e48e44 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -147,12 +147,6 @@ scan.key_number = 50 # 有些版本具有特殊的格式,与普通的scan命令有所不同,我们进行了特殊的适配。目前支持腾讯云的集群版"tencent_cluster" # 和阿里云的集群版"aliyun_cluster"。 scan.special_cloud = -# 如果源端是腾讯云的集群版,那么需要传入不同子节点的id(通过`cluster nodes`命令),以数组形式表示(分号分割)。 -# shake会"串行"进行遍历并抓取。例如:"25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc;da6041781b5d7fe21404811d430cdffea2bf84de" -# 具体请参考:https://cloud.tencent.com/document/product/239/18336 中的"自定义命令"小节。 -scan.special_cloud.tencent.urls = -# 如果源端是阿里云的集群版,那么需要传入子节点的个数。例如:16 -scan.special_cloud.aliyun.node_number = # ----------------splitter---------------- # below variables are useless for current open source version so don't set. diff --git a/src/redis-shake/common/common.go b/src/redis-shake/common/common.go index 98f0794..24a1288 100644 --- a/src/redis-shake/common/common.go +++ b/src/redis-shake/common/common.go @@ -4,6 +4,7 @@ import ( "net" "fmt" "strings" + "bytes" "pkg/libs/bytesize" @@ -52,4 +53,17 @@ func RemoveRESPEnd(input string) string { return input[: length - 2] } return input +} + +func ParseInfo(content []byte) map[string]string { + result := make(map[string]string, 10) + lines := bytes.Split(content, []byte("\r\n")) + for i := 0; i < len(lines); i++ { + items := bytes.SplitN(lines[i], []byte(":"), 2) + if len(items) != 2 { + continue + } + result[string(items[0])] = string(items[1]) + } + return result } \ No newline at end of file diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index a03c882..d64d046 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -8,6 +8,8 @@ import ( "redis-shake/configure" "github.com/garyburd/redigo/redis" + "fmt" + "bytes" ) const ( @@ -58,14 +60,68 @@ func (cr *CmdRump) Main() { cr.receiver() } +/* + * get db node info when conf.Options.ScanSpecialCloud != "". + * return: + * int: node number. Used in aliyun_cluster + * []string: node lists. Used in tencent_cluster + */ +func (cr *CmdRump) getDbNode() (int, []string, error) { + switch conf.Options.ScanSpecialCloud { + case AliyunCluster: + info, err := redis.Bytes(cr.sourceConn.Do("info", "Cluster")) + if err != nil { + return -1, nil, err + } + + result := utils.ParseInfo(info) + if count, err := strconv.ParseInt(result["nodecount"], 10, 0); err != nil { + return -1, nil, err + } else if count <= 0 { + return -1, nil, fmt.Errorf("source node count[%v] illegal", count) + } else { + return int(count), nil, nil + } + case TencentCluster: + /* + * tencent cluster return: + * 10.1.1.1:2000> cluster nodes + * 25b21f1836026bd49c52b2d10e09fbf8c6aa1fdc 10.0.0.15:6379@11896 slave 36034e645951464098f40d339386e9d51a9d7e77 0 1531471918205 1 connected + * da6041781b5d7fe21404811d430cdffea2bf84de 10.0.0.15:6379@11170 master - 0 1531471916000 2 connected 10923-16383 + * 36034e645951464098f40d339386e9d51a9d7e77 10.0.0.15:6379@11541 myself,master - 0 1531471915000 1 connected 0-5460 + * 53f552fd8e43112ae68b10dada69d3af77c33649 10.0.0.15:6379@11681 slave da6041781b5d7fe21404811d430cdffea2bf84de 0 1531471917204 3 connected + * 18090a0e57cf359f9f8c8c516aa62a811c0f0f0a 10.0.0.15:6379@11428 slave ef3cf5e20e1a7cf5f9cc259ed488c82c4aa17171 0 1531471917000 2 connected + * ef3cf5e20e1a7cf5f9cc259ed488c82c4aa17171 10.0.0.15:6379@11324 master - 0 1531471916204 0 connected 5461-10922 + */ + info, err := redis.Bytes(cr.sourceConn.Do("cluster", "nodes")) + if err != nil { + return -1, nil, err + } + + lines := bytes.Split(info, []byte("\r\n")) + ret := make([]string, 0, len(lines) / 2) + master := []byte("master") + for _, row := range lines { + col := bytes.Split(row, []byte(" ")) + if len(col) >= 3 && bytes.Contains(col[2], master) { + ret = append(ret, string(col[0])) + } + } + return len(ret), ret, nil + default: + return 1, nil, nil + } +} + func (cr *CmdRump) fetcher() { - length := 1 - if conf.Options.ScanSpecialCloud == TencentCluster { - length = len(conf.Options.ScanSpecialCloudTencentUrls) - } else if conf.Options.ScanSpecialCloud == AliyunCluster { - length = int(conf.Options.ScanSpecialCloudAliyunNodeNumber) + length, list, err := cr.getDbNode() + if err != nil || length <= 0 { + log.Panicf("fetch db node failed: length[%v], list[%v], error[%v]", length, list, err) } + log.Infof("start fetcher with special-cloud[%v], length[%v], list[%v]", conf.Options.ScanSpecialCloud, + length, list) + // iterate all source nodes for i := 0; i < length; i++ { var ( @@ -83,7 +139,7 @@ func (cr *CmdRump) fetcher() { conf.Options.ScanKeyNumber)) case TencentCluster: values, err = redis.Values(cr.sourceConn.Do("SCAN", cursor, "COUNT", - conf.Options.ScanKeyNumber, conf.Options.ScanSpecialCloudTencentUrls[i])) + conf.Options.ScanKeyNumber, list[i])) case AliyunCluster: values, err = redis.Values(cr.sourceConn.Do("ISCAN", i, cursor, "COUNT", conf.Options.ScanKeyNumber))