From 5a9d7b97f4c42793ecde18a069a97db65a58b940 Mon Sep 17 00:00:00 2001 From: suxb201 Date: Fri, 8 Jul 2022 21:05:56 +0800 Subject: [PATCH] support ElastiCache psync --- README.md | 5 +++-- cmd/redis-shake/main.go | 2 +- internal/config/config.go | 13 +++++++------ internal/reader/psync.go | 31 +++++++++++++++++++------------ redis-shake.toml | 5 +++-- test/assets/empty.toml | 1 + 6 files changed, 34 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 9c92894..fdd99ab 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,9 @@ redis-shake is a tool for Redis data migration and provides a certain degree of * 🌐 Support single instance and cluster * ✅ Tested on Redis 5.0, Redis 6.0 and Redis 7.0 * 🤗 Supports custom filtering rules using lua -* 💪 Support large instance migration -* 💖 Support restore mode and sync mode +* 💪 Supports large instance migration +* 💖 Supports restore mode and sync mode +* ☁️ Supports ElastiCache and Aliyun Redis ![image.png](https://s2.loli.net/2022/06/30/vU346lVBrNofKzu.png) diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 0e2aed3..c9c5f64 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -65,7 +65,7 @@ func main() { source := &config.Config.Source var theReader reader.Reader if source.Type == "sync" { - theReader = reader.NewPSyncReader(source.Address, source.Username, source.Password, source.IsTLS) + theReader = reader.NewPSyncReader(source.Address, source.Username, source.Password, source.IsTLS, source.ElastiCachePSync) } else if source.Type == "restore" { theReader = reader.NewRDBReader(source.RDBFilePath) } else { diff --git a/internal/config/config.go b/internal/config/config.go index 1bbdf28..55de7bd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,12 +10,13 @@ import ( ) type tomlSource struct { - Type string `toml:"type"` - Address string `toml:"address"` - Username string `toml:"username"` - Password string `toml:"password"` - IsTLS bool `toml:"tls"` - RDBFilePath string `toml:"rdb_file_path"` + Type string `toml:"type"` + Address string `toml:"address"` + Username string `toml:"username"` + Password string `toml:"password"` + IsTLS bool `toml:"tls"` + ElastiCachePSync string `toml:"elasticache_psync"` + RDBFilePath string `toml:"rdb_file_path"` } type tomlTarget struct { diff --git a/internal/reader/psync.go b/internal/reader/psync.go index dc8f45a..27b8903 100644 --- a/internal/reader/psync.go +++ b/internal/reader/psync.go @@ -22,23 +22,19 @@ type psyncReader struct { ch chan *entry.Entry DbId int - rd *bufio.Reader - receivedOffset int64 + rd *bufio.Reader + receivedOffset int64 + elastiCachePSync string } -func NewPSyncReader(address string, username string, password string, isTls bool) Reader { +func NewPSyncReader(address string, username string, password string, isTls bool, ElastiCachePSync string) Reader { r := new(psyncReader) - r.init(address, username, password, isTls) - return r -} - -func (r *psyncReader) init(address string, username string, password string, isTls bool) { r.address = address - standalone := client.NewRedisClient(address, username, password, isTls) - - r.client = standalone + r.elastiCachePSync = ElastiCachePSync + r.client = client.NewRedisClient(address, username, password, isTls) r.rd = r.client.BufioReader() log.Infof("psyncReader connected to redis successful. address=[%s]", address) + return r } func (r *psyncReader) StartRead() chan *entry.Entry { @@ -86,6 +82,9 @@ func (r *psyncReader) saveRDB() { // send psync argv = []string{"PSYNC", "?", "-1"} + if r.elastiCachePSync != "" { + argv = []string{r.elastiCachePSync, "?", "-1"} + } r.client.Send(argv...) log.Infof("send %v", argv) // format: \n\n\n$\r\n @@ -98,8 +97,16 @@ func (r *psyncReader) saveRDB() { if b == '\n' { continue } + if b == '-' { + reply, err := r.rd.ReadString('\n') + if err != nil { + log.PanicError(err) + } + reply = strings.TrimSpace(reply) + log.Panicf("psync error. address=[%s], reply=[%s]", r.address, reply) + } if b != '+' { - log.Panicf("invalid rdb format. address=[%s], b=[%s]", r.address, string(b)) + log.Panicf("invalid psync reply. address=[%s], b=[%s]", r.address, string(b)) } break } diff --git a/redis-shake.toml b/redis-shake.toml index 43b65df..ddc5b7b 100644 --- a/redis-shake.toml +++ b/redis-shake.toml @@ -4,12 +4,13 @@ address = "127.0.0.1:6379" username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false +elasticache_psync = "" # using when source is ElastiCache. ref: https://github.com/alibaba/RedisShake/issues/373 [target] -type = "cluster" # standalone or cluster +type = "standalone" # standalone or cluster # When the target is a cluster, write the address of one of the nodes. # redis-shake will obtain other nodes through the `cluster nodes` command. -address = "127.0.0.1:30001" +address = "127.0.0.1:6380" username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false diff --git a/test/assets/empty.toml b/test/assets/empty.toml index 43b65df..98517ef 100644 --- a/test/assets/empty.toml +++ b/test/assets/empty.toml @@ -4,6 +4,7 @@ address = "127.0.0.1:6379" username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false +elasticache_psync = "" # using when source is ElastiCache. ref: https://github.com/alibaba/RedisShake/issues/373 [target] type = "cluster" # standalone or cluster