From 7b3038c0f3985f8d02a6e2558f1b821b043fd6ad Mon Sep 17 00:00:00 2001 From: Jacky Wu Date: Sun, 26 Sep 2021 11:36:59 +0800 Subject: [PATCH] feat: add fakeSlaveDelayOffset metric and source master offset info. (#380) --- src/redis-shake/common/utils.go | 24 +++++++++++++++---- src/redis-shake/dbSync/dbSyncer.go | 13 +++++----- src/redis-shake/dbSync/status.go | 17 ++++++------- src/redis-shake/dbSync/syncIncrease.go | 25 +++++++++++++++++--- src/redis-shake/metric/metric.go | 14 +++++++++-- src/redis-shake/metric/prometheus_metrics.go | 10 +++++++- src/redis-shake/metric/variables.go | 5 +++- 7 files changed, 82 insertions(+), 26 deletions(-) diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index c832c9a..62f4a1d 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -1021,10 +1021,14 @@ func CheckHandleNetError(err error) bool { return false } -func GetFakeSlaveOffset(c redigo.Conn) (string, error) { - infoStr, err := Bytes(c.Do("info", "Replication")) +func GetFakeSlaveOffset(c redigo.Conn) (fakeSlaveOffset string, masterOffset string, err error) { + var ( + infoStr []byte + ) + + infoStr, err = Bytes(c.Do("info", "Replication")) if err != nil { - return "", err + return } kv := ParseRedisInfo(infoStr) @@ -1034,12 +1038,22 @@ func GetFakeSlaveOffset(c redigo.Conn) (string, error) { list := strings.Split(v, ",") for _, item := range list { if strings.HasPrefix(item, "offset=") { - return strings.Split(item, "=")[1], nil + fakeSlaveOffset = strings.Split(item, "=")[1] + break } } } + if strings.Contains(k, "master_repl_offset") { + masterOffset = v + } } - return "", fmt.Errorf("OffsetNotFoundInInfo") + + if len(fakeSlaveOffset) == 0 { + err = fmt.Errorf("OffsetNotFoundInInfo") + return + } + + return } func GetLocalIp(preferdInterfaces []string) (ip string, interfaceName string, err error) { diff --git a/src/redis-shake/dbSync/dbSyncer.go b/src/redis-shake/dbSync/dbSyncer.go index 6280548..e688b0b 100644 --- a/src/redis-shake/dbSync/dbSyncer.go +++ b/src/redis-shake/dbSync/dbSyncer.go @@ -69,12 +69,13 @@ type DbSyncer struct { func (ds *DbSyncer) GetExtraInfo() map[string]interface{} { return map[string]interface{}{ - "SourceAddress": ds.source, - "TargetAddress": ds.target, - "SenderBufCount": len(ds.sendBuf), - "ProcessingCmdCount": len(ds.delayChannel), - "TargetDBOffset": ds.stat.targetOffset.Get(), - "SourceDBOffset": ds.stat.sourceOffset, + "SourceAddress": ds.source, + "TargetAddress": ds.target, + "SenderBufCount": len(ds.sendBuf), + "ProcessingCmdCount": len(ds.delayChannel), + "TargetDBOffset": ds.stat.targetOffset.Get(), + "SourceMasterDBOffset": ds.stat.sourceMasterOffset.Get(), + "SourceDBOffset": ds.stat.sourceOffset.Get(), } } diff --git a/src/redis-shake/dbSync/status.go b/src/redis-shake/dbSync/status.go index 286bc0d..d8f1946 100644 --- a/src/redis-shake/dbSync/status.go +++ b/src/redis-shake/dbSync/status.go @@ -3,14 +3,15 @@ package dbSync import "github.com/alibaba/RedisShake/pkg/libs/atomic2" type Status struct { - rBytes atomic2.Int64 // read bytes - wBytes atomic2.Int64 // write bytes - wCommands atomic2.Int64 // write commands (forward) - keys atomic2.Int64 // total key number (nentry) - fullSyncFilter atomic2.Int64 // filtered keys in full sync (ignore) - incrSyncFilter atomic2.Int64 // filtered keys in increase sync (nbypass) - targetOffset atomic2.Int64 // target offset - sourceOffset int64 // source offset + rBytes atomic2.Int64 // read bytes + wBytes atomic2.Int64 // write bytes + wCommands atomic2.Int64 // write commands (forward) + keys atomic2.Int64 // total key number (nentry) + fullSyncFilter atomic2.Int64 // filtered keys in full sync (ignore) + incrSyncFilter atomic2.Int64 // filtered keys in increase sync (nbypass) + targetOffset atomic2.Int64 // target offset + sourceOffset atomic2.Int64 // source offset + sourceMasterOffset atomic2.Int64 // source master offset } func (s *Status) Stat() *syncerStat { diff --git a/src/redis-shake/dbSync/syncIncrease.go b/src/redis-shake/dbSync/syncIncrease.go index de51dec..66e9807 100644 --- a/src/redis-shake/dbSync/syncIncrease.go +++ b/src/redis-shake/dbSync/syncIncrease.go @@ -66,7 +66,7 @@ func (ds *DbSyncer) fetchOffset() { incrSyncReadeTimeout, incrSyncReadeTimeout, false, conf.Options.SourceTLSEnable) ticker := time.NewTicker(10 * time.Second) for range ticker.C { - offset, err := utils.GetFakeSlaveOffset(srcConn) + slaveOffset, masterOffset, err := utils.GetFakeSlaveOffset(srcConn) if err != nil { // log.PurePrintf("%s\n", NewLogItem("GetFakeSlaveOffsetFail", "WARN", NewErrorLogDetail("", err.Error()))) log.Warnf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tWarn:%s", @@ -76,16 +76,35 @@ func (ds *DbSyncer) fetchOffset() { if err == io.EOF { srcConn = utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, ds.sourcePassword, incrSyncReadeTimeout, incrSyncReadeTimeout, false, conf.Options.SourceTLSEnable) + log.Warnf("DbSyncer[%d] Event:GetFakeSlaveOffsetReconn\tId:%s\t", + ds.id, conf.Options.Id) } else if _, ok := err.(net.Error); ok { srcConn = utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType, ds.sourcePassword, incrSyncReadeTimeout, incrSyncReadeTimeout, false, conf.Options.SourceTLSEnable) + log.Warnf("DbSyncer[%d] Event:GetFakeSlaveOffsetReconn\tId:%s\t", + ds.id, conf.Options.Id) } } else { // ds.SyncStat.SetOffset(offset) - if ds.stat.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil { - log.Errorf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s", + var ( + sourceOffset, sourceMasterOffset int64 + ) + + if sourceOffset, err = strconv.ParseInt(slaveOffset, 10, 64); err != nil { + log.Errorf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tError: parse slave offset failed(%s)", ds.id, conf.Options.Id, err.Error()) + } else { + ds.stat.sourceOffset.Set(sourceOffset) } + + if sourceMasterOffset, err = strconv.ParseInt(masterOffset, 10, 64); err != nil { + log.Errorf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tError: parse master offset failed(%s)", + ds.id, conf.Options.Id, err.Error()) + } else { + ds.stat.sourceMasterOffset.Set(sourceMasterOffset) + } + + metric.GetMetric(ds.id).SetFakeSlaveDelayOffset(ds.id, uint64(sourceMasterOffset)-uint64(sourceOffset)) } } diff --git a/src/redis-shake/metric/metric.go b/src/redis-shake/metric/metric.go index aa51609..1f116cb 100644 --- a/src/redis-shake/metric/metric.go +++ b/src/redis-shake/metric/metric.go @@ -11,7 +11,7 @@ import ( "github.com/alibaba/RedisShake/pkg/libs/log" "github.com/alibaba/RedisShake/redis-shake/base" - "github.com/alibaba/RedisShake/redis-shake/configure" + conf "github.com/alibaba/RedisShake/redis-shake/configure" ) const ( @@ -90,7 +90,8 @@ type Metric struct { AvgDelay Percent // ms NetworkFlow Combine // +speed - FullSyncProgress uint64 + FullSyncProgress uint64 + FakeSlaveDelayOffset uint64 } func CreateMetric(r base.Runner) { @@ -251,6 +252,15 @@ func (m *Metric) SetFullSyncProgress(dbSyncerID int, val uint64) { fullSyncProcessPercent.WithLabelValues(strconv.Itoa(dbSyncerID)).Set(float64(val)) } +func (m *Metric) SetFakeSlaveDelayOffset(dbSyncerID int, val uint64) { + m.FakeSlaveDelayOffset = val + fakeSlaveDelayOffset.WithLabelValues(strconv.Itoa(dbSyncerID)).Set(float64(val)) +} + +func (m *Metric) GetFakeSlaveDelayOffset() interface{} { + return m.FakeSlaveDelayOffset +} + func (m *Metric) GetFullSyncProgress() interface{} { return m.FullSyncProgress } diff --git a/src/redis-shake/metric/prometheus_metrics.go b/src/redis-shake/metric/prometheus_metrics.go index b471e5a..1f9dd65 100644 --- a/src/redis-shake/metric/prometheus_metrics.go +++ b/src/redis-shake/metric/prometheus_metrics.go @@ -3,7 +3,7 @@ package metric import ( "strconv" - "github.com/alibaba/RedisShake/redis-shake/common" + utils "github.com/alibaba/RedisShake/redis-shake/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -71,6 +71,14 @@ var ( }, []string{dbSyncerLabelName}, ) + fakeSlaveDelayOffset = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "fake_slave_delay_offset", + Help: "RedisShake fake slave delay offset", + }, + []string{dbSyncerLabelName}, + ) averageDelayInMs = promauto.NewGaugeVec( prometheus.GaugeOpts{ Namespace: metricNamespace, diff --git a/src/redis-shake/metric/variables.go b/src/redis-shake/metric/variables.go index 237f984..07f7a54 100644 --- a/src/redis-shake/metric/variables.go +++ b/src/redis-shake/metric/variables.go @@ -2,8 +2,9 @@ package metric import ( "fmt" + "github.com/alibaba/RedisShake/redis-shake/base" - "github.com/alibaba/RedisShake/redis-shake/common" + utils "github.com/alibaba/RedisShake/redis-shake/common" ) type MetricRest struct { @@ -28,6 +29,7 @@ type MetricRest struct { ProcessingCmdCount interface{} // length of delay channel TargetDBOffset interface{} // target redis offset SourceDBOffset interface{} // source redis offset + SourceMasterDBOffset interface{} // source redis master reply offset SourceAddress interface{} TargetAddress interface{} Details interface{} // other details info @@ -78,6 +80,7 @@ func NewMetricRest() []MetricRest { ProcessingCmdCount: detailMap["ProcessingCmdCount"], TargetDBOffset: detailMap["TargetDBOffset"], SourceDBOffset: detailMap["SourceDBOffset"], + SourceMasterDBOffset: detailMap["SourceMasterDBOffset"], SourceAddress: detailMap["SourceAddress"], TargetAddress: detailMap["TargetAddress"], Details: detailMap["Details"],