merge remote

v4
vinllen 5 years ago
commit aa343e6a4b
  1. ChangeLog (8 lines changed)
  2. src/redis-shake/common/cluster.go (8 lines changed)
  3. src/redis-shake/common/mix.go (2 lines changed)
  4. src/redis-shake/main/main.go (21 lines changed)
  5. src/redis-shake/restore.go (7 lines changed)
  6. src/redis-shake/rump.go (3 lines changed)
  7. src/redis-shake/sync.go (7 lines changed)

ChangeLog
@@ -1,3 +1,11 @@
+2019-07-24 Alibaba Cloud.
+    * VERSION: 1.6.13
+    * IMPROVE: support `filter.db.whitelist` and `filter.db.blacklist` to let
+      different db syncing to db0 even when target type is cluster. see #127.
+    * BUGFIX: fix bug of connection url in automatic discovery in cluster. see
+      #124.
+    * IMPROVE: support `target.db` in rump mode.
+    * IMPROVE: add debug log in RDB syncing.
 2019-07-11 Alibaba Cloud.
     * VERSION: 1.6.12
     * IMPROVE: support filter key with whitelist and blacklist.
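
The whitelist/blacklist semantics described in the `filter.db.*` entry above can be pictured with a short standalone Go sketch. This is a minimal illustration under assumed semantics (a db passes if it is on the whitelist, or, when only a blacklist is given, if it is not on the blacklist); `passDB` is a hypothetical helper written for this note, not redis-shake's actual filter code.

package main

import "fmt"

// passDB reports whether a logical db should be forwarded, given an optional
// whitelist and blacklist (only one of the two is expected to be set).
func passDB(db string, whitelist, blacklist []string) bool {
	if len(whitelist) > 0 {
		for _, w := range whitelist {
			if w == db {
				return true
			}
		}
		return false
	}
	for _, b := range blacklist {
		if b == db {
			return false
		}
	}
	return true
}

func main() {
	// With whitelist = ["0"], only db0 passes -- the behavior the ChangeLog
	// describes for a cluster target when target.db is left at -1.
	fmt.Println(passDB("0", []string{"0"}, nil)) // true
	fmt.Println(passDB("3", []string{"0"}, nil)) // false
}

With the whitelist set to ["0"], only db0 passes, which is the default applied for a cluster target when `target.db` stays at -1 (see the main.go hunk below).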

src/redis-shake/common/cluster.go
@@ -7,6 +7,10 @@ import (
     "pkg/libs/log"
 )
 
+const(
+    RecvChanSize = 4096
+)
+
 /* implement redigo.Conn(https://github.com/garyburd/redigo)
  * Embed redis-go-cluster(https://github.com/chasex/redis-go-cluster)
  * The reason I create this struct is that redis-go-cluster isn't fulfill redigo.Conn
@@ -24,6 +28,10 @@ type reply struct {
 }
 
 func NewClusterConn(clusterClient *redigoCluster.Cluster, recvChanSize int) redigo.Conn {
+    if recvChanSize == 0 {
+        recvChanSize = RecvChanSize
+    }
+
     return &ClusterConn{
         client:   clusterClient,
         recvChan: make(chan reply, recvChanSize),

src/redis-shake/common/mix.go
@@ -52,7 +52,7 @@ func Welcome() {
 `______________________________
 \ \ _ ______ |
 \ \ / \___-=O'/|O'/__|
-\ redis-shake, here we go !! \_______\ / | / )
+\ RedisShake, here we go !! \_______\ / | / )
 / / '/-==__ _/__|/__=-| -GM
 / / * \ | |
 / / (o)

src/redis-shake/main/main.go
@@ -328,13 +328,6 @@ func sanitizeOptions(tp string) error {
         return fmt.Errorf("only one of 'filter.key.whitelist' and 'filter.key.blacklist' can be given")
     }
 
-    // if the target is "cluster", only allow pass db 0
-    if conf.Options.TargetType == conf.RedisTypeCluster {
-        conf.Options.FilterDBWhitelist = []string{"0"} // set whitelist = 0
-        conf.Options.FilterDBBlacklist = []string{}    // reset blacklist
-        log.Info("the target redis type is cluster, only pass db0")
-    }
-
     if len(conf.Options.FilterSlot) > 0 {
         for i, val := range conf.Options.FilterSlot {
             if _, err := strconv.Atoi(val); err != nil {
@@ -353,6 +346,20 @@ func sanitizeOptions(tp string) error {
         conf.Options.TargetDB = v
     }
 
+    // if the target is "cluster", only allow pass db 0
+    if conf.Options.TargetType == conf.RedisTypeCluster {
+        if conf.Options.TargetDB == -1 {
+            conf.Options.FilterDBWhitelist = []string{"0"} // set whitelist = 0
+            conf.Options.FilterDBBlacklist = []string{}    // reset blacklist
+            log.Info("the target redis type is cluster, only pass db0")
+        } else if conf.Options.TargetDB == 0 {
+            log.Info("the target redis type is cluster, all db syncing to db0")
+        } else {
+            // > 0
+            return fmt.Errorf("target.db[%v] should in {-1, 0} when target type is cluster", conf.Options.TargetDB)
+        }
+    }
+
     if conf.Options.HttpProfile < 0 || conf.Options.HttpProfile > 65535 {
         return fmt.Errorf("HttpProfile[%v] should in [0, 65535]", conf.Options.HttpProfile)
     } else if conf.Options.HttpProfile == 0 {
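
For readers skimming the second hunk above: the new check couples `target.db` with a cluster target, and its three branches can be restated as a tiny self-contained sketch. `checkClusterTargetDB` is a hypothetical helper written only for illustration; the real logic lives inline in `sanitizeOptions`.

package main

import (
	"errors"
	"fmt"
)

// checkClusterTargetDB mirrors the three cases in the diff:
//   -1 : keep per-db routing, but restrict syncing to db0 via the whitelist
//    0 : route every source db into db0
//   >0 : rejected, because a Redis cluster only exposes db0
func checkClusterTargetDB(targetDB int) (whitelistDB0 bool, err error) {
	switch {
	case targetDB == -1:
		return true, nil
	case targetDB == 0:
		return false, nil
	default:
		return false, errors.New("target.db should be -1 or 0 when the target type is cluster")
	}
}

func main() {
	for _, db := range []int{-1, 0, 5} {
		whitelistOnly, err := checkClusterTargetDB(db)
		fmt.Printf("target.db=%v -> restrict to db0 via whitelist: %v, err: %v\n", db, whitelistOnly, err)
	}
}

Running it shows that -1 keeps the db0 whitelist, 0 routes everything into db0, and any other value is rejected, matching the error message in the diff.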

src/redis-shake/restore.go
@@ -158,6 +158,9 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth
             dr.ignore.Incr()
         } else {
             dr.nentry.Incr()
+            log.Debugf("routine[%v] try restore key[%s] with value length[%v]", dr.id, e.Key, len(e.Value))
             if conf.Options.TargetDB != -1 {
                 if conf.Options.TargetDB != int(lastdb) {
                     lastdb = uint32(conf.Options.TargetDB)
@@ -173,7 +176,11 @@ func (dr *dbRestorer) restoreRDBFile(reader *bufio.Reader, target []string, auth
             if filter.FilterKey(string(e.Key)) {
                 continue
             }
+            log.Debugf("routine[%v] start restoring key[%s] with value length[%v]", dr.id, e.Key, len(e.Value))
             utils.RestoreRdbEntry(c, e)
+            log.Debugf("routine[%v] restore key[%s] ok", dr.id, e.Key)
         }
     }
 }()

src/redis-shake/rump.go
@@ -339,6 +339,9 @@ func (dre *dbRumperExecutor) writer() {
             log.Debugf("dbRumper[%v] executor[%v] skip key %s for expired", dre.rumperId, dre.executorId, ele.key)
             continue
         }
 
+        if conf.Options.TargetDB != -1 {
+            ele.db = conf.Options.TargetDB
+        }
         log.Debugf("dbRumper[%v] executor[%v] restore[%s], length[%v]", dre.rumperId, dre.executorId, ele.key,
             len(ele.value))

src/redis-shake/sync.go
@@ -414,6 +414,9 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type
             ds.ignore.Incr()
         } else {
             ds.nentry.Incr()
+            log.Debugf("dbSyncer[%v] try restore key[%s] with value length[%v]", ds.id, e.Key, len(e.Value))
             if conf.Options.TargetDB != -1 {
                 if conf.Options.TargetDB != int(lastdb) {
                     lastdb = uint32(conf.Options.TargetDB)
@@ -438,7 +441,11 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type
                     continue
                 }
             }
+            log.Debugf("dbSyncer[%v] start restoring key[%s] with value length[%v]", ds.id, e.Key, len(e.Value))
             utils.RestoreRdbEntry(c, e)
+            log.Debugf("dbSyncer[%v] restore key[%s] ok", ds.id, e.Key)
         }
     }
 }()
