From e43689343aa046b19965854b5794828b1484e457 Mon Sep 17 00:00:00 2001 From: zmy Date: Fri, 11 Mar 2022 10:53:18 +0800 Subject: [PATCH] optimize the ticker time and configurable support Co-authored-by: Mingyu Zhang --- src/redis-shake/configure/configure.go | 1 + src/redis-shake/dbSync/syncIncrease.go | 18 ++++++++++-------- src/redis-shake/main/sanitize.go | 5 +++++ 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index 6a48013..13b672f 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -50,6 +50,7 @@ type Configuration struct { SenderSize uint64 `config:"sender.size"` SenderCount uint `config:"sender.count"` SenderDelayChannelSize uint `config:"sender.delay_channel_size"` + SenderTickerMs int `config:"sender.ticker_ms"` KeepAlive uint `config:"keep_alive"` PidPath string `config:"pid_path"` ScanKeyNumber uint32 `config:"scan.key_number"` diff --git a/src/redis-shake/dbSync/syncIncrease.go b/src/redis-shake/dbSync/syncIncrease.go index a2cda84..74d3c5c 100644 --- a/src/redis-shake/dbSync/syncIncrease.go +++ b/src/redis-shake/dbSync/syncIncrease.go @@ -4,22 +4,24 @@ import ( "bufio" "bytes" "fmt" - "github.com/alibaba/RedisShake/pkg/libs/atomic2" - "github.com/alibaba/RedisShake/pkg/libs/log" - "github.com/alibaba/RedisShake/pkg/redis" - "github.com/alibaba/RedisShake/redis-shake/common" - "github.com/alibaba/RedisShake/redis-shake/configure" - "github.com/alibaba/RedisShake/redis-shake/filter" "io" "net" "strconv" "strings" "time" + "github.com/alibaba/RedisShake/pkg/libs/atomic2" + "github.com/alibaba/RedisShake/pkg/libs/log" + "github.com/alibaba/RedisShake/pkg/redis" + utils "github.com/alibaba/RedisShake/redis-shake/common" + conf "github.com/alibaba/RedisShake/redis-shake/configure" + "github.com/alibaba/RedisShake/redis-shake/filter" + "github.com/alibaba/RedisShake/redis-shake/metric" - redigo "github.com/garyburd/redigo/redis" "unsafe" + + redigo "github.com/garyburd/redigo/redis" ) func (ds *DbSyncer) syncCommand(reader *bufio.Reader, target []string, authType, passwd string, tlsEnable bool, tlsSkipVerify bool, dbid int) { @@ -304,7 +306,7 @@ func (ds *DbSyncer) sendTargetCommand(c redigo.Conn) { checkpointRunId := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointRunId) checkpointVersion := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointVersion) checkpointOffset := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointOffset) - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(time.Duration(conf.Options.SenderTickerMs) * time.Millisecond) // mark whether the given db has already send runId, no need to send run-id each time. runIdMap := make(map[int]struct{}) diff --git a/src/redis-shake/main/sanitize.go b/src/redis-shake/main/sanitize.go index 92cd5b6..80ee52a 100644 --- a/src/redis-shake/main/sanitize.go +++ b/src/redis-shake/main/sanitize.go @@ -304,6 +304,11 @@ func SanitizeOptions(tp string) error { conf.Options.SenderDelayChannelSize = 32 } + //ticker (0,100s], default 20ms + if conf.Options.SenderTickerMs <= 0 || conf.Options.SenderTickerMs > 100000 { + conf.Options.SenderTickerMs = 20 + } + // [0, 100 million] if conf.Options.Qps < 0 || conf.Options.Qps >= 100000000 { return fmt.Errorf("qps[%v] should in (0, 100000000]", conf.Options.Qps)