optimize the ticker time and configurable support

Co-authored-by: Mingyu Zhang <zhangmingyu800416@ihuman.com>
v4
zmy 3 years ago committed by GitHub
parent c3a342db54
commit e43689343a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/redis-shake/configure/configure.go
  2. 18
      src/redis-shake/dbSync/syncIncrease.go
  3. 5
      src/redis-shake/main/sanitize.go

@ -50,6 +50,7 @@ type Configuration struct {
SenderSize uint64 `config:"sender.size"` SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"` SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"` SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
SenderTickerMs int `config:"sender.ticker_ms"`
KeepAlive uint `config:"keep_alive"` KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"` PidPath string `config:"pid_path"`
ScanKeyNumber uint32 `config:"scan.key_number"` ScanKeyNumber uint32 `config:"scan.key_number"`

@ -4,22 +4,24 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"fmt" "fmt"
"github.com/alibaba/RedisShake/pkg/libs/atomic2"
"github.com/alibaba/RedisShake/pkg/libs/log"
"github.com/alibaba/RedisShake/pkg/redis"
"github.com/alibaba/RedisShake/redis-shake/common"
"github.com/alibaba/RedisShake/redis-shake/configure"
"github.com/alibaba/RedisShake/redis-shake/filter"
"io" "io"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/alibaba/RedisShake/pkg/libs/atomic2"
"github.com/alibaba/RedisShake/pkg/libs/log"
"github.com/alibaba/RedisShake/pkg/redis"
utils "github.com/alibaba/RedisShake/redis-shake/common"
conf "github.com/alibaba/RedisShake/redis-shake/configure"
"github.com/alibaba/RedisShake/redis-shake/filter"
"github.com/alibaba/RedisShake/redis-shake/metric" "github.com/alibaba/RedisShake/redis-shake/metric"
redigo "github.com/garyburd/redigo/redis"
"unsafe" "unsafe"
redigo "github.com/garyburd/redigo/redis"
) )
func (ds *DbSyncer) syncCommand(reader *bufio.Reader, target []string, authType, passwd string, tlsEnable bool, tlsSkipVerify bool, dbid int) { func (ds *DbSyncer) syncCommand(reader *bufio.Reader, target []string, authType, passwd string, tlsEnable bool, tlsSkipVerify bool, dbid int) {
@ -304,7 +306,7 @@ func (ds *DbSyncer) sendTargetCommand(c redigo.Conn) {
checkpointRunId := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointRunId) checkpointRunId := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointRunId)
checkpointVersion := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointVersion) checkpointVersion := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointVersion)
checkpointOffset := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointOffset) checkpointOffset := fmt.Sprintf("%s-%s", ds.source, utils.CheckpointOffset)
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(time.Duration(conf.Options.SenderTickerMs) * time.Millisecond)
// mark whether the given db has already send runId, no need to send run-id each time. // mark whether the given db has already send runId, no need to send run-id each time.
runIdMap := make(map[int]struct{}) runIdMap := make(map[int]struct{})

@ -304,6 +304,11 @@ func SanitizeOptions(tp string) error {
conf.Options.SenderDelayChannelSize = 32 conf.Options.SenderDelayChannelSize = 32
} }
//ticker (0,100s], default 20ms
if conf.Options.SenderTickerMs <= 0 || conf.Options.SenderTickerMs > 100000 {
conf.Options.SenderTickerMs = 20
}
// [0, 100 million] // [0, 100 million]
if conf.Options.Qps < 0 || conf.Options.Qps >= 100000000 { if conf.Options.Qps < 0 || conf.Options.Qps >= 100000000 {
return fmt.Errorf("qps[%v] should in (0, 100000000]", conf.Options.Qps) return fmt.Errorf("qps[%v] should in (0, 100000000]", conf.Options.Qps)

Loading…
Cancel
Save