diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index ccdd079..2aea899 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -23,42 +23,34 @@ func main() { // create reader var theReader reader.Reader - if v.IsSet("sync_standalone_reader") { - opts := new(reader.SyncStandaloneReaderOptions) + if v.IsSet("sync_reader") { + opts := new(reader.SyncReaderOptions) defaults.SetDefaults(opts) - err := v.UnmarshalKey("sync_standalone_reader", opts) + err := v.UnmarshalKey("sync_reader", opts) if err != nil { log.Panicf("failed to read the SyncReader config entry. err: %v", err) } - theReader = reader.NewSyncStandaloneReader(opts) - log.Infof("create SyncStandaloneReader: %v", opts.Address) - } else if v.IsSet("sync_cluster_reader") { - opts := new(reader.SyncClusterReaderOptions) - defaults.SetDefaults(opts) - err := v.UnmarshalKey("sync_cluster_reader", opts) - if err != nil { - log.Panicf("failed to read the SyncReader config entry. err: %v", err) + if opts.Cluster { + theReader = reader.NewSyncClusterReader(opts) + log.Infof("create SyncClusterReader: %v", opts.Address) + } else { + theReader = reader.NewSyncStandaloneReader(opts) + log.Infof("create SyncStandaloneReader: %v", opts.Address) } - theReader = reader.NewSyncClusterReader(opts) - log.Infof("create SyncClusterReader: %v", opts.Address) - } else if v.IsSet("scan_standalone_reader") { - opts := new(reader.ScanStandaloneReaderOptions) + } else if v.IsSet("scan_reader") { + opts := new(reader.ScanReaderOptions) defaults.SetDefaults(opts) - err := v.UnmarshalKey("scan_standalone_reader", opts) + err := v.UnmarshalKey("scan_reader", opts) if err != nil { log.Panicf("failed to read the ScanReader config entry. err: %v", err) } - theReader = reader.NewScanStandaloneReader(opts) - log.Infof("create ScanStandaloneReader: %v", opts.Address) - } else if v.IsSet("scan_cluster_reader") { - opts := new(reader.ScanClusterReaderOptions) - defaults.SetDefaults(opts) - err := v.UnmarshalKey("scan_cluster_reader", opts) - if err != nil { - log.Panicf("failed to read the ScanReader config entry. err: %v", err) + if opts.Cluster { + theReader = reader.NewScanClusterReader(opts) + log.Infof("create ScanClusterReader: %v", opts.Address) + } else { + theReader = reader.NewScanStandaloneReader(opts) + log.Infof("create ScanStandaloneReader: %v", opts.Address) } - theReader = reader.NewScanClusterReader(opts) - log.Infof("create ScanClusterReader: %v", opts.Address) } else if v.IsSet("rdb_reader") { opts := new(reader.RdbReaderOptions) defaults.SetDefaults(opts) @@ -74,24 +66,20 @@ func main() { // create writer var theWriter writer.Writer - if v.IsSet("redis_standalone_writer") { - opts := new(writer.RedisStandaloneWriterOptions) + if v.IsSet("redis_writer") { + opts := new(writer.RedisWriterOptions) defaults.SetDefaults(opts) - err := v.UnmarshalKey("redis_standalone_writer", opts) + err := v.UnmarshalKey("redis_writer", opts) if err != nil { log.Panicf("failed to read the RedisStandaloneWriter config entry. err: %v", err) } - theWriter = writer.NewRedisStandaloneWriter(opts) - log.Infof("create RedisStandaloneWriter: %v", opts.Address) - } else if v.IsSet("redis_cluster_writer") { - opts := new(writer.RedisClusterWriterOptions) - defaults.SetDefaults(opts) - err := v.UnmarshalKey("redis_cluster_writer", opts) - if err != nil { - log.Panicf("failed to read the RedisClusterWriter config entry. err: %v", err) + if opts.Cluster { + theWriter = writer.NewRedisClusterWriter(opts) + log.Infof("create RedisClusterWriter: %v", opts.Address) + } else { + theWriter = writer.NewRedisStandaloneWriter(opts) + log.Infof("create RedisStandaloneWriter: %v", opts.Address) } - theWriter = writer.NewRedisClusterWriter(opts) - log.Infof("create RedisClusterWriter: %v", opts.Address) } else { log.Panicf("no writer config entry found") } @@ -99,22 +87,23 @@ func main() { // create status status.Init(theReader, theWriter) + log.Infof("start syncing...") + ch := theReader.StartRead() for e := range ch { // calc arguments - e.Preprocess() + e.Parse() + status.AddReadCount(e.CmdName) // filter log.Debugf("function before: %v", e) entries := function.RunFunction(e) log.Debugf("function after: %v", entries) - if len(entries) == 0 { - status.AddEntryCount(e.CmdName, false) - } else { - for _, entry := range entries { - theWriter.Write(entry) - status.AddEntryCount(entry.CmdName, true) - } + + for _, entry := range entries { + entry.Parse() + theWriter.Write(entry) + status.AddWriteCount(entry.CmdName) } } diff --git a/docs/.vitepress/config.ts b/docs/.vitepress/config.ts index 28163a5..5111ab3 100644 --- a/docs/.vitepress/config.ts +++ b/docs/.vitepress/config.ts @@ -19,21 +19,55 @@ export default defineConfig({ ], sidebar: [ { - text: '基础教程', + text: '介绍', items: [ - { text: 'RedisShake 简介', link: '/zh/guide/getting-started' }, + { text: '什么是 RedisShake', link: '/zh/guide/getting-started' }, { text: '快速上手', link: '/zh/guide/getting-started' }, - { text: '配置文件', link: '/zh/guide/config' } + { text: '配置', link: '/zh/guide/getting-started' }, + { text: '迁移模式选择', link: '/zh/guide/getting-started' }, ] }, { - text: '变换/过滤', + text: 'Reader', items: [ - { text: '上手使用', link: '/zh/transform/getting-started' }, - { text: '样例', link: '/zh/transform/examples' } + { text: 'Sync Reader', link: '/zh/function/introduction' }, + { text: 'Scan Reader', link: '/zh/function/best_practices' }, + { text: 'RDB Reader', link: '/zh/function/best_practices' }, + ] + }, + { + text: 'Writer', + items: [ + { text: 'Redis Writer', link: '/zh/function/introduction' }, + ] + }, + { + text: 'function', + items: [ + { text: '什么是 function', link: '/zh/function/introduction' }, + { + text: '最佳实践', + items: [ + { text: '监控', link: '/zh/function/best_practices' }, + ] + } + ] + }, + { + text: '进阶用法', + items: [ + { text: '监控', link: '/zh/function/best_practices' }, + { text: '双向同步', link: '/zh/function/best_practices' }, + { text: '容器部署', link: '/zh/function/best_practices' }, + { text: '主从实例向集群实例迁移', link: '/zh/function/best_practices' }, + { text: '大 key 重写', link: '/zh/function/best_practices' }, ] } ], + footer: { + message: 'Released under the MIT License.', + copyright: 'Copyright © 2019-present Tair' + } } }, en: { diff --git a/docs/package-lock.json b/docs/package-lock.json index 8f9b78b..017ce21 100644 --- a/docs/package-lock.json +++ b/docs/package-lock.json @@ -5,7 +5,7 @@ "packages": { "": { "devDependencies": { - "vitepress": "^1.0.0-alpha.75" + "vitepress": "^1.0.0-rc.4" } }, "node_modules/@algolia/autocomplete-core": { @@ -1220,23 +1220,23 @@ } }, "node_modules/vitepress": { - "version": "1.0.0-beta.7", - "resolved": "https://registry.npmjs.org/vitepress/-/vitepress-1.0.0-beta.7.tgz", - "integrity": "sha512-P9Rw+FXatKIU4fVdtKxqwHl6fby8E/8zE3FIfep6meNgN4BxbWqoKJ6yfuuQQR9IrpQqwnyaBh4LSabyll6tWg==", + "version": "1.0.0-rc.4", + "resolved": "https://registry.npmjs.org/vitepress/-/vitepress-1.0.0-rc.4.tgz", + "integrity": "sha512-JCQ89Bm6ECUTnyzyas3JENo00UDJeK8q1SUQyJYou+4Yz5BKEc/F3O21cu++DnUT2zXc0kvQ2Aj4BZCc/nioXQ==", "dev": true, "dependencies": { "@docsearch/css": "^3.5.1", "@docsearch/js": "^3.5.1", "@vitejs/plugin-vue": "^4.2.3", "@vue/devtools-api": "^6.5.0", - "@vueuse/core": "^10.2.1", - "@vueuse/integrations": "^10.2.1", + "@vueuse/core": "^10.3.0", + "@vueuse/integrations": "^10.3.0", "body-scroll-lock": "4.0.0-beta.0", "focus-trap": "^7.5.2", "mark.js": "8.11.1", "minisearch": "^6.1.0", "shiki": "^0.14.3", - "vite": "^4.4.7", + "vite": "^4.4.9", "vue": "^3.3.4" }, "bin": { diff --git a/docs/package.json b/docs/package.json index 5e63370..444d549 100644 --- a/docs/package.json +++ b/docs/package.json @@ -1,10 +1,11 @@ { + "type": "module", "devDependencies": { - "vitepress": "^1.0.0-alpha.75" + "vitepress": "^1.0.0-rc.4" }, "scripts": { "docs:dev": "vitepress dev", "docs:build": "vitepress build", "docs:preview": "vitepress preview" } -} \ No newline at end of file +} diff --git a/docs/src/index.md b/docs/src/index.md index ead6a7f..af9ea6d 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -10,9 +10,6 @@ hero: - theme: brand text: 快速上手 link: /zh/guide/getting-started - # - theme: alt - # text: 云原生内存数据库Tair - # link: https://www.aliyun.com/product/apsaradb/kvstore/tair features: - title: 数据迁移 diff --git a/docs/src/zh/transform/getting-started.md b/docs/src/zh/function/best_practices.md similarity index 53% rename from docs/src/zh/transform/getting-started.md rename to docs/src/zh/function/best_practices.md index 726575f..46fe000 100644 --- a/docs/src/zh/transform/getting-started.md +++ b/docs/src/zh/function/best_practices.md @@ -1,6 +1,7 @@ --- outline: deep --- -# 上手使用 -TODO +# 最佳实践 + + diff --git a/docs/src/zh/transform/examples.md b/docs/src/zh/function/introduction.md similarity index 100% rename from docs/src/zh/transform/examples.md rename to docs/src/zh/function/introduction.md diff --git a/internal/client/redis.go b/internal/client/redis.go index d85cc8d..16861c3 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -101,6 +101,14 @@ func (r *Redis) Receive() (interface{}, error) { return r.protoReader.ReadReply() } +func (r *Redis) ReceiveString() string { + reply, err := r.Receive() + if err != nil { + log.Panicf(err.Error()) + } + return reply.(string) +} + func (r *Redis) BufioReader() *bufio.Reader { return r.reader } diff --git a/internal/entry/entry.go b/internal/entry/entry.go index c21965d..1539941 100644 --- a/internal/entry/entry.go +++ b/internal/entry/entry.go @@ -50,7 +50,7 @@ func (e *Entry) Serialize() []byte { return buf.Bytes() } -func (e *Entry) Preprocess() { +func (e *Entry) Parse() { e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv) e.Slots = commands.CalcSlots(e.Keys) } diff --git a/internal/log/func.go b/internal/log/func.go index 899cc67..2d6aa15 100644 --- a/internal/log/func.go +++ b/internal/log/func.go @@ -1,7 +1,6 @@ package log import ( - "fmt" "github.com/go-stack/stack" "os" ) @@ -20,10 +19,9 @@ func Warnf(format string, args ...interface{}) { func Panicf(format string, args ...interface{}) { frames := stack.Trace().TrimRuntime() - msgs := fmt.Sprintf(format, args...) for _, frame := range frames { - msgs += fmt.Sprintf("\n%+v -> %n()", frame, frame) + logger.Warn().Msgf("%+v -> %n()", frame, frame) } - logger.Error().Msg(msgs) + logger.Error().Msgf(format, args...) os.Exit(1) } diff --git a/internal/log/init.go b/internal/log/init.go index cf36661..0274946 100644 --- a/internal/log/init.go +++ b/internal/log/init.go @@ -11,7 +11,6 @@ import ( var logger zerolog.Logger func Init(level string, file string) { - // log level switch level { case "debug": diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 46e5753..81fa05a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -85,7 +85,7 @@ func (ld *Loader) ParseRDB() int { if err != nil { log.Panicf(err.Error()) } - log.Infof("[%s] RDB version: %d", ld.name, version) + log.Debugf("[%s] RDB version: %d", ld.name, version) // read entries ld.parseRDBEntry(rd) @@ -125,19 +125,19 @@ func (ld *Loader) parseRDBEntry(rd *bufio.Reader) { if err != nil { log.Panicf(err.Error()) } - log.Infof("[%s] RDB repl-stream-db: [%s]", ld.name, value) + log.Debugf("[%s] RDB repl-stream-db: [%s]", ld.name, value) } else if key == "lua" { e := entry.NewEntry() e.Argv = []string{"script", "load", value} ld.ch <- e - log.Infof("[%s] LUA script: [%s]", ld.name, value) + log.Debugf("[%s] LUA script: [%s]", ld.name, value) } else { - log.Infof("[%s] RDB AUX: key=[%s], value=[%s]", ld.name, key, value) + log.Debugf("[%s] RDB AUX: key=[%s], value=[%s]", ld.name, key, value) } case kFlagResizeDB: dbSize := structure.ReadLength(rd) expireSize := structure.ReadLength(rd) - log.Infof("[%s] RDB resize db: db_size=[%d], expire_size=[%d]", ld.name, dbSize, expireSize) + log.Debugf("[%s] RDB resize db: db_size=[%d], expire_size=[%d]", ld.name, dbSize, expireSize) case kFlagExpireMs: ld.expireMs = int64(structure.ReadUint64(rd)) - time.Now().UnixMilli() if ld.expireMs < 0 { diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index 898a9a8..65853ec 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -3,26 +3,21 @@ package reader import ( "RedisShake/internal/entry" "RedisShake/internal/utils" + "fmt" "sync" ) -type ScanClusterReaderOptions struct { - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` -} - type scanClusterReader struct { - readers []Reader + readers []Reader + statusId int } -func NewScanClusterReader(opts *ScanClusterReaderOptions) Reader { +func NewScanClusterReader(opts *ScanReaderOptions) Reader { addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) rd := &scanClusterReader{} for _, address := range addresses { - rd.readers = append(rd.readers, NewScanStandaloneReader(&ScanStandaloneReaderOptions{ + rd.readers = append(rd.readers, NewScanStandaloneReader(&ScanReaderOptions{ Address: address, Username: opts.Username, Password: opts.Password, @@ -60,7 +55,9 @@ func (rd *scanClusterReader) Status() interface{} { } func (rd *scanClusterReader) StatusString() string { - return "scanClusterReader" + rd.statusId += 1 + rd.statusId %= len(rd.readers) + return fmt.Sprintf("src-%d, %s", rd.statusId, rd.readers[rd.statusId].StatusString()) } func (rd *scanClusterReader) StatusConsistent() bool { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index dcd172b..23d4ce4 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -18,24 +18,17 @@ type dbKey struct { isSelect bool } -type ScanStandaloneReaderOptions struct { - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` -} - type scanStandaloneReader struct { isCluster bool + ch chan *entry.Entry // client for scan keys - clientScan *client.Redis - innerChannel chan *dbKey + clientScan *client.Redis // client for dump keys + keysNeedFetch chan *dbKey clientDump *client.Redis clientDumpDbid int - ch chan *entry.Entry stat struct { Name string `json:"name"` @@ -46,18 +39,27 @@ type scanStandaloneReader struct { } } -func NewScanStandaloneReader(opts *ScanStandaloneReaderOptions) Reader { +type ScanReaderOptions struct { + Cluster bool `mapstructure:"cluster" default:"false"` + Address string `mapstructure:"address" default:""` + Username string `mapstructure:"username" default:""` + Password string `mapstructure:"password" default:""` + Tls bool `mapstructure:"tls" default:"false"` + KSN bool `mapstructure:"ksn" default:"false"` +} + +func NewScanStandaloneReader(opts *ScanReaderOptions) Reader { r := new(scanStandaloneReader) r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) r.clientScan = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) r.clientDump = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) - r.isCluster = r.clientScan.IsCluster() + r.isCluster = r.clientScan.IsCluster() // not use opts.Cluster, because user may use standalone mode to scan a cluster node return r } func (r *scanStandaloneReader) StartRead() chan *entry.Entry { r.ch = make(chan *entry.Entry, 1024) - r.innerChannel = make(chan *dbKey, 1024) + r.keysNeedFetch = make(chan *dbKey, 1024) go r.scan() go r.fetch() return r.ch @@ -77,7 +79,7 @@ func (r *scanStandaloneReader) scan() { } r.clientDump.Send("SELECT", strconv.Itoa(dbId)) - r.innerChannel <- &dbKey{dbId, "", true} + r.keysNeedFetch <- &dbKey{dbId, "", true} } var cursor uint64 = 0 @@ -87,7 +89,7 @@ func (r *scanStandaloneReader) scan() { for _, key := range keys { r.clientDump.Send("DUMP", key) r.clientDump.Send("PTTL", key) - r.innerChannel <- &dbKey{dbId, key, false} + r.keysNeedFetch <- &dbKey{dbId, key, false} } // stat @@ -101,12 +103,12 @@ func (r *scanStandaloneReader) scan() { } } r.stat.Finished = true - close(r.innerChannel) + close(r.keysNeedFetch) } func (r *scanStandaloneReader) fetch() { var id uint64 = 0 - for item := range r.innerChannel { + for item := range r.keysNeedFetch { if item.isSelect { // select receive, err := client.String(r.clientDump.Receive()) @@ -158,9 +160,9 @@ func (r *scanStandaloneReader) Status() interface{} { func (r *scanStandaloneReader) StatusString() string { if r.stat.Finished { - return fmt.Sprintf("[%s] finished", r.stat.Name) + return fmt.Sprintf("finished") } - return fmt.Sprintf("[%s] dbid: [%d], percent: [%s]", r.stat.Name, r.stat.DbId, r.stat.PercentByDbId) + return fmt.Sprintf("dbid=[%d], percent=[%s]", r.stat.DbId, r.stat.PercentByDbId) } func (r *scanStandaloneReader) StatusConsistent() bool { diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 74cd3b9..43fe82f 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -4,21 +4,16 @@ import ( "RedisShake/internal/entry" "RedisShake/internal/log" "RedisShake/internal/utils" + "fmt" "sync" ) -type SyncClusterReaderOptions struct { - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` -} - type syncClusterReader struct { - readers []Reader + readers []Reader + statusId int } -func NewSyncClusterReader(opts *SyncClusterReaderOptions) Reader { +func NewSyncClusterReader(opts *SyncReaderOptions) Reader { addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) log.Debugf("get redis cluster nodes:") for _, address := range addresses { @@ -26,7 +21,7 @@ func NewSyncClusterReader(opts *SyncClusterReaderOptions) Reader { } rd := &syncClusterReader{} for _, address := range addresses { - rd.readers = append(rd.readers, NewSyncStandaloneReader(&SyncStandaloneReaderOptions{ + rd.readers = append(rd.readers, NewSyncStandaloneReader(&SyncReaderOptions{ Address: address, Username: opts.Username, Password: opts.Password, @@ -64,7 +59,9 @@ func (rd *syncClusterReader) Status() interface{} { } func (rd *syncClusterReader) StatusString() string { - return "syncClusterReader" + rd.statusId += 1 + rd.statusId %= len(rd.readers) + return fmt.Sprintf("src-%d, %s", rd.statusId, rd.readers[rd.statusId].StatusString()) } func (rd *syncClusterReader) StatusConsistent() bool { diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index c7362e8..7931e60 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -19,13 +19,24 @@ import ( "time" ) -type SyncStandaloneReaderOptions struct { +type SyncReaderOptions struct { + Cluster bool `mapstructure:"cluster" default:"false"` Address string `mapstructure:"address" default:""` Username string `mapstructure:"username" default:""` Password string `mapstructure:"password" default:""` Tls bool `mapstructure:"tls" default:"false"` } +type State string + +const ( + kHandShake State = "hand shaking" + kWaitBgsave State = "waiting bgsave" + kReceiveRdb State = "receiving rdb" + kSyncRdb State = "syncing rdb" + kSyncAof State = "syncing aof" +) + type syncStandaloneReader struct { client *client.Redis @@ -40,12 +51,12 @@ type syncStandaloneReader struct { Dir string `json:"dir"` // status - Status string `json:"status"` + Status State `json:"status"` // rdb info RdbFilePath string `json:"rdb_file_path"` RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file - RdbFIleSizeHuman string `json:"rdb_file_size_human"` + RdbFileSizeHuman string `json:"rdb_file_size_human"` RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master RdbReceivedHuman string `json:"rdb_received_human"` RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan @@ -59,13 +70,13 @@ type syncStandaloneReader struct { } } -func NewSyncStandaloneReader(opts *SyncStandaloneReaderOptions) Reader { +func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader { r := new(syncStandaloneReader) r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) r.rd = r.client.BufioReader() r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) r.stat.Address = opts.Address - r.stat.Status = "init" + r.stat.Status = kHandShake r.stat.Dir = utils.GetAbsPath(r.stat.Name) utils.CreateEmptyDir(r.stat.Dir) return r @@ -81,6 +92,7 @@ func (r *syncStandaloneReader) StartRead() chan *entry.Entry { startOffset := r.stat.AofReceivedOffset go r.receiveAOF(r.rd) r.sendRDB() + r.stat.Status = kSyncAof r.sendAOF(startOffset) }() @@ -109,34 +121,16 @@ func (r *syncStandaloneReader) sendPSync() { r.client.Send(argv...) // format: \n\n\n+\r\n - for true { // TODO better way to parse psync reply - // \n\n\n+ - b, err := r.rd.ReadByte() + for { + bytes, err := r.rd.Peek(1) if err != nil { log.Panicf(err.Error()) } - if b == '\n' { - continue + if bytes[0] != '\n' { + break } - if b == '-' { - reply, err := r.rd.ReadString('\n') - if err != nil { - log.Panicf(err.Error()) - } - reply = strings.TrimSpace(reply) - log.Panicf("psync error. name=[%s], reply=[%s]", r.stat.Name, reply) - } - if b != '+' { - log.Panicf("invalid psync reply. name=[%s], b=[%s]", r.stat.Name, string(b)) - } - break } - reply, err := r.rd.ReadString('\n') - if err != nil { - log.Panicf(err.Error()) - } - reply = strings.TrimSpace(reply) - + reply := r.client.ReceiveString() masterOffset, err := strconv.Atoi(strings.Split(reply, " ")[2]) if err != nil { log.Panicf(err.Error()) @@ -145,16 +139,16 @@ func (r *syncStandaloneReader) sendPSync() { } func (r *syncStandaloneReader) receiveRDB() { - log.Infof("[%s] source db is doing bgsave.", r.stat.Name) - r.stat.Status = "source db is doing bgsave" + log.Debugf("[%s] source db is doing bgsave.", r.stat.Name) + r.stat.Status = kWaitBgsave timeStart := time.Now() // format: \n\n\n$\r\n - for true { + for { b, err := r.rd.ReadByte() if err != nil { log.Panicf(err.Error()) } - if b == '\n' { + if b == '\n' { // heartbeat continue } if b != '$' { @@ -162,7 +156,7 @@ func (r *syncStandaloneReader) receiveRDB() { } break } - log.Infof("[%s] source db bgsave finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds()) + log.Debugf("[%s] source db bgsave finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds()) lengthStr, err := r.rd.ReadString('\n') if err != nil { log.Panicf(err.Error()) @@ -172,23 +166,24 @@ func (r *syncStandaloneReader) receiveRDB() { if err != nil { log.Panicf(err.Error()) } - log.Infof("[%s] rdb length=[%d]", r.stat.Name, length) + log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length))) r.stat.RdbFileSizeBytes = length - r.stat.RdbFIleSizeHuman = humanize.IBytes(uint64(length)) + r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length)) // create rdb file r.stat.RdbFilePath, err = filepath.Abs(r.stat.Name + "/dump.rdb") if err != nil { log.Panicf(err.Error()) } - log.Infof("[%s] start receiving RDB. path=[%s]", r.stat.Name, r.stat.RdbFilePath) + timeStart = time.Now() + log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, r.stat.RdbFilePath) rdbFileHandle, err := os.OpenFile(r.stat.RdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { log.Panicf(err.Error()) } // receive rdb - r.stat.Status = fmt.Sprintf("[%s]: receiving RDB", r.stat.Name) + r.stat.Status = kReceiveRdb remainder := length const bufSize int64 = 32 * 1024 * 1024 // 32MB buf := make([]byte, bufSize) @@ -214,11 +209,11 @@ func (r *syncStandaloneReader) receiveRDB() { if err != nil { log.Panicf(err.Error()) } - log.Infof("[%s] save RDB finished.", r.stat.Name) + log.Debugf("[%s] save RDB finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds()) } func (r *syncStandaloneReader) receiveAOF(rd io.Reader) { - log.Infof("[%s] start receiving aof data, and save to file", r.stat.Name) + log.Debugf("[%s] start receiving aof data, and save to file", r.stat.Name) aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset) defer aofWriter.Close() buf := make([]byte, 16*1024) // 16KB is enough for writing file @@ -236,15 +231,15 @@ func (r *syncStandaloneReader) receiveAOF(rd io.Reader) { func (r *syncStandaloneReader) sendRDB() { // start parse rdb - log.Infof("[%s] start sending RDB to target", r.stat.Name) - r.stat.Status = fmt.Sprintf("[%s]: sending RDB to target", r.stat.Name) + log.Debugf("[%s] start sending RDB to target", r.stat.Name) + r.stat.Status = kSyncRdb updateFunc := func(offset int64) { r.stat.RdbSentBytes = offset r.stat.RdbSentHuman = humanize.IBytes(uint64(offset)) } rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, r.stat.RdbFilePath, r.ch) r.DbId = rdbLoader.ParseRDB() - log.Infof("[%s] send RDB finished", r.stat.Name) + log.Debugf("[%s] send RDB finished", r.stat.Name) } func (r *syncStandaloneReader) sendAOF(offset int64) { @@ -264,12 +259,23 @@ func (r *syncStandaloneReader) sendAOF(offset int64) { r.DbId = DbId continue } + // ping + if strings.EqualFold(argv[0], "ping") { + continue + } + // replconf @AWS + if strings.EqualFold(argv[0], "replconf") { + continue + } + // opinfo @Aliyun + if strings.EqualFold(argv[0], "opinfo") { + continue + } e := entry.NewEntry() e.Argv = argv e.DbId = r.DbId r.ch <- e - r.stat.Status = fmt.Sprintf("[%s]: sending aof to target", r.stat.Name) } } @@ -287,7 +293,13 @@ func (r *syncStandaloneReader) Status() interface{} { } func (r *syncStandaloneReader) StatusString() string { - return r.stat.Status + if r.stat.Status == kSyncRdb { + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbSentHuman, r.stat.RdbFileSizeHuman) + } + if r.stat.Status == kSyncAof { + return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset) + } + return string(r.stat.Status) } func (r *syncStandaloneReader) StatusConsistent() bool { diff --git a/internal/status/entry_count.go b/internal/status/entry_count.go index 3b0c312..02a933c 100644 --- a/internal/status/entry_count.go +++ b/internal/status/entry_count.go @@ -6,14 +6,14 @@ import ( ) type EntryCount struct { - Allow uint64 `json:"allow"` - Disallow uint64 `json:"disallow"` - AllowOps float64 `json:"allow_ops"` - DisallowOps float64 `json:"disallow_ops"` + ReadCount uint64 `json:"read_count"` + ReadOps float64 `json:"read_ops"` + WriteCount uint64 `json:"write_count"` + WriteOps float64 `json:"write_ops"` // update ops - lastAllow uint64 - lastDisallow uint64 + lastReadCount uint64 + lastWriteCount uint64 lastUpdateTimestampSec float64 } @@ -22,14 +22,14 @@ func (e *EntryCount) updateOPS() { nowTimestampSec := float64(time.Now().UnixNano()) / 1e9 if e.lastUpdateTimestampSec != 0 { timeIntervalSec := nowTimestampSec - e.lastUpdateTimestampSec - e.AllowOps = float64(e.Allow-e.lastAllow) / timeIntervalSec - e.DisallowOps = float64(e.Disallow-e.lastDisallow) / timeIntervalSec - e.lastAllow = e.Allow - e.lastDisallow = e.Disallow + e.ReadOps = float64(e.ReadCount-e.lastReadCount) / timeIntervalSec + e.WriteOps = float64(e.WriteCount-e.lastWriteCount) / timeIntervalSec + e.lastReadCount = e.ReadCount + e.lastWriteCount = e.WriteCount } e.lastUpdateTimestampSec = nowTimestampSec } func (e *EntryCount) String() string { - return fmt.Sprintf("allow: %.2fops/s, disallow: %.2fops/s", e.AllowOps, e.DisallowOps) + return fmt.Sprintf("read_count=[%d], read_ops=[%.2f], write_count=[%d], write_ops=[%.2f]", e.ReadCount, e.ReadOps, e.WriteCount, e.WriteOps) } diff --git a/internal/status/status.go b/internal/status/status.go index db013bd..a83a441 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -29,7 +29,7 @@ var stat = new(Stat) var theReader Statusable var theWriter Statusable -func AddEntryCount(cmd string, allow bool) { +func AddReadCount(cmd string) { ch <- func() { if stat.PerCmdEntriesCount == nil { stat.PerCmdEntriesCount = make(map[string]EntryCount) @@ -39,13 +39,24 @@ func AddEntryCount(cmd string, allow bool) { cmdEntryCount = EntryCount{} stat.PerCmdEntriesCount[cmd] = cmdEntryCount } - if allow { - stat.TotalEntriesCount.Allow += 1 - cmdEntryCount.Allow += 1 - } else { - stat.TotalEntriesCount.Disallow += 1 - cmdEntryCount.Disallow += 1 + stat.TotalEntriesCount.ReadCount += 1 + cmdEntryCount.ReadCount += 1 + stat.PerCmdEntriesCount[cmd] = cmdEntryCount + } +} + +func AddWriteCount(cmd string) { + ch <- func() { + if stat.PerCmdEntriesCount == nil { + stat.PerCmdEntriesCount = make(map[string]EntryCount) + } + cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd] + if !ok { + cmdEntryCount = EntryCount{} + stat.PerCmdEntriesCount[cmd] = cmdEntryCount } + stat.TotalEntriesCount.WriteCount += 1 + cmdEntryCount.WriteCount += 1 stat.PerCmdEntriesCount[cmd] = cmdEntryCount } } @@ -92,10 +103,7 @@ func Init(r Statusable, w Statusable) { select { case <-ticker.C: ch <- func() { - log.Infof("%s, %s, %s", - stat.TotalEntriesCount.String(), - theReader.StatusString(), - theWriter.StatusString()) + log.Infof("%s, %s", stat.TotalEntriesCount.String(), theReader.StatusString()) } } } diff --git a/internal/utils/file_rotate/aof_reader.go b/internal/utils/file_rotate/aof_reader.go index f0c79fc..1d82d20 100644 --- a/internal/utils/file_rotate/aof_reader.go +++ b/internal/utils/file_rotate/aof_reader.go @@ -35,7 +35,7 @@ func (r *AOFReader) openFile(offset int64) { } r.offset = offset r.pos = 0 - log.Infof("[%s] open file for read. filename=[%s]", r.name, r.filepath) + log.Debugf("[%s] open file for read. filename=[%s]", r.name, r.filepath) } func (r *AOFReader) readNextFile(offset int64) { diff --git a/internal/utils/file_rotate/aof_writer.go b/internal/utils/file_rotate/aof_writer.go index b482896..18c12b8 100644 --- a/internal/utils/file_rotate/aof_writer.go +++ b/internal/utils/file_rotate/aof_writer.go @@ -35,7 +35,7 @@ func (w *AOFWriter) openFile(offset int64) { } w.offset = offset w.filesize = 0 - log.Infof("[%s] open file for write. filename=[%s]", w.name, w.filepath) + log.Debugf("[%s] open file for write. filename=[%s]", w.name, w.filepath) } func (w *AOFWriter) Write(buf []byte) { diff --git a/internal/utils/ncpu.go b/internal/utils/ncpu.go index 0d2adb3..ba3c502 100644 --- a/internal/utils/ncpu.go +++ b/internal/utils/ncpu.go @@ -12,6 +12,6 @@ func SetNcpu() { runtime.GOMAXPROCS(config.Opt.Advanced.Ncpu) log.Infof("set GOMAXPROCS to %v", config.Opt.Advanced.Ncpu) } else { - log.Infof("GOMAXPROCS defaults to the value of runtime.NumCPU %v", runtime.NumCPU()) + log.Infof("GOMAXPROCS defaults to the value of runtime.NumCPU [%v]", runtime.NumCPU()) } } diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index 145d1ad..62acb4a 100644 --- a/internal/writer/redis_cluster_writer.go +++ b/internal/writer/redis_cluster_writer.go @@ -11,13 +11,6 @@ import ( const KeySlots = 16384 -type RedisClusterWriterOptions struct { - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` -} - type RedisClusterWriter struct { addresses []string writers []Writer @@ -26,7 +19,7 @@ type RedisClusterWriter struct { stat []interface{} } -func NewRedisClusterWriter(opts *RedisClusterWriterOptions) Writer { +func NewRedisClusterWriter(opts *RedisWriterOptions) Writer { rw := new(RedisClusterWriter) rw.loadClusterNodes(opts) log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses) @@ -39,7 +32,7 @@ func (r *RedisClusterWriter) Close() { } } -func (r *RedisClusterWriter) loadClusterNodes(opts *RedisClusterWriterOptions) { +func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) { c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) @@ -69,7 +62,7 @@ func (r *RedisClusterWriter) loadClusterNodes(opts *RedisClusterWriterOptions) { r.addresses = append(r.addresses, address) // writers - opts := &RedisStandaloneWriterOptions{ + opts := &RedisWriterOptions{ Address: address, Username: opts.Username, Password: opts.Password, diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index 2c4ebe5..f4f57e3 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -14,7 +14,8 @@ import ( "time" ) -type RedisStandaloneWriterOptions struct { +type RedisWriterOptions struct { + Cluster bool `mapstructure:"cluster" default:"false"` Address string `mapstructure:"address" default:""` Username string `mapstructure:"username" default:""` Password string `mapstructure:"password" default:""` @@ -36,7 +37,7 @@ type redisStandaloneWriter struct { } } -func NewRedisStandaloneWriter(opts *RedisStandaloneWriterOptions) Writer { +func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer { rw := new(redisStandaloneWriter) rw.address = opts.Address rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) diff --git a/shake.toml b/shake.toml index 8062954..6f5824d 100644 --- a/shake.toml +++ b/shake.toml @@ -1,17 +1,35 @@ -transform = "" +function = "" -[sync_standlone_reader] + +[sync_reader] +cluster = false address = "127.0.0.1:6379" username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false -[redis_standalone_writer] +# [scan_reader] +# cluster = false +# address = "127.0.0.1:6379" +# username = "" # keep empty if not using ACL +# password = "" # keep empty if no authentication is required +# tls = false + +# [rdb_reader] +# address = "127.0.0.1:6379" +# username = "" # keep empty if not using ACL +# password = "" # keep empty if no authentication is required +# tls = false + + +[redis_writer] +cluster = false address = "127.0.0.1:6380" username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false + [advanced] dir = "data" ncpu = 0 # runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores @@ -30,7 +48,7 @@ log_interval = 5 # in seconds # panic: redis-shake will stop when meet "Target key name is busy" error. # rewrite: redis-shake will replace the key with new value. # ignore: redis-shake will skip restore the key when meet "Target key name is busy" error. -rdb_restore_command_behavior = "rewrite" # panic, rewrite or skip +rdb_restore_command_behavior = "panic" # panic, rewrite or skip # redis-shake uses pipeline to improve sending performance. # This item limits the maximum number of commands in a pipeline. diff --git a/tests/cases/auth_acl.py b/tests/cases/auth_acl.py index 7df0d4d..5601126 100644 --- a/tests/cases/auth_acl.py +++ b/tests/cases/auth_acl.py @@ -20,10 +20,10 @@ def acl(): inserter.add_data(src, cross_slots_cmd=True) opts = h.ShakeOpts.create_sync_opts(src, dst) - opts["sync_standalone_reader"]["username"] = "user0" - opts["sync_standalone_reader"]["password"] = "password0" - opts["redis_standalone_writer"]["username"] = "user1" - opts["redis_standalone_writer"]["password"] = "password1" + opts["sync_reader"]["username"] = "user0" + opts["sync_reader"]["password"] = "password0" + opts["redis_writer"]["username"] = "user1" + opts["redis_writer"]["password"] = "password1" p.log(f"opts: {opts}") shake = h.Shake(opts) diff --git a/tests/helpers/shake.py b/tests/helpers/shake.py index 2597a35..34c7c52 100644 --- a/tests/helpers/shake.py +++ b/tests/helpers/shake.py @@ -1,4 +1,3 @@ -import time import typing import pybbt @@ -12,52 +11,44 @@ from helpers.utils.network import get_free_port from helpers.utils.timer import Timer -# [SyncClusterReader] -# address = "127.0.0.1:6379" -# username = "" # keep empty if not using ACL -# password = "" # keep empty if no authentication is required -# tls = false -# -# [RedisClusterWriter] -# address = "127.0.0.1:6380" -# username = "" # keep empty if not using ACL -# password = "" # keep empty if no authentication is required -# tls = false - class ShakeOpts: @staticmethod def create_sync_opts(src: Redis, dst: Redis) -> typing.Dict: - d = {} - if src.is_cluster(): - d["sync_cluster_reader"] = {"address": src.get_address()} - else: - d["sync_standalone_reader"] = {"address": src.get_address()} - if dst.is_cluster(): - d["redis_cluster_writer"] = {"address": dst.get_address()} - else: - d["redis_standalone_writer"] = {"address": dst.get_address()} + d = { + "sync_reader": { + "cluster": src.is_cluster(), + "address": src.get_address() + }, + "redis_writer": { + "cluster": dst.is_cluster(), + "address": dst.get_address() + } + } return d @staticmethod def create_scan_opts(src: Redis, dst: Redis) -> typing.Dict: - d = {} - if src.is_cluster(): - d["scan_cluster_reader"] = {"address": src.get_address()} - else: - d["scan_standalone_reader"] = {"address": src.get_address()} - if dst.is_cluster(): - d["redis_cluster_writer"] = {"address": dst.get_address()} - else: - d["redis_standalone_writer"] = {"address": dst.get_address()} + d = { + "scan_reader": { + "cluster": src.is_cluster(), + "address": src.get_address() + }, + "redis_writer": { + "cluster": dst.is_cluster(), + "address": dst.get_address() + } + } return d @staticmethod def create_rdb_opts(rdb_path: str, dts: Redis) -> typing.Dict: - d = {"rdb_reader": {"filepath": rdb_path}} - if dts.is_cluster(): - d["redis_cluster_writer"] = {"address": dts.get_address()} - else: - d["redis_standalone_writer"] = {"address": dts.get_address()} + d = { + "rdb_reader": {"filepath": rdb_path}, + "redis_writer": { + "cluster": dts.is_cluster(), + "address": dts.get_address() + } + } return d