docs: Update documentation and configuration files

v4
Authored by suxb201, 1 year ago; committed by suxb201
parent 945819f50d
commit eac40bd0fa
26 files changed (changed-line counts in parentheses):
  1. cmd/redis-shake/main.go (83)
  2. docs/.vitepress/config.ts (46)
  3. docs/package-lock.json (14)
  4. docs/package.json (5)
  5. docs/src/index.md (3)
  6. docs/src/zh/function/best_practices.md (5)
  7. docs/src/zh/function/introduction.md (0)
  8. internal/client/redis.go (8)
  9. internal/entry/entry.go (2)
  10. internal/log/func.go (6)
  11. internal/log/init.go (1)
  12. internal/rdb/rdb.go (10)
  13. internal/reader/scan_cluster_reader.go (19)
  14. internal/reader/scan_standalone_reader.go (40)
  15. internal/reader/sync_cluster_reader.go (19)
  16. internal/reader/sync_standalone_reader.go (100)
  17. internal/status/entry_count.go (22)
  18. internal/status/status.go (30)
  19. internal/utils/file_rotate/aof_reader.go (2)
  20. internal/utils/file_rotate/aof_writer.go (2)
  21. internal/utils/ncpu.go (2)
  22. internal/writer/redis_cluster_writer.go (13)
  23. internal/writer/redis_standalone_writer.go (5)
  24. shake.toml (26)
  25. tests/cases/auth_acl.py (8)
  26. tests/helpers/shake.py (63)

@ -23,42 +23,34 @@ func main() {
// create reader
var theReader reader.Reader
if v.IsSet("sync_standalone_reader") {
opts := new(reader.SyncStandaloneReaderOptions)
if v.IsSet("sync_reader") {
opts := new(reader.SyncReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("sync_standalone_reader", opts)
err := v.UnmarshalKey("sync_reader", opts)
if err != nil {
log.Panicf("failed to read the SyncReader config entry. err: %v", err)
}
theReader = reader.NewSyncStandaloneReader(opts)
log.Infof("create SyncStandaloneReader: %v", opts.Address)
} else if v.IsSet("sync_cluster_reader") {
opts := new(reader.SyncClusterReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("sync_cluster_reader", opts)
if err != nil {
log.Panicf("failed to read the SyncReader config entry. err: %v", err)
if opts.Cluster {
theReader = reader.NewSyncClusterReader(opts)
log.Infof("create SyncClusterReader: %v", opts.Address)
} else {
theReader = reader.NewSyncStandaloneReader(opts)
log.Infof("create SyncStandaloneReader: %v", opts.Address)
}
theReader = reader.NewSyncClusterReader(opts)
log.Infof("create SyncClusterReader: %v", opts.Address)
} else if v.IsSet("scan_standalone_reader") {
opts := new(reader.ScanStandaloneReaderOptions)
} else if v.IsSet("scan_reader") {
opts := new(reader.ScanReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("scan_standalone_reader", opts)
err := v.UnmarshalKey("scan_reader", opts)
if err != nil {
log.Panicf("failed to read the ScanReader config entry. err: %v", err)
}
theReader = reader.NewScanStandaloneReader(opts)
log.Infof("create ScanStandaloneReader: %v", opts.Address)
} else if v.IsSet("scan_cluster_reader") {
opts := new(reader.ScanClusterReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("scan_cluster_reader", opts)
if err != nil {
log.Panicf("failed to read the ScanReader config entry. err: %v", err)
if opts.Cluster {
theReader = reader.NewScanClusterReader(opts)
log.Infof("create ScanClusterReader: %v", opts.Address)
} else {
theReader = reader.NewScanStandaloneReader(opts)
log.Infof("create ScanStandaloneReader: %v", opts.Address)
}
theReader = reader.NewScanClusterReader(opts)
log.Infof("create ScanClusterReader: %v", opts.Address)
} else if v.IsSet("rdb_reader") {
opts := new(reader.RdbReaderOptions)
defaults.SetDefaults(opts)
@ -74,24 +66,20 @@ func main() {
// create writer
var theWriter writer.Writer
if v.IsSet("redis_standalone_writer") {
opts := new(writer.RedisStandaloneWriterOptions)
if v.IsSet("redis_writer") {
opts := new(writer.RedisWriterOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("redis_standalone_writer", opts)
err := v.UnmarshalKey("redis_writer", opts)
if err != nil {
log.Panicf("failed to read the RedisStandaloneWriter config entry. err: %v", err)
}
theWriter = writer.NewRedisStandaloneWriter(opts)
log.Infof("create RedisStandaloneWriter: %v", opts.Address)
} else if v.IsSet("redis_cluster_writer") {
opts := new(writer.RedisClusterWriterOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("redis_cluster_writer", opts)
if err != nil {
log.Panicf("failed to read the RedisClusterWriter config entry. err: %v", err)
if opts.Cluster {
theWriter = writer.NewRedisClusterWriter(opts)
log.Infof("create RedisClusterWriter: %v", opts.Address)
} else {
theWriter = writer.NewRedisStandaloneWriter(opts)
log.Infof("create RedisStandaloneWriter: %v", opts.Address)
}
theWriter = writer.NewRedisClusterWriter(opts)
log.Infof("create RedisClusterWriter: %v", opts.Address)
} else {
log.Panicf("no writer config entry found")
}
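Taken together with the reader hunk above, this collapses the per-topology config blocks (sync_standalone_reader/sync_cluster_reader, scan_standalone_reader/scan_cluster_reader, redis_standalone_writer/redis_cluster_writer) into unified sync_reader, scan_reader, and redis_writer blocks, each carrying a cluster flag that selects the concrete implementation. A minimal sketch of the resulting writer dispatch, assembled from the added lines (not a verbatim copy of the file):

opts := new(writer.RedisWriterOptions)
defaults.SetDefaults(opts) // apply the struct's `default:` tags first
if err := v.UnmarshalKey("redis_writer", opts); err != nil {
	log.Panicf("failed to read the RedisWriter config entry. err: %v", err)
}
if opts.Cluster {
	theWriter = writer.NewRedisClusterWriter(opts) // fans out to one writer per node
} else {
	theWriter = writer.NewRedisStandaloneWriter(opts)
}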
@ -99,22 +87,23 @@ func main() {
// create status
status.Init(theReader, theWriter)
log.Infof("start syncing...")
ch := theReader.StartRead()
for e := range ch {
// calc arguments
e.Preprocess()
e.Parse()
status.AddReadCount(e.CmdName)
// filter
log.Debugf("function before: %v", e)
entries := function.RunFunction(e)
log.Debugf("function after: %v", entries)
if len(entries) == 0 {
status.AddEntryCount(e.CmdName, false)
} else {
for _, entry := range entries {
theWriter.Write(entry)
status.AddEntryCount(entry.CmdName, true)
}
for _, entry := range entries {
entry.Parse()
theWriter.Write(entry)
status.AddWriteCount(entry.CmdName)
}
}
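The pipeline bookkeeping changes alongside: Preprocess is renamed Parse, each source entry is counted once as a read, and each entry the function stage emits is re-parsed and counted as a write. The loop now reduces to roughly:

for e := range ch {
	e.Parse() // derive CmdName/Group/Keys/Slots
	status.AddReadCount(e.CmdName)
	entries := function.RunFunction(e)
	for _, entry := range entries {
		entry.Parse() // re-parse: the function may have rewritten Argv
		theWriter.Write(entry)
		status.AddWriteCount(entry.CmdName)
	}
}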

@ -19,21 +19,55 @@ export default defineConfig({
],
sidebar: [
{
text: '基础教程',
text: '介绍',
items: [
{ text: 'RedisShake 简介', link: '/zh/guide/getting-started' },
{ text: '什么是 RedisShake', link: '/zh/guide/getting-started' },
{ text: '快速上手', link: '/zh/guide/getting-started' },
{ text: '配置文件', link: '/zh/guide/config' }
{ text: '配置', link: '/zh/guide/getting-started' },
{ text: '迁移模式选择', link: '/zh/guide/getting-started' },
]
},
{
text: '变换/过滤',
text: 'Reader',
items: [
{ text: '上手使用', link: '/zh/transform/getting-started' },
{ text: '样例', link: '/zh/transform/examples' }
{ text: 'Sync Reader', link: '/zh/function/introduction' },
{ text: 'Scan Reader', link: '/zh/function/best_practices' },
{ text: 'RDB Reader', link: '/zh/function/best_practices' },
]
},
{
text: 'Writer',
items: [
{ text: 'Redis Writer', link: '/zh/function/introduction' },
]
},
{
text: 'function',
items: [
{ text: '什么是 function', link: '/zh/function/introduction' },
{
text: '最佳实践',
items: [
{ text: '监控', link: '/zh/function/best_practices' },
]
}
]
},
{
text: '进阶用法',
items: [
{ text: '监控', link: '/zh/function/best_practices' },
{ text: '双向同步', link: '/zh/function/best_practices' },
{ text: '容器部署', link: '/zh/function/best_practices' },
{ text: '主从实例向集群实例迁移', link: '/zh/function/best_practices' },
{ text: '大 key 重写', link: '/zh/function/best_practices' },
]
}
],
footer: {
message: 'Released under the MIT License.',
copyright: 'Copyright © 2019-present Tair'
}
}
},
en: {

@ -5,7 +5,7 @@
"packages": {
"": {
"devDependencies": {
"vitepress": "^1.0.0-alpha.75"
"vitepress": "^1.0.0-rc.4"
}
},
"node_modules/@algolia/autocomplete-core": {
@ -1220,23 +1220,23 @@
}
},
"node_modules/vitepress": {
"version": "1.0.0-beta.7",
"resolved": "https://registry.npmjs.org/vitepress/-/vitepress-1.0.0-beta.7.tgz",
"integrity": "sha512-P9Rw+FXatKIU4fVdtKxqwHl6fby8E/8zE3FIfep6meNgN4BxbWqoKJ6yfuuQQR9IrpQqwnyaBh4LSabyll6tWg==",
"version": "1.0.0-rc.4",
"resolved": "https://registry.npmjs.org/vitepress/-/vitepress-1.0.0-rc.4.tgz",
"integrity": "sha512-JCQ89Bm6ECUTnyzyas3JENo00UDJeK8q1SUQyJYou+4Yz5BKEc/F3O21cu++DnUT2zXc0kvQ2Aj4BZCc/nioXQ==",
"dev": true,
"dependencies": {
"@docsearch/css": "^3.5.1",
"@docsearch/js": "^3.5.1",
"@vitejs/plugin-vue": "^4.2.3",
"@vue/devtools-api": "^6.5.0",
"@vueuse/core": "^10.2.1",
"@vueuse/integrations": "^10.2.1",
"@vueuse/core": "^10.3.0",
"@vueuse/integrations": "^10.3.0",
"body-scroll-lock": "4.0.0-beta.0",
"focus-trap": "^7.5.2",
"mark.js": "8.11.1",
"minisearch": "^6.1.0",
"shiki": "^0.14.3",
"vite": "^4.4.7",
"vite": "^4.4.9",
"vue": "^3.3.4"
},
"bin": {

@ -1,10 +1,11 @@
{
"type": "module",
"devDependencies": {
"vitepress": "^1.0.0-alpha.75"
"vitepress": "^1.0.0-rc.4"
},
"scripts": {
"docs:dev": "vitepress dev",
"docs:build": "vitepress build",
"docs:preview": "vitepress preview"
}
}
}

@ -10,9 +10,6 @@ hero:
- theme: brand
text: 快速上手
link: /zh/guide/getting-started
# - theme: alt
# text: 云原生内存数据库Tair
# link: https://www.aliyun.com/product/apsaradb/kvstore/tair
features:
- title: 数据迁移

@ -1,6 +1,7 @@
---
outline: deep
---
# 上手使用
TODO
# 最佳实践

@ -101,6 +101,14 @@ func (r *Redis) Receive() (interface{}, error) {
return r.protoReader.ReadReply()
}
func (r *Redis) ReceiveString() string {
reply, err := r.Receive()
if err != nil {
log.Panicf(err.Error())
}
return reply.(string)
}
func (r *Redis) BufioReader() *bufio.Reader {
return r.reader
}
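A hypothetical usage sketch of the new ReceiveString helper — it panics via log.Panicf on error (and on a non-string reply, through the type assertion), so callers stay linear:

c := client.NewRedisClient("127.0.0.1:6379", "", "", false)
c.Send("PING")
pong := c.ReceiveString() // "PONG"
_ = pong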

@ -50,7 +50,7 @@ func (e *Entry) Serialize() []byte {
return buf.Bytes()
}
func (e *Entry) Preprocess() {
func (e *Entry) Parse() {
e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv)
e.Slots = commands.CalcSlots(e.Keys)
}
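For illustration (hypothetical values; the exact results come from commands.CalcKeys and commands.CalcSlots):

e := entry.NewEntry()
e.Argv = []string{"SET", "key1", "value1"}
e.Parse()
// e.CmdName -> "SET", e.Keys -> ["key1"],
// e.Slots  -> the cluster hash slot of "key1" (0..16383)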

@ -1,7 +1,6 @@
package log
import (
"fmt"
"github.com/go-stack/stack"
"os"
)
@ -20,10 +19,9 @@ func Warnf(format string, args ...interface{}) {
func Panicf(format string, args ...interface{}) {
frames := stack.Trace().TrimRuntime()
msgs := fmt.Sprintf(format, args...)
for _, frame := range frames {
msgs += fmt.Sprintf("\n%+v -> %n()", frame, frame)
logger.Warn().Msgf("%+v -> %n()", frame, frame)
}
logger.Error().Msg(msgs)
logger.Error().Msgf(format, args...)
os.Exit(1)
}

@ -11,7 +11,6 @@ import (
var logger zerolog.Logger
func Init(level string, file string) {
// log level
switch level {
case "debug":

@ -85,7 +85,7 @@ func (ld *Loader) ParseRDB() int {
if err != nil {
log.Panicf(err.Error())
}
log.Infof("[%s] RDB version: %d", ld.name, version)
log.Debugf("[%s] RDB version: %d", ld.name, version)
// read entries
ld.parseRDBEntry(rd)
@ -125,19 +125,19 @@ func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
if err != nil {
log.Panicf(err.Error())
}
log.Infof("[%s] RDB repl-stream-db: [%s]", ld.name, value)
log.Debugf("[%s] RDB repl-stream-db: [%s]", ld.name, value)
} else if key == "lua" {
e := entry.NewEntry()
e.Argv = []string{"script", "load", value}
ld.ch <- e
log.Infof("[%s] LUA script: [%s]", ld.name, value)
log.Debugf("[%s] LUA script: [%s]", ld.name, value)
} else {
log.Infof("[%s] RDB AUX: key=[%s], value=[%s]", ld.name, key, value)
log.Debugf("[%s] RDB AUX: key=[%s], value=[%s]", ld.name, key, value)
}
case kFlagResizeDB:
dbSize := structure.ReadLength(rd)
expireSize := structure.ReadLength(rd)
log.Infof("[%s] RDB resize db: db_size=[%d], expire_size=[%d]", ld.name, dbSize, expireSize)
log.Debugf("[%s] RDB resize db: db_size=[%d], expire_size=[%d]", ld.name, dbSize, expireSize)
case kFlagExpireMs:
ld.expireMs = int64(structure.ReadUint64(rd)) - time.Now().UnixMilli()
if ld.expireMs < 0 {

@ -3,26 +3,21 @@ package reader
import (
"RedisShake/internal/entry"
"RedisShake/internal/utils"
"fmt"
"sync"
)
type ScanClusterReaderOptions struct {
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
}
type scanClusterReader struct {
readers []Reader
readers []Reader
statusId int
}
func NewScanClusterReader(opts *ScanClusterReaderOptions) Reader {
func NewScanClusterReader(opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
rd := &scanClusterReader{}
for _, address := range addresses {
rd.readers = append(rd.readers, NewScanStandaloneReader(&ScanStandaloneReaderOptions{
rd.readers = append(rd.readers, NewScanStandaloneReader(&ScanReaderOptions{
Address: address,
Username: opts.Username,
Password: opts.Password,
@ -60,7 +55,9 @@ func (rd *scanClusterReader) Status() interface{} {
}
func (rd *scanClusterReader) StatusString() string {
return "scanClusterReader"
rd.statusId += 1
rd.statusId %= len(rd.readers)
return fmt.Sprintf("src-%d, %s", rd.statusId, rd.readers[rd.statusId].StatusString())
}
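With three source shards, successive status ticks now report src-1, src-2, src-0, src-1, … — a rotating per-shard status instead of the former fixed "scanClusterReader" string.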
func (rd *scanClusterReader) StatusConsistent() bool {

@ -18,24 +18,17 @@ type dbKey struct {
isSelect bool
}
type ScanStandaloneReaderOptions struct {
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
}
type scanStandaloneReader struct {
isCluster bool
ch chan *entry.Entry
// client for scan keys
clientScan *client.Redis
innerChannel chan *dbKey
clientScan *client.Redis
// client for dump keys
keysNeedFetch chan *dbKey
clientDump *client.Redis
clientDumpDbid int
ch chan *entry.Entry
stat struct {
Name string `json:"name"`
@ -46,18 +39,27 @@ type scanStandaloneReader struct {
}
}
func NewScanStandaloneReader(opts *ScanStandaloneReaderOptions) Reader {
type ScanReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
}
func NewScanStandaloneReader(opts *ScanReaderOptions) Reader {
r := new(scanStandaloneReader)
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.clientScan = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.clientDump = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.isCluster = r.clientScan.IsCluster()
r.isCluster = r.clientScan.IsCluster() // not use opts.Cluster, because user may use standalone mode to scan a cluster node
return r
}
func (r *scanStandaloneReader) StartRead() chan *entry.Entry {
r.ch = make(chan *entry.Entry, 1024)
r.innerChannel = make(chan *dbKey, 1024)
r.keysNeedFetch = make(chan *dbKey, 1024)
go r.scan()
go r.fetch()
return r.ch
@ -77,7 +79,7 @@ func (r *scanStandaloneReader) scan() {
}
r.clientDump.Send("SELECT", strconv.Itoa(dbId))
r.innerChannel <- &dbKey{dbId, "", true}
r.keysNeedFetch <- &dbKey{dbId, "", true}
}
var cursor uint64 = 0
@ -87,7 +89,7 @@ func (r *scanStandaloneReader) scan() {
for _, key := range keys {
r.clientDump.Send("DUMP", key)
r.clientDump.Send("PTTL", key)
r.innerChannel <- &dbKey{dbId, key, false}
r.keysNeedFetch <- &dbKey{dbId, key, false}
}
// stat
@ -101,12 +103,12 @@ func (r *scanStandaloneReader) scan() {
}
}
r.stat.Finished = true
close(r.innerChannel)
close(r.keysNeedFetch)
}
func (r *scanStandaloneReader) fetch() {
var id uint64 = 0
for item := range r.innerChannel {
for item := range r.keysNeedFetch {
if item.isSelect {
// select
receive, err := client.String(r.clientDump.Receive())
@ -158,9 +160,9 @@ func (r *scanStandaloneReader) Status() interface{} {
func (r *scanStandaloneReader) StatusString() string {
if r.stat.Finished {
return fmt.Sprintf("[%s] finished", r.stat.Name)
return fmt.Sprintf("finished")
}
return fmt.Sprintf("[%s] dbid: [%d], percent: [%s]", r.stat.Name, r.stat.DbId, r.stat.PercentByDbId)
return fmt.Sprintf("dbid=[%d], percent=[%s]", r.stat.DbId, r.stat.PercentByDbId)
}
func (r *scanStandaloneReader) StatusConsistent() bool {
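The renamed keysNeedFetch channel holds keys whose DUMP/PTTL commands are already in flight on clientDump; fetch then drains the replies in the same order. A sketch of that pipelining pattern, assuming the client API shown elsewhere in this commit and a reachable source instance:

c := client.NewRedisClient("127.0.0.1:6379", "", "", false)
c.Send("DUMP", "key1") // queue both commands without waiting for replies
c.Send("PTTL", "key1")
dump, err := client.String(c.Receive()) // replies arrive in send order
if err != nil {
	log.Panicf(err.Error())
}
pttl, _ := c.Receive() // remaining TTL in milliseconds (integer reply)
_, _ = dump, pttl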

@ -4,21 +4,16 @@ import (
"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
"fmt"
"sync"
)
type SyncClusterReaderOptions struct {
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
}
type syncClusterReader struct {
readers []Reader
readers []Reader
statusId int
}
func NewSyncClusterReader(opts *SyncClusterReaderOptions) Reader {
func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
log.Debugf("get redis cluster nodes:")
for _, address := range addresses {
@ -26,7 +21,7 @@ func NewSyncClusterReader(opts *SyncClusterReaderOptions) Reader {
}
rd := &syncClusterReader{}
for _, address := range addresses {
rd.readers = append(rd.readers, NewSyncStandaloneReader(&SyncStandaloneReaderOptions{
rd.readers = append(rd.readers, NewSyncStandaloneReader(&SyncReaderOptions{
Address: address,
Username: opts.Username,
Password: opts.Password,
@ -64,7 +59,9 @@ func (rd *syncClusterReader) Status() interface{} {
}
func (rd *syncClusterReader) StatusString() string {
return "syncClusterReader"
rd.statusId += 1
rd.statusId %= len(rd.readers)
return fmt.Sprintf("src-%d, %s", rd.statusId, rd.readers[rd.statusId].StatusString())
}
func (rd *syncClusterReader) StatusConsistent() bool {

@ -19,13 +19,24 @@ import (
"time"
)
type SyncStandaloneReaderOptions struct {
type SyncReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
}
type State string
const (
kHandShake State = "hand shaking"
kWaitBgsave State = "waiting bgsave"
kReceiveRdb State = "receiving rdb"
kSyncRdb State = "syncing rdb"
kSyncAof State = "syncing aof"
)
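These states are entered in order over one sync — kHandShake at construction, kWaitBgsave then kReceiveRdb inside receiveRDB, kSyncRdb in sendRDB, and kSyncAof just before sendAOF — as the hunks below show.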
type syncStandaloneReader struct {
client *client.Redis
@ -40,12 +51,12 @@ type syncStandaloneReader struct {
Dir string `json:"dir"`
// status
Status string `json:"status"`
Status State `json:"status"`
// rdb info
RdbFilePath string `json:"rdb_file_path"`
RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbFIleSizeHuman string `json:"rdb_file_size_human"`
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbReceivedHuman string `json:"rdb_received_human"`
RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
@ -59,13 +70,13 @@ type syncStandaloneReader struct {
}
}
func NewSyncStandaloneReader(opts *SyncStandaloneReaderOptions) Reader {
func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader {
r := new(syncStandaloneReader)
r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.rd = r.client.BufioReader()
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.stat.Address = opts.Address
r.stat.Status = "init"
r.stat.Status = kHandShake
r.stat.Dir = utils.GetAbsPath(r.stat.Name)
utils.CreateEmptyDir(r.stat.Dir)
return r
@ -81,6 +92,7 @@ func (r *syncStandaloneReader) StartRead() chan *entry.Entry {
startOffset := r.stat.AofReceivedOffset
go r.receiveAOF(r.rd)
r.sendRDB()
r.stat.Status = kSyncAof
r.sendAOF(startOffset)
}()
@ -109,34 +121,16 @@ func (r *syncStandaloneReader) sendPSync() {
r.client.Send(argv...)
// format: \n\n\n+<reply>\r\n
for true { // TODO better way to parse psync reply
// \n\n\n+
b, err := r.rd.ReadByte()
for {
bytes, err := r.rd.Peek(1)
if err != nil {
log.Panicf(err.Error())
}
if b == '\n' {
continue
if bytes[0] != '\n' {
break
}
if b == '-' {
reply, err := r.rd.ReadString('\n')
if err != nil {
log.Panicf(err.Error())
}
reply = strings.TrimSpace(reply)
log.Panicf("psync error. name=[%s], reply=[%s]", r.stat.Name, reply)
}
if b != '+' {
log.Panicf("invalid psync reply. name=[%s], b=[%s]", r.stat.Name, string(b))
}
break
}
reply, err := r.rd.ReadString('\n')
if err != nil {
log.Panicf(err.Error())
}
reply = strings.TrimSpace(reply)
reply := r.client.ReceiveString()
masterOffset, err := strconv.Atoi(strings.Split(reply, " ")[2])
if err != nil {
log.Panicf(err.Error())
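Here reply is the body of the master's +FULLRESYNC <replid> <offset> response — e.g. "FULLRESYNC c0d1e2… 3058" — so the third space-separated field yields masterOffset = 3058.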
@ -145,16 +139,16 @@ func (r *syncStandaloneReader) sendPSync() {
}
func (r *syncStandaloneReader) receiveRDB() {
log.Infof("[%s] source db is doing bgsave.", r.stat.Name)
r.stat.Status = "source db is doing bgsave"
log.Debugf("[%s] source db is doing bgsave.", r.stat.Name)
r.stat.Status = kWaitBgsave
timeStart := time.Now()
// format: \n\n\n$<length>\r\n<rdb>
for true {
for {
b, err := r.rd.ReadByte()
if err != nil {
log.Panicf(err.Error())
}
if b == '\n' {
if b == '\n' { // heartbeat
continue
}
if b != '$' {
@ -162,7 +156,7 @@ func (r *syncStandaloneReader) receiveRDB() {
}
break
}
log.Infof("[%s] source db bgsave finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds())
log.Debugf("[%s] source db bgsave finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds())
lengthStr, err := r.rd.ReadString('\n')
if err != nil {
log.Panicf(err.Error())
@ -172,23 +166,24 @@ func (r *syncStandaloneReader) receiveRDB() {
if err != nil {
log.Panicf(err.Error())
}
log.Infof("[%s] rdb length=[%d]", r.stat.Name, length)
log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length)))
r.stat.RdbFileSizeBytes = length
r.stat.RdbFIleSizeHuman = humanize.IBytes(uint64(length))
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length))
// create rdb file
r.stat.RdbFilePath, err = filepath.Abs(r.stat.Name + "/dump.rdb")
if err != nil {
log.Panicf(err.Error())
}
log.Infof("[%s] start receiving RDB. path=[%s]", r.stat.Name, r.stat.RdbFilePath)
timeStart = time.Now()
log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, r.stat.RdbFilePath)
rdbFileHandle, err := os.OpenFile(r.stat.RdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
log.Panicf(err.Error())
}
// receive rdb
r.stat.Status = fmt.Sprintf("[%s]: receiving RDB", r.stat.Name)
r.stat.Status = kReceiveRdb
remainder := length
const bufSize int64 = 32 * 1024 * 1024 // 32MB
buf := make([]byte, bufSize)
@ -214,11 +209,11 @@ func (r *syncStandaloneReader) receiveRDB() {
if err != nil {
log.Panicf(err.Error())
}
log.Infof("[%s] save RDB finished.", r.stat.Name)
log.Debugf("[%s] save RDB finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds())
}
func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {
log.Infof("[%s] start receiving aof data, and save to file", r.stat.Name)
log.Debugf("[%s] start receiving aof data, and save to file", r.stat.Name)
aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset)
defer aofWriter.Close()
buf := make([]byte, 16*1024) // 16KB is enough for writing file
@ -236,15 +231,15 @@ func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {
func (r *syncStandaloneReader) sendRDB() {
// start parse rdb
log.Infof("[%s] start sending RDB to target", r.stat.Name)
r.stat.Status = fmt.Sprintf("[%s]: sending RDB to target", r.stat.Name)
log.Debugf("[%s] start sending RDB to target", r.stat.Name)
r.stat.Status = kSyncRdb
updateFunc := func(offset int64) {
r.stat.RdbSentBytes = offset
r.stat.RdbSentHuman = humanize.IBytes(uint64(offset))
}
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, r.stat.RdbFilePath, r.ch)
r.DbId = rdbLoader.ParseRDB()
log.Infof("[%s] send RDB finished", r.stat.Name)
log.Debugf("[%s] send RDB finished", r.stat.Name)
}
func (r *syncStandaloneReader) sendAOF(offset int64) {
@ -264,12 +259,23 @@ func (r *syncStandaloneReader) sendAOF(offset int64) {
r.DbId = DbId
continue
}
// ping
if strings.EqualFold(argv[0], "ping") {
continue
}
// replconf @AWS
if strings.EqualFold(argv[0], "replconf") {
continue
}
// opinfo @Aliyun
if strings.EqualFold(argv[0], "opinfo") {
continue
}
e := entry.NewEntry()
e.Argv = argv
e.DbId = r.DbId
r.ch <- e
r.stat.Status = fmt.Sprintf("[%s]: sending aof to target", r.stat.Name)
}
}
@ -287,7 +293,13 @@ func (r *syncStandaloneReader) Status() interface{} {
}
func (r *syncStandaloneReader) StatusString() string {
return r.stat.Status
if r.stat.Status == kSyncRdb {
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbSentHuman, r.stat.RdbFileSizeHuman)
}
if r.stat.Status == kSyncAof {
return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset)
}
return string(r.stat.Status)
}
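For example, with AofReceivedOffset = 2048 and AofSentOffset = 2000, the status line reads "syncing aof, diff=[48]" — the replication-stream backlog, in bytes, still waiting to be sent downstream.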
func (r *syncStandaloneReader) StatusConsistent() bool {

@ -6,14 +6,14 @@ import (
)
type EntryCount struct {
Allow uint64 `json:"allow"`
Disallow uint64 `json:"disallow"`
AllowOps float64 `json:"allow_ops"`
DisallowOps float64 `json:"disallow_ops"`
ReadCount uint64 `json:"read_count"`
ReadOps float64 `json:"read_ops"`
WriteCount uint64 `json:"write_count"`
WriteOps float64 `json:"write_ops"`
// update ops
lastAllow uint64
lastDisallow uint64
lastReadCount uint64
lastWriteCount uint64
lastUpdateTimestampSec float64
}
@ -22,14 +22,14 @@ func (e *EntryCount) updateOPS() {
nowTimestampSec := float64(time.Now().UnixNano()) / 1e9
if e.lastUpdateTimestampSec != 0 {
timeIntervalSec := nowTimestampSec - e.lastUpdateTimestampSec
e.AllowOps = float64(e.Allow-e.lastAllow) / timeIntervalSec
e.DisallowOps = float64(e.Disallow-e.lastDisallow) / timeIntervalSec
e.lastAllow = e.Allow
e.lastDisallow = e.Disallow
e.ReadOps = float64(e.ReadCount-e.lastReadCount) / timeIntervalSec
e.WriteOps = float64(e.WriteCount-e.lastWriteCount) / timeIntervalSec
e.lastReadCount = e.ReadCount
e.lastWriteCount = e.WriteCount
}
e.lastUpdateTimestampSec = nowTimestampSec
}
func (e *EntryCount) String() string {
return fmt.Sprintf("allow: %.2fops/s, disallow: %.2fops/s", e.AllowOps, e.DisallowOps)
return fmt.Sprintf("read_count=[%d], read_ops=[%.2f], write_count=[%d], write_ops=[%.2f]", e.ReadCount, e.ReadOps, e.WriteCount, e.WriteOps)
}
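Worked example: if ReadCount grows from 10000 to 10500 between two updates 5 s apart, updateOPS computes (10500-10000)/5 and the status line shows read_ops=[100.00].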

@ -29,7 +29,7 @@ var stat = new(Stat)
var theReader Statusable
var theWriter Statusable
func AddEntryCount(cmd string, allow bool) {
func AddReadCount(cmd string) {
ch <- func() {
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
@ -39,13 +39,24 @@ func AddEntryCount(cmd string, allow bool) {
cmdEntryCount = EntryCount{}
stat.PerCmdEntriesCount[cmd] = cmdEntryCount
}
if allow {
stat.TotalEntriesCount.Allow += 1
cmdEntryCount.Allow += 1
} else {
stat.TotalEntriesCount.Disallow += 1
cmdEntryCount.Disallow += 1
stat.TotalEntriesCount.ReadCount += 1
cmdEntryCount.ReadCount += 1
stat.PerCmdEntriesCount[cmd] = cmdEntryCount
}
}
func AddWriteCount(cmd string) {
ch <- func() {
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
}
cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
if !ok {
cmdEntryCount = EntryCount{}
stat.PerCmdEntriesCount[cmd] = cmdEntryCount
}
stat.TotalEntriesCount.WriteCount += 1
cmdEntryCount.WriteCount += 1
stat.PerCmdEntriesCount[cmd] = cmdEntryCount
}
}
@ -92,10 +103,7 @@ func Init(r Statusable, w Statusable) {
select {
case <-ticker.C:
ch <- func() {
log.Infof("%s, %s, %s",
stat.TotalEntriesCount.String(),
theReader.StatusString(),
theWriter.StatusString())
log.Infof("%s, %s", stat.TotalEntriesCount.String(), theReader.StatusString())
}
}
}
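Both counters funnel their mutations through ch, the package's channel of closures, so a single consumer goroutine can apply every update without locks. A self-contained sketch of that pattern (hypothetical names, outside RedisShake):

package main

import "fmt"

func main() {
	ch := make(chan func(), 64)
	done := make(chan struct{})
	count := 0
	go func() { // the single goroutine that owns the state
		for f := range ch {
			f()
		}
		close(done)
	}()
	for i := 0; i < 3; i++ {
		ch <- func() { count++ } // producers enqueue mutations instead of sharing memory
	}
	close(ch)
	<-done
	fmt.Println(count) // 3
}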

@ -35,7 +35,7 @@ func (r *AOFReader) openFile(offset int64) {
}
r.offset = offset
r.pos = 0
log.Infof("[%s] open file for read. filename=[%s]", r.name, r.filepath)
log.Debugf("[%s] open file for read. filename=[%s]", r.name, r.filepath)
}
func (r *AOFReader) readNextFile(offset int64) {

@ -35,7 +35,7 @@ func (w *AOFWriter) openFile(offset int64) {
}
w.offset = offset
w.filesize = 0
log.Infof("[%s] open file for write. filename=[%s]", w.name, w.filepath)
log.Debugf("[%s] open file for write. filename=[%s]", w.name, w.filepath)
}
func (w *AOFWriter) Write(buf []byte) {

@ -12,6 +12,6 @@ func SetNcpu() {
runtime.GOMAXPROCS(config.Opt.Advanced.Ncpu)
log.Infof("set GOMAXPROCS to %v", config.Opt.Advanced.Ncpu)
} else {
log.Infof("GOMAXPROCS defaults to the value of runtime.NumCPU %v", runtime.NumCPU())
log.Infof("GOMAXPROCS defaults to the value of runtime.NumCPU [%v]", runtime.NumCPU())
}
}

@ -11,13 +11,6 @@ import (
const KeySlots = 16384
type RedisClusterWriterOptions struct {
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
}
type RedisClusterWriter struct {
addresses []string
writers []Writer
@ -26,7 +19,7 @@ type RedisClusterWriter struct {
stat []interface{}
}
func NewRedisClusterWriter(opts *RedisClusterWriterOptions) Writer {
func NewRedisClusterWriter(opts *RedisWriterOptions) Writer {
rw := new(RedisClusterWriter)
rw.loadClusterNodes(opts)
log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses)
@ -39,7 +32,7 @@ func (r *RedisClusterWriter) Close() {
}
}
func (r *RedisClusterWriter) loadClusterNodes(opts *RedisClusterWriterOptions) {
func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) {
c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
@ -69,7 +62,7 @@ func (r *RedisClusterWriter) loadClusterNodes(opts *RedisClusterWriterOptions) {
r.addresses = append(r.addresses, address)
// writers
opts := &RedisStandaloneWriterOptions{
opts := &RedisWriterOptions{
Address: address,
Username: opts.Username,
Password: opts.Password,
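Each discovered node thus gets its own standalone writer built from a fresh RedisWriterOptions, inheriting the top-level Username/Password (and presumably Tls), so every node connection shares one set of credentials.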

@ -14,7 +14,8 @@ import (
"time"
)
type RedisStandaloneWriterOptions struct {
type RedisWriterOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
@ -36,7 +37,7 @@ type redisStandaloneWriter struct {
}
}
func NewRedisStandaloneWriter(opts *RedisStandaloneWriterOptions) Writer {
func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer {
rw := new(redisStandaloneWriter)
rw.address = opts.Address
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)

@ -1,17 +1,35 @@
transform = ""
function = ""
[sync_standlone_reader]
[sync_reader]
cluster = false
address = "127.0.0.1:6379"
username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
tls = false
[redis_standalone_writer]
# [scan_reader]
# cluster = false
# address = "127.0.0.1:6379"
# username = "" # keep empty if not using ACL
# password = "" # keep empty if no authentication is required
# tls = false
# [rdb_reader]
# address = "127.0.0.1:6379"
# username = "" # keep empty if not using ACL
# password = "" # keep empty if no authentication is required
# tls = false
[redis_writer]
cluster = false
address = "127.0.0.1:6380"
username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
tls = false
[advanced]
dir = "data"
ncpu = 0 # runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores
@ -30,7 +48,7 @@ log_interval = 5 # in seconds
# panic: redis-shake will stop when meet "Target key name is busy" error.
# rewrite: redis-shake will replace the key with new value.
# ignore: redis-shake will skip restore the key when meet "Target key name is busy" error.
rdb_restore_command_behavior = "rewrite" # panic, rewrite or skip
rdb_restore_command_behavior = "panic" # panic, rewrite or skip
# redis-shake uses pipeline to improve sending performance.
# This item limits the maximum number of commands in a pipeline.
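Note the default flip from "rewrite" to "panic": by default redis-shake now stops on a "Target key name is busy" collision instead of overwriting, and users must opt into rewrite or ignore explicitly.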

@ -20,10 +20,10 @@ def acl():
inserter.add_data(src, cross_slots_cmd=True)
opts = h.ShakeOpts.create_sync_opts(src, dst)
opts["sync_standalone_reader"]["username"] = "user0"
opts["sync_standalone_reader"]["password"] = "password0"
opts["redis_standalone_writer"]["username"] = "user1"
opts["redis_standalone_writer"]["password"] = "password1"
opts["sync_reader"]["username"] = "user0"
opts["sync_reader"]["password"] = "password0"
opts["redis_writer"]["username"] = "user1"
opts["redis_writer"]["password"] = "password1"
p.log(f"opts: {opts}")
shake = h.Shake(opts)

@ -1,4 +1,3 @@
import time
import typing
import pybbt
@ -12,52 +11,44 @@ from helpers.utils.network import get_free_port
from helpers.utils.timer import Timer
# [SyncClusterReader]
# address = "127.0.0.1:6379"
# username = "" # keep empty if not using ACL
# password = "" # keep empty if no authentication is required
# tls = false
#
# [RedisClusterWriter]
# address = "127.0.0.1:6380"
# username = "" # keep empty if not using ACL
# password = "" # keep empty if no authentication is required
# tls = false
class ShakeOpts:
@staticmethod
def create_sync_opts(src: Redis, dst: Redis) -> typing.Dict:
d = {}
if src.is_cluster():
d["sync_cluster_reader"] = {"address": src.get_address()}
else:
d["sync_standalone_reader"] = {"address": src.get_address()}
if dst.is_cluster():
d["redis_cluster_writer"] = {"address": dst.get_address()}
else:
d["redis_standalone_writer"] = {"address": dst.get_address()}
d = {
"sync_reader": {
"cluster": src.is_cluster(),
"address": src.get_address()
},
"redis_writer": {
"cluster": dst.is_cluster(),
"address": dst.get_address()
}
}
return d
@staticmethod
def create_scan_opts(src: Redis, dst: Redis) -> typing.Dict:
d = {}
if src.is_cluster():
d["scan_cluster_reader"] = {"address": src.get_address()}
else:
d["scan_standalone_reader"] = {"address": src.get_address()}
if dst.is_cluster():
d["redis_cluster_writer"] = {"address": dst.get_address()}
else:
d["redis_standalone_writer"] = {"address": dst.get_address()}
d = {
"scan_reader": {
"cluster": src.is_cluster(),
"address": src.get_address()
},
"redis_writer": {
"cluster": dst.is_cluster(),
"address": dst.get_address()
}
}
return d
@staticmethod
def create_rdb_opts(rdb_path: str, dts: Redis) -> typing.Dict:
d = {"rdb_reader": {"filepath": rdb_path}}
if dts.is_cluster():
d["redis_cluster_writer"] = {"address": dts.get_address()}
else:
d["redis_standalone_writer"] = {"address": dts.get_address()}
d = {
"rdb_reader": {"filepath": rdb_path},
"redis_writer": {
"cluster": dts.is_cluster(),
"address": dts.get_address()
}
}
return d
