diff --git a/ChangeLog b/ChangeLog index 2f996b4..90a3f6b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +2019-07-24 Alibaba Cloud. + * VERSION: 1.6.13 + * IMPROVE: support `filter.db.whitelist` and `filter.db.blacklist` to let + different db syncing to db0 even when target type is cluster. see #127. + * BUGFIX: fix bug of connection url in automatic discovery in cluster. see + #124. + * IMPROVE: support `target.db` in rump mode. + * IMPROVE: add debug log in RDB syncing. 2019-07-11 Alibaba Cloud. * VERSION: 1.6.12 * IMPROVE: support filter key with whitelist and blacklist. diff --git a/src/redis-shake/common/cluster.go b/src/redis-shake/common/cluster.go index a4cbeb7..8bf9df5 100644 --- a/src/redis-shake/common/cluster.go +++ b/src/redis-shake/common/cluster.go @@ -7,6 +7,10 @@ import ( "pkg/libs/log" ) +const( + RecvChanSize = 4096 +) + /* implement redigo.Conn(https://github.com/garyburd/redigo) * Embed redis-go-cluster(https://github.com/chasex/redis-go-cluster) * The reason I create this struct is that redis-go-cluster isn't fulfill redigo.Conn @@ -24,6 +28,10 @@ type reply struct { } func NewClusterConn(clusterClient *redigoCluster.Cluster, recvChanSize int) redigo.Conn { + if recvChanSize == 0 { + recvChanSize = RecvChanSize + } + return &ClusterConn{ client: clusterClient, recvChan: make(chan reply, recvChanSize), diff --git a/src/redis-shake/common/mix.go b/src/redis-shake/common/mix.go index a17daec..f3637d4 100644 --- a/src/redis-shake/common/mix.go +++ b/src/redis-shake/common/mix.go @@ -52,7 +52,7 @@ func Welcome() { `______________________________ \ \ _ ______ | \ \ / \___-=O'/|O'/__| - \ redis-shake, here we go !! \_______\ / | / ) + \ RedisShake, here we go !! \_______\ / | / ) / / '/-==__ _/__|/__=-| -GM / / * \ | | / / (o) diff --git a/src/redis-shake/main/main.go b/src/redis-shake/main/main.go index d899d5a..b91c146 100644 --- a/src/redis-shake/main/main.go +++ b/src/redis-shake/main/main.go @@ -328,13 +328,6 @@ func sanitizeOptions(tp string) error { return fmt.Errorf("only one of 'filter.key.whitelist' and 'filter.key.blacklist' can be given") } - // if the target is "cluster", only allow pass db 0 - if conf.Options.TargetType == conf.RedisTypeCluster { - conf.Options.FilterDBWhitelist = []string{"0"} // set whitelist = 0 - conf.Options.FilterDBBlacklist = []string{} // reset blacklist - log.Info("the target redis type is cluster, only pass db0") - } - if len(conf.Options.FilterSlot) > 0 { for i, val := range conf.Options.FilterSlot { if _, err := strconv.Atoi(val); err != nil { @@ -353,6 +346,20 @@ func sanitizeOptions(tp string) error { conf.Options.TargetDB = v } + // if the target is "cluster", only allow pass db 0 + if conf.Options.TargetType == conf.RedisTypeCluster { + if conf.Options.TargetDB == -1 { + conf.Options.FilterDBWhitelist = []string{"0"} // set whitelist = 0 + conf.Options.FilterDBBlacklist = []string{} // reset blacklist + log.Info("the target redis type is cluster, only pass db0") + } else if conf.Options.TargetDB == 0 { + log.Info("the target redis type is cluster, all db syncing to db0") + } else { + // > 0 + return fmt.Errorf("target.db[%v] should in {-1, 0} when target type is cluster", conf.Options.TargetDB) + } + } + if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 { return fmt.Errorf("HttpProfile[%v] should in [0, 65535]", conf.Options.HttpProfile) } else if conf.Options.HttpProfile == 0 { diff --git a/src/redis-shake/restore.go b/src/redis-shake/restore.go index ef1fef9..234f807 100644 --- a/src/redis-shake/restore.go +++ b/src/redis-shake/restore.go @@ -158,6 +158,9 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth dr.ignore.Incr() } else { dr.nentry.Incr() + + log.Debugf("routine[%v] try restore key[%s] with value length[%v]", dr.id, e.Key, len(e.Value)) + if conf.Options.TargetDB != -1 { if conf.Options.TargetDB != int(lastdb) { lastdb = uint32(conf.Options.TargetDB) @@ -173,7 +176,11 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth if filter.FilterKey(string(e.Key)) { continue } + + log.Debugf("routine[%v] start restoring key[%s] with value length[%v]", dr.id, e.Key, len(e.Value)) + utils.RestoreRdbEntry(c, e) + log.Debugf("routine[%v] restore key[%s] ok", dr.id, e.Key) } } }() diff --git a/src/redis-shake/rump.go b/src/redis-shake/rump.go index 5ad0624..2a0bb03 100644 --- a/src/redis-shake/rump.go +++ b/src/redis-shake/rump.go @@ -339,6 +339,9 @@ func (dre *dbRumperExecutor) writer() { log.Debugf("dbRumper[%v] executor[%v] skip key %s for expired", dre.rumperId, dre.executorId, ele.key) continue } + if conf.Options.TargetDB != -1 { + ele.db = conf.Options.TargetDB + } log.Debugf("dbRumper[%v] executor[%v] restore[%s], length[%v]", dre.rumperId, dre.executorId, ele.key, len(ele.value)) diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 1545695..a5a12ef 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -414,6 +414,9 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type ds.ignore.Incr() } else { ds.nentry.Incr() + + log.Debugf("dbSyncer[%v] try restore key[%s] with value length[%v]", ds.id, e.Key, len(e.Value)) + if conf.Options.TargetDB != -1 { if conf.Options.TargetDB != int(lastdb) { lastdb = uint32(conf.Options.TargetDB) @@ -438,7 +441,11 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type continue } } + + log.Debugf("dbSyncer[%v] start restoring key[%s] with value length[%v]", ds.id, e.Key, len(e.Value)) + utils.RestoreRdbEntry(c, e) + log.Debugf("dbSyncer[%v] restore key[%s] ok", ds.id, e.Key) } } }()