You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

220 lines
5.7 KiB

package reader
import (
type ScanReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
type dbKey struct {
db int
key string
type scanStandaloneReader struct {
dbs []int
opts *ScanReaderOptions
ch chan *entry.Entry
keyQueue *utils.UniqueQueue
stat struct {
Name string `json:"name"`
ScanFinished bool `json:"scan_finished"`
ScanDbId int `json:"scan_dbId"`
ScanCursor uint64 `json:"scan_cursor"`
ScanPercentByDbId string `json:"scan_percent"`
NeedUpdateCount int64 `json:"need_update_count"`
func NewScanStandaloneReader(opts *ScanReaderOptions) Reader {
r := new(scanStandaloneReader)
// dbs
c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node
r.dbs = []int{0}
} else {
if len(opts.DBS) == 0 {
r.dbs = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
} else {
r.dbs = opts.DBS
r.opts = opts = make(chan *entry.Entry, 1024)
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.keyQueue = utils.NewUniqueQueue(100000) // cache 100000 keys
return r
func (r *scanStandaloneReader) StartRead() chan *entry.Entry {
go r.scan()
go r.fetch()
func (r *scanStandaloneReader) subscript() {
if !r.opts.KSN {
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c.Send("psubscribe", "__keyevent@*__:*")
go func() {
_, err := c.Receive()
if err != nil {
regex := regexp.MustCompile(`\d+`)
for {
resp, err := c.Receive()
if err != nil {
key := resp.([]interface{})[3].(string)
dbId := regex.FindString(resp.([]interface{})[2].(string))
dbIdInt, err := strconv.Atoi(dbId)
if err != nil {
r.keyQueue.Put(dbKey{db: dbIdInt, key: key})
func (r *scanStandaloneReader) scan() {
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
for _, dbId := range r.dbs {
if dbId != 0 {
reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId))
if reply != "OK" {
log.Panicf("scanStandaloneReader select db failed. db=[%d]", dbId)
var cursor uint64 = 0
for {
var keys []string
cursor, keys = c.Scan(cursor)
for _, key := range keys {
r.keyQueue.Put(dbKey{dbId, key}) // pass value not pointer
// stat
r.stat.ScanCursor = cursor
r.stat.ScanDbId = dbId
r.stat.ScanPercentByDbId = fmt.Sprintf("%.2f%%", float64(bits.Reverse64(cursor))/float64(^uint(0))*100)
if cursor == 0 {
r.stat.ScanFinished = true
if !r.opts.KSN {
func (r *scanStandaloneReader) fetch() {
nowDbId := 0
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
for item := range r.keyQueue.Ch {
r.stat.NeedUpdateCount = int64(r.keyQueue.Len())
dbId := item.(dbKey).db
key := item.(dbKey).key
if nowDbId != dbId {
reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId))
if reply != "OK" {
log.Panicf("scanStandaloneReader select db failed. db=[%d]", dbId)
nowDbId = dbId
// dump
c.Send("DUMP", key)
c.Send("PTTL", key)
iDump, err1 := c.Receive()
iPttl, err2 := c.Receive()
if err1 == proto.Nil {
continue // key not exist
} else if err1 != nil {
} else if err2 != nil {
dump := iDump.(string)
pttl := int(iPttl.(int64))
if pttl == -2 {
continue // key not exist
if pttl == -1 {
pttl = 0 // -1 means no expire
if uint64(len(dump)) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen {
log.Warnf("key=[%s] dump len=[%d] too large, split it. This is not a good practice in Redis.", key, len(dump))
typeByte := dump[0]
anotherReader := strings.NewReader(dump[1 : len(dump)-10])
o := types.ParseObject(anotherReader, typeByte, key)
cmds := o.Rewrite()
for _, cmd := range cmds {
e := entry.NewEntry()
e.DbId = dbId
e.Argv = cmd <- e
if pttl != 0 {
e := entry.NewEntry()
e.DbId = dbId
e.Argv = []string{"PEXPIRE", key, strconv.Itoa(pttl)} <- e
} else {
argv := []string{"RESTORE", key, strconv.Itoa(pttl), dump}
if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" {
argv = append(argv, "replace")
} <- &entry.Entry{
DbId: dbId,
Argv: argv,
log.Infof("[%s] scanStandaloneReader fetch finished.", r.stat.Name)
func (r *scanStandaloneReader) Status() interface{} {
return r.stat
func (r *scanStandaloneReader) StatusString() string {
if r.stat.ScanFinished {
return fmt.Sprintf("need_update_count=[%d]", r.stat.NeedUpdateCount)
return fmt.Sprintf("scan_dbid=[%d], scan_percent=[%s], need_update_count=[%d]", r.stat.ScanDbId, r.stat.ScanPercentByDbId, r.stat.NeedUpdateCount)
func (r *scanStandaloneReader) StatusConsistent() bool {
return r.stat.ScanFinished && r.stat.NeedUpdateCount == 0