feat: add fakeSlaveDelayOffset metric and source master offset info. (#380)

v4
Jacky Wu 3 years ago committed by GitHub
parent 753b5d842f
commit 7b3038c0f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      src/redis-shake/common/utils.go
  2. 3
      src/redis-shake/dbSync/dbSyncer.go
  3. 3
      src/redis-shake/dbSync/status.go
  4. 25
      src/redis-shake/dbSync/syncIncrease.go
  5. 12
      src/redis-shake/metric/metric.go
  6. 10
      src/redis-shake/metric/prometheus_metrics.go
  7. 5
      src/redis-shake/metric/variables.go

@ -1021,10 +1021,14 @@ func CheckHandleNetError(err error) bool {
return false
}
func GetFakeSlaveOffset(c redigo.Conn) (string, error) {
infoStr, err := Bytes(c.Do("info", "Replication"))
func GetFakeSlaveOffset(c redigo.Conn) (fakeSlaveOffset string, masterOffset string, err error) {
var (
infoStr []byte
)
infoStr, err = Bytes(c.Do("info", "Replication"))
if err != nil {
return "", err
return
}
kv := ParseRedisInfo(infoStr)
@ -1034,12 +1038,22 @@ func GetFakeSlaveOffset(c redigo.Conn) (string, error) {
list := strings.Split(v, ",")
for _, item := range list {
if strings.HasPrefix(item, "offset=") {
return strings.Split(item, "=")[1], nil
fakeSlaveOffset = strings.Split(item, "=")[1]
break
}
}
}
if strings.Contains(k, "master_repl_offset") {
masterOffset = v
}
}
return "", fmt.Errorf("OffsetNotFoundInInfo")
if len(fakeSlaveOffset) == 0 {
err = fmt.Errorf("OffsetNotFoundInInfo")
return
}
return
}
func GetLocalIp(preferdInterfaces []string) (ip string, interfaceName string, err error) {

@ -74,7 +74,8 @@ func (ds *DbSyncer) GetExtraInfo() map[string]interface{} {
"SenderBufCount": len(ds.sendBuf),
"ProcessingCmdCount": len(ds.delayChannel),
"TargetDBOffset": ds.stat.targetOffset.Get(),
"SourceDBOffset": ds.stat.sourceOffset,
"SourceMasterDBOffset": ds.stat.sourceMasterOffset.Get(),
"SourceDBOffset": ds.stat.sourceOffset.Get(),
}
}

@ -10,7 +10,8 @@ type Status struct {
fullSyncFilter atomic2.Int64 // filtered keys in full sync (ignore)
incrSyncFilter atomic2.Int64 // filtered keys in increase sync (nbypass)
targetOffset atomic2.Int64 // target offset
sourceOffset int64 // source offset
sourceOffset atomic2.Int64 // source offset
sourceMasterOffset atomic2.Int64 // source master offset
}
func (s *Status) Stat() *syncerStat {

@ -66,7 +66,7 @@ func (ds *DbSyncer) fetchOffset() {
incrSyncReadeTimeout, incrSyncReadeTimeout, false, conf.Options.SourceTLSEnable)
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
offset, err := utils.GetFakeSlaveOffset(srcConn)
slaveOffset, masterOffset, err := utils.GetFakeSlaveOffset(srcConn)
if err != nil {
// log.PurePrintf("%s\n", NewLogItem("GetFakeSlaveOffsetFail", "WARN", NewErrorLogDetail("", err.Error())))
log.Warnf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tWarn:%s",
@ -76,16 +76,35 @@ func (ds *DbSyncer) fetchOffset() {
if err == io.EOF {
srcConn = utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType,
ds.sourcePassword, incrSyncReadeTimeout, incrSyncReadeTimeout, false, conf.Options.SourceTLSEnable)
log.Warnf("DbSyncer[%d] Event:GetFakeSlaveOffsetReconn\tId:%s\t",
ds.id, conf.Options.Id)
} else if _, ok := err.(net.Error); ok {
srcConn = utils.OpenRedisConnWithTimeout([]string{ds.source}, conf.Options.SourceAuthType,
ds.sourcePassword, incrSyncReadeTimeout, incrSyncReadeTimeout, false, conf.Options.SourceTLSEnable)
log.Warnf("DbSyncer[%d] Event:GetFakeSlaveOffsetReconn\tId:%s\t",
ds.id, conf.Options.Id)
}
} else {
// ds.SyncStat.SetOffset(offset)
if ds.stat.sourceOffset, err = strconv.ParseInt(offset, 10, 64); err != nil {
log.Errorf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s",
var (
sourceOffset, sourceMasterOffset int64
)
if sourceOffset, err = strconv.ParseInt(slaveOffset, 10, 64); err != nil {
log.Errorf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tError: parse slave offset failed(%s)",
ds.id, conf.Options.Id, err.Error())
} else {
ds.stat.sourceOffset.Set(sourceOffset)
}
if sourceMasterOffset, err = strconv.ParseInt(masterOffset, 10, 64); err != nil {
log.Errorf("DbSyncer[%d] Event:GetFakeSlaveOffsetFail\tId:%s\tError: parse master offset failed(%s)",
ds.id, conf.Options.Id, err.Error())
} else {
ds.stat.sourceMasterOffset.Set(sourceMasterOffset)
}
metric.GetMetric(ds.id).SetFakeSlaveDelayOffset(ds.id, uint64(sourceMasterOffset)-uint64(sourceOffset))
}
}

@ -11,7 +11,7 @@ import (
"github.com/alibaba/RedisShake/pkg/libs/log"
"github.com/alibaba/RedisShake/redis-shake/base"
"github.com/alibaba/RedisShake/redis-shake/configure"
conf "github.com/alibaba/RedisShake/redis-shake/configure"
)
const (
@ -91,6 +91,7 @@ type Metric struct {
NetworkFlow Combine // +speed
FullSyncProgress uint64
FakeSlaveDelayOffset uint64
}
func CreateMetric(r base.Runner) {
@ -251,6 +252,15 @@ func (m *Metric) SetFullSyncProgress(dbSyncerID int, val uint64) {
fullSyncProcessPercent.WithLabelValues(strconv.Itoa(dbSyncerID)).Set(float64(val))
}
func (m *Metric) SetFakeSlaveDelayOffset(dbSyncerID int, val uint64) {
m.FakeSlaveDelayOffset = val
fakeSlaveDelayOffset.WithLabelValues(strconv.Itoa(dbSyncerID)).Set(float64(val))
}
func (m *Metric) GetFakeSlaveDelayOffset() interface{} {
return m.FakeSlaveDelayOffset
}
func (m *Metric) GetFullSyncProgress() interface{} {
return m.FullSyncProgress
}

@ -3,7 +3,7 @@ package metric
import (
"strconv"
"github.com/alibaba/RedisShake/redis-shake/common"
utils "github.com/alibaba/RedisShake/redis-shake/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -71,6 +71,14 @@ var (
},
[]string{dbSyncerLabelName},
)
fakeSlaveDelayOffset = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "fake_slave_delay_offset",
Help: "RedisShake fake slave delay offset",
},
[]string{dbSyncerLabelName},
)
averageDelayInMs = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metricNamespace,

@ -2,8 +2,9 @@ package metric
import (
"fmt"
"github.com/alibaba/RedisShake/redis-shake/base"
"github.com/alibaba/RedisShake/redis-shake/common"
utils "github.com/alibaba/RedisShake/redis-shake/common"
)
type MetricRest struct {
@ -28,6 +29,7 @@ type MetricRest struct {
ProcessingCmdCount interface{} // length of delay channel
TargetDBOffset interface{} // target redis offset
SourceDBOffset interface{} // source redis offset
SourceMasterDBOffset interface{} // source redis master reply offset
SourceAddress interface{}
TargetAddress interface{}
Details interface{} // other details info
@ -78,6 +80,7 @@ func NewMetricRest() []MetricRest {
ProcessingCmdCount: detailMap["ProcessingCmdCount"],
TargetDBOffset: detailMap["TargetDBOffset"],
SourceDBOffset: detailMap["SourceDBOffset"],
SourceMasterDBOffset: detailMap["SourceMasterDBOffset"],
SourceAddress: detailMap["SourceAddress"],
TargetAddress: detailMap["TargetAddress"],
Details: detailMap["Details"],

Loading…
Cancel
Save