diff --git a/ChangeLog b/ChangeLog index ea5e4a5..977ab3c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ +2019-05-16 Alibaba Cloud. + * VERSION: 1.6.2 + * BUGFIX: fix bug of `rump` mode only syncing db 0 data. 2019-05-14 Alibaba Cloud. * VERSION: 1.6.1 * IMPROVE: support fetching db address from sentinel, the failover diff --git a/README.md b/README.md index c8bf05b..aabf1fe 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,8 @@ Version rules: a.b.c. | improve-\* | improvement branch. forked from develop branch and then merge back after finish developing, testing, and code review. | Tag rules:
-Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628" +Add tag when releasing: "release-v{version}-{date}". for example: "release-v1.0.2-20180628"
+User can use `-version` to print the version. # Usage --- diff --git a/src/redis-shake/common/command.go b/src/redis-shake/common/command.go new file mode 100644 index 0000000..4f71df9 --- /dev/null +++ b/src/redis-shake/common/command.go @@ -0,0 +1,37 @@ +package utils + +import ( + "bytes" + "fmt" + "strconv" +) + +func ParseKeyspace(content []byte) (map[int32]int64, error) { + if bytes.HasPrefix(content, []byte("# Keyspace")) == false { + return nil, fmt.Errorf("invalid info Keyspace: %s", string(content)) + } + + lines := bytes.Split(content, []byte("\n")) + reply := make(map[int32]int64) + for _, line := range lines { + line = bytes.TrimSpace(line) + if bytes.HasPrefix(line, []byte("db")) == true { + // line "db0:keys=18,expires=0,avg_ttl=0" + items := bytes.Split(line, []byte(":")) + db, err := strconv.Atoi(string(items[0][2:])) + if err != nil { + return nil, err + } + nums := bytes.Split(items[1], []byte(",")) + if bytes.HasPrefix(nums[0], []byte("keys=")) == false { + return nil, fmt.Errorf("invalid info Keyspace: %s", string(content)) + } + keysNum, err := strconv.ParseInt(string(nums[0][5:]), 10, 0) + if err != nil { + return nil, err + } + reply[int32(db)] = int64(keysNum) + } // end true + } // end for + return reply, nil +} \ No newline at end of file diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 6948342..2b8c869 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -4,6 +4,7 @@ import ( "pkg/libs/log" "strconv" "sync" + "fmt" "redis-shake/common" "redis-shake/configure" @@ -87,47 +88,22 @@ func (cr *CmdRump) fetcher(idx int) { log.Panicf("fetch db node failed: length[%v], error[%v]", length, err) } - log.Infof("start fetcher with special-cloud[%v], length[%v]", conf.Options.ScanSpecialCloud, length) + log.Infof("start fetcher with special-cloud[%v], nodes[%v]", conf.Options.ScanSpecialCloud, length) // iterate all source nodes for i := 0; i < length; i++ { - // fetch data from on node - for { - keys, err := cr.scanners[idx].ScanKey(i) - if err != nil { - log.Panic(err) - } - - log.Info("scaned keys: ", len(keys)) - - if len(keys) != 0 { - // pipeline dump - for _, key := range keys { - log.Debug("scan key: ", key) - cr.sourceConn[idx].Send("DUMP", key) - } - dumps, err := redis.Strings(cr.sourceConn[idx].Do("")) - if err != nil && err != redis.ErrNil { - log.Panicf("do dump with failed[%v]", err) - } - - // pipeline ttl - for _, key := range keys { - cr.sourceConn[idx].Send("PTTL", key) - } - pttls, err := redis.Int64s(cr.sourceConn[idx].Do("")) - if err != nil && err != redis.ErrNil { - log.Panicf("do ttl with failed[%v]", err) - } - - for i, k := range keys { - cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]} - } - } + // fetch db number from 'info Keyspace' + dbNumber, err := cr.getSourceDbList(i) + if err != nil { + log.Panic(err) + } - // Last iteration of scan. - if cr.scanners[idx].EndNode() { - break + log.Infof("fetch node[%v] with db list: %v", i, dbNumber) + // iterate all db + for _, db := range dbNumber { + log.Infof("fetch node[%v] db[%v]", i, db) + if err := cr.doFetch(int(db), i); err != nil { + log.Panic(err) } } } @@ -174,3 +150,80 @@ func (cr *CmdRump) receiver() { } } } + +func (cr *CmdRump) getSourceDbList(id int) ([]int32, error) { + conn := cr.sourceConn[id] + if ret, err := conn.Do("info", "Keyspace"); err != nil { + return nil, err + } else if mp, err := utils.ParseKeyspace(ret.([]byte)); err != nil { + return nil, err + } else { + list := make([]int32, 0, len(mp)) + for key, val := range mp { + if val > 0 { + list = append(list, key) + } + } + return list, nil + } +} + +func (cr *CmdRump) doFetch(db, idx int) error { + // send 'select' command to both source and target + log.Infof("send source select db") + if _, err := cr.sourceConn[idx].Do("select", db); err != nil { + return err + } + + log.Infof("send target select db") + cr.targetConn.Flush() + if err := cr.targetConn.Send("select", db); err != nil { + return err + } + cr.targetConn.Flush() + + log.Infof("finish select db, start fetching node[%v] db[%v]", idx, db) + + for { + keys, err := cr.scanners[idx].ScanKey(idx) + if err != nil { + return err + } + + log.Info("scanned keys: ", len(keys)) + + if len(keys) != 0 { + // pipeline dump + for _, key := range keys { + log.Debug("scan key: ", key) + cr.sourceConn[idx].Send("DUMP", key) + } + dumps, err := redis.Strings(cr.sourceConn[idx].Do("")) + if err != nil && err != redis.ErrNil { + return fmt.Errorf("do dump with failed[%v]", err) + } + + // pipeline ttl + for _, key := range keys { + cr.sourceConn[idx].Send("PTTL", key) + } + pttls, err := redis.Int64s(cr.sourceConn[idx].Do("")) + if err != nil && err != redis.ErrNil { + return fmt.Errorf("do ttl with failed[%v]", err) + } + + for i, k := range keys { + cr.keyChan <- &KeyNode{k, dumps[i], pttls[i]} + } + } + + // Last iteration of scan. + if cr.scanners[idx].EndNode() { + break + } + } + + log.Infof("finish fetching node[%v] db[%v]", idx, db) + + return nil +} \ No newline at end of file diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 21ec967..603c2ac 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -95,7 +95,7 @@ func (cmd *CmdSync) Main() { } var wg sync.WaitGroup - wg.Add(len(conf.Options.SourceAddress)) + wg.Add(len(conf.Options.SourceAddressList)) for i := 0; i < int(conf.Options.SourceParallel); i++ { go func() {