@ -1,12 +1,19 @@
package reader
import (
"strconv"
"strings"
"github.com/alibaba/RedisShake/internal/client"
"github.com/alibaba/RedisShake/internal/client/proto"
"github.com/alibaba/RedisShake/internal/entry"
"github.com/alibaba/RedisShake/internal/log"
"github.com/alibaba/RedisShake/internal/statistics"
"strconv"
)
const (
// cluster_enabled: Indicate Redis cluster is enabled. reference from https://redis.io/commands/info/
clusterMode = "cluster_enabled:1"
)
type dbKey struct {
@ -21,6 +28,7 @@ type scanReader struct {
// client for scan keys
clientScan * client . Redis
innerChannel chan * dbKey
isCluster bool
// client for dump keys
clientDump * client . Redis
@ -34,9 +42,17 @@ func NewScanReader(address string, username string, password string, isTls bool)
r . clientScan = client . NewRedisClient ( address , username , password , isTls )
r . clientDump = client . NewRedisClient ( address , username , password , isTls )
log . Infof ( "scanReader connected to redis successful. address=[%s]" , address )
r . isCluster = r . IsCluster ( )
return r
}
// IsCluster is for determining whether the server is in cluster mode.
func ( r * scanReader ) IsCluster ( ) bool {
reply := r . clientScan . DoWithStringReply ( "INFO" , "Cluster" )
return strings . Contains ( reply , clusterMode )
}
func ( r * scanReader ) StartRead ( ) chan * entry . Entry {
r . ch = make ( chan * entry . Entry , 1024 )
r . innerChannel = make ( chan * dbKey , 1024 )
@ -46,7 +62,12 @@ func (r *scanReader) StartRead() chan *entry.Entry {
}
func ( r * scanReader ) scan ( ) {
for dbId := 0 ; dbId < 16 ; dbId ++ {
scanDbIdUpper := 15
if r . isCluster {
log . Infof ( "scanReader node are in cluster mode, only scan db 0" )
scanDbIdUpper = 0
}
for dbId := 0 ; dbId <= scanDbIdUpper ; dbId ++ {
var cursor uint64 = 0
reply := r . clientScan . DoWithStringReply ( "SELECT" , strconv . Itoa ( dbId ) )