Add Prometheus metrics (#103)

* support prometheus metrics

* add namespace to prometheus metrics

* reuse previous average delay calculate function

* use const dbSyncerLabelName

* update vendor.json
v4
Zheng Dayu 6 years ago committed by Vinllen Chen
parent caec047d10
commit f8df138940
  1. 42
      src/redis-shake/metric/metric.go
  2. 95
      src/redis-shake/metric/prometheus_metrics.go
  3. 17
      src/redis-shake/restful/restful.go
  4. 16
      src/redis-shake/sync.go
  5. 81
      src/vendor/vendor.json

@ -1,15 +1,16 @@
package metric
import (
"time"
"sync/atomic"
"fmt"
"encoding/json"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"pkg/libs/log"
"redis-shake/base"
"redis-shake/configure"
"pkg/libs/log"
"sync"
)
const (
@ -18,7 +19,7 @@ const (
var (
MetricMap = new(sync.Map)
runner base.Runner
runner base.Runner
)
type Op interface {
@ -48,7 +49,7 @@ func (p *Percent) Get(returnString bool) interface{} {
if returnString {
return fmt.Sprintf("%.02f", float64(dividend)/float64(divisor))
} else {
return dividend / divisor
return float64(dividend) / float64(divisor)
}
}
}
@ -131,7 +132,7 @@ func (m *Metric) run() {
tick := 0
for range time.NewTicker(1 * time.Second).C {
tick++
if tick % updateInterval == 0 && conf.Options.MetricPrintLog {
if tick%updateInterval == 0 && conf.Options.MetricPrintLog {
stat := NewMetricRest()
if opts, err := json.Marshal(stat); err != nil {
log.Infof("marshal metric stat error[%v]", err)
@ -145,8 +146,9 @@ func (m *Metric) run() {
}()
}
func (m *Metric) AddPullCmdCount(val uint64) {
func (m *Metric) AddPullCmdCount(dbSyncerID int, val uint64) {
m.PullCmdCount.Set(val)
pullCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
}
func (m *Metric) GetPullCmdCount() interface{} {
@ -157,8 +159,9 @@ func (m *Metric) GetPullCmdCountTotal() interface{} {
return atomic.LoadUint64(&m.PullCmdCount.Total)
}
func (m *Metric) AddBypassCmdCount(val uint64) {
func (m *Metric) AddBypassCmdCount(dbSyncerID int, val uint64) {
m.BypassCmdCount.Set(val)
bypassCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
}
func (m *Metric) GetBypassCmdCount() interface{} {
@ -169,8 +172,9 @@ func (m *Metric) GetBypassCmdCountTotal() interface{} {
return atomic.LoadUint64(&m.BypassCmdCount.Total)
}
func (m *Metric) AddPushCmdCount(val uint64) {
func (m *Metric) AddPushCmdCount(dbSyncerID int, val uint64) {
m.PushCmdCount.Set(val)
pushCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
}
func (m *Metric) GetPushCmdCount() interface{} {
@ -181,8 +185,9 @@ func (m *Metric) GetPushCmdCountTotal() interface{} {
return atomic.LoadUint64(&m.PushCmdCount.Total)
}
func (m *Metric) AddSuccessCmdCount(val uint64) {
func (m *Metric) AddSuccessCmdCount(dbSyncerID int, val uint64) {
m.SuccessCmdCount.Set(val)
successCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
}
func (m *Metric) GetSuccessCmdCount() interface{} {
@ -193,8 +198,9 @@ func (m *Metric) GetSuccessCmdCountTotal() interface{} {
return atomic.LoadUint64(&m.SuccessCmdCount.Total)
}
func (m *Metric) AddFailCmdCount(val uint64) {
func (m *Metric) AddFailCmdCount(dbSyncerID int, val uint64) {
m.FailCmdCount.Set(val)
failCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
}
func (m *Metric) GetFailCmdCount() interface{} {
@ -218,9 +224,14 @@ func (m *Metric) GetAvgDelay() interface{} {
return m.AvgDelay.Get(true)
}
func (m *Metric) AddNetworkFlow(val uint64) {
func (m *Metric) GetAvgDelayFloat64() float64 {
return m.AvgDelay.Get(false).(float64)
}
func (m *Metric) AddNetworkFlow(dbSyncerID int, val uint64) {
// atomic.AddUint64(&m.NetworkFlow.Value, val)
m.NetworkFlow.Set(val)
networkFlowTotalInBytes.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
}
func (m *Metric) GetNetworkFlow() interface{} {
@ -231,8 +242,9 @@ func (m *Metric) GetNetworkFlowTotal() interface{} {
return atomic.LoadUint64(&m.NetworkFlow.Total)
}
func (m *Metric) SetFullSyncProgress(val uint64) {
func (m *Metric) SetFullSyncProgress(dbSyncerID int, val uint64) {
m.FullSyncProgress = val
fullSyncProcessPercent.WithLabelValues(strconv.Itoa(dbSyncerID)).Set(float64(val))
}
func (m *Metric) GetFullSyncProgress() interface{} {

@ -0,0 +1,95 @@
package metric
import (
"strconv"
"redis-shake/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const (
metricNamespace = "redisshake"
dbSyncerLabelName = "db_syncer"
)
var (
pullCmdCountTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "pull_cmd_count_total",
Help: "RedisShake pull redis cmd count in total",
},
[]string{dbSyncerLabelName},
)
bypassCmdCountTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "bypass_cmd_count_total",
Help: "RedisShake bypass redis cmd count in total",
},
[]string{dbSyncerLabelName},
)
pushCmdCountTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "push_cmd_count_total",
Help: "RedisShake push redis cmd count in total",
},
[]string{dbSyncerLabelName},
)
successCmdCountTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "success_cmd_count_total",
Help: "RedisShake push redis cmd count in total",
},
[]string{dbSyncerLabelName},
)
failCmdCountTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "fail_cmd_count_total",
Help: "RedisShake push redis cmd count in total",
},
[]string{dbSyncerLabelName},
)
networkFlowTotalInBytes = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "network_flow_total_in_bytes",
Help: "RedisShake total network flow in total (byte)",
},
[]string{dbSyncerLabelName},
)
fullSyncProcessPercent = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "full_sync_process_percent",
Help: "RedisShake full sync process (%)",
},
[]string{dbSyncerLabelName},
)
averageDelayInMs = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "average_delay_in_ms",
Help: "RedisShake average delay (ms)",
},
[]string{dbSyncerLabelName},
)
)
// CalcPrometheusMetrics calculates some prometheus metrics e.g. average delay.
func CalcPrometheusMetrics() {
total := utils.GetTotalLink()
for i := 0; i < total; i++ {
val, ok := MetricMap.Load(i)
if !ok {
continue
}
singleMetric := val.(*Metric)
averageDelayInMs.WithLabelValues(strconv.Itoa(i)).Set(singleMetric.GetAvgDelayFloat64())
}
}

@ -1,14 +1,18 @@
package restful
import (
"github.com/gugemichael/nimo4go"
"redis-shake/metric"
"net/http"
"redis-shake/common"
"redis-shake/metric"
"github.com/gugemichael/nimo4go"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// register all rest api
func RestAPI() {
registerMetric() // register metric
registerMetric() // register metric
registerPrometheusMetric() // register prometheus metrics
// add below if has more
}
@ -17,3 +21,10 @@ func registerMetric() {
return metric.NewMetricRest()
})
}
func registerPrometheusMetric() {
http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
metric.CalcPrometheusMetrics()
promhttp.Handler().ServeHTTP(w, req)
})
}

@ -464,7 +464,7 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type
fmt.Fprintf(&b, " ignore=%-12d", stat.ignore)
}
log.Info(b.String())
metric.GetMetric(ds.id).SetFullSyncProgress(uint64(100 * stat.rbytes / nsize))
metric.GetMetric(ds.id).SetFullSyncProgress(ds.id, uint64(100*stat.rbytes/nsize))
}
log.Infof("dbSyncer[%v] sync rdb done", ds.id)
}
@ -535,9 +535,9 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
}
if err == nil {
metric.GetMetric(ds.id).AddSuccessCmdCount(1)
metric.GetMetric(ds.id).AddSuccessCmdCount(ds.id, 1)
} else {
metric.GetMetric(ds.id).AddFailCmdCount(1)
metric.GetMetric(ds.id).AddFailCmdCount(ds.id, 1)
if utils.CheckHandleNetError(err) {
log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s",
ds.id, conf.Options.Id, err.Error())
@ -589,7 +589,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
if scmd, argv, err = redis.ParseArgs(resp); err != nil {
log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id)
} else {
metric.GetMetric(ds.id).AddPullCmdCount(1)
metric.GetMetric(ds.id).AddPullCmdCount(ds.id, 1)
// print debug log of send command
if conf.Options.LogLevel == utils.LogLevelAll {
@ -619,7 +619,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
if bypass || ignorecmd {
ds.nbypass.Incr()
// ds.SyncStat.BypassCmdCount.Incr()
metric.GetMetric(ds.id).AddBypassCmdCount(1)
metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd)
continue
}
@ -654,7 +654,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
ds.sendBuf <- cmdDetail{Cmd: "SELECT", Args: [][]byte{[]byte(strconv.FormatInt(int64(lastdb), 10))}}
} else {
ds.nbypass.Incr()
metric.GetMetric(ds.id).AddBypassCmdCount(1)
metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
}
continue
}
@ -682,8 +682,8 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
ds.forward.Incr()
ds.wbytes.Add(int64(length))
metric.GetMetric(ds.id).AddPushCmdCount(1)
metric.GetMetric(ds.id).AddNetworkFlow(uint64(length))
metric.GetMetric(ds.id).AddPushCmdCount(ds.id, 1)
metric.GetMetric(ds.id).AddNetworkFlow(ds.id, uint64(length))
sendId.Incr()
if conf.Options.Metric {

@ -8,6 +8,12 @@
"revision": "76bd05e8e22f9f8f5e1dd6d3a85b7951da7cce57",
"revisionTime": "2017-12-04T08:54:13Z"
},
{
"checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=",
"path": "github.com/beorn7/perks/quantile",
"revision": "4b2b341e8d7715fae06375aa633dbb6e91b3fb46",
"revisionTime": "2019-04-14T22:11:40Z"
},
{
"checksumSHA1": "1/Kf0ihi6RtfNt0JjAtZLKvfqJY=",
"path": "github.com/cupcake/rdb",
@ -50,12 +56,24 @@
"revision": "569eae59ada904ea8db7a225c2b47165af08735a",
"revisionTime": "2018-04-04T16:07:26Z"
},
{
"checksumSHA1": "CGj8VcI/CpzxaNqlqpEVM7qElD4=",
"path": "github.com/golang/protobuf/proto",
"revision": "b285ee9cfc6c881bb20c0d8dc73370ea9b9ec90f",
"revisionTime": "2019-05-17T06:12:10Z"
},
{
"checksumSHA1": "63olncsMU/K9MMKxt4zuKSyoNg0=",
"path": "github.com/gugemichael/nimo4go",
"revision": "cbcfac21339d78efaed9ed09dcba7296d9976118",
"revisionTime": "2018-09-04T03:08:18Z"
},
{
"checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=",
"path": "github.com/matttproud/golang_protobuf_extensions/pbutil",
"revision": "c182affec369e30f25d3eb8cd8a478dee585ae7d",
"revisionTime": "2018-12-31T17:19:20Z"
},
{
"checksumSHA1": "W7TFJ3aRSUenZvYVnzrOZPoIOjs=",
"path": "github.com/nightlyone/lockfile",
@ -68,6 +86,66 @@
"revision": "5d4384ee4fb2527b0a1256a821ebfc92f91efefc",
"revisionTime": "2018-12-26T10:54:42Z"
},
{
"checksumSHA1": "x5CuZuHwidIOrzy/g2NQMDJMIxQ=",
"path": "github.com/prometheus/client_golang/prometheus",
"revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
"revisionTime": "2019-06-17T18:27:57Z"
},
{
"checksumSHA1": "UBqhkyjCz47+S19MVTigxJ2VjVQ=",
"path": "github.com/prometheus/client_golang/prometheus/internal",
"revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
"revisionTime": "2019-06-17T18:27:57Z"
},
{
"checksumSHA1": "BFMAsj5z3cYaNKx9fwHrazQRFvI=",
"path": "github.com/prometheus/client_golang/prometheus/promauto",
"revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
"revisionTime": "2019-06-17T18:27:57Z"
},
{
"checksumSHA1": "V51yx4gq61QCD9clxnps792Eq2Y=",
"path": "github.com/prometheus/client_golang/prometheus/promhttp",
"revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
"revisionTime": "2019-06-17T18:27:57Z"
},
{
"checksumSHA1": "V8xkqgmP66sq2ZW4QO5wi9a4oZE=",
"path": "github.com/prometheus/client_model/go",
"revision": "fd36f4220a901265f90734c3183c5f0c91daa0b8",
"revisionTime": "2019-01-29T23:31:27Z"
},
{
"checksumSHA1": "ljxJzXiQ7dNsmuRIUhqqP+qjRWc=",
"path": "github.com/prometheus/common/expfmt",
"revision": "31bed53e4047fd6c510e43a941f90cb31be0972a",
"revisionTime": "2019-06-17T21:11:42Z"
},
{
"checksumSHA1": "1Mhfofk+wGZ94M0+Bd98K8imPD4=",
"path": "github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg",
"revision": "31bed53e4047fd6c510e43a941f90cb31be0972a",
"revisionTime": "2019-06-17T21:11:42Z"
},
{
"checksumSHA1": "ccmMs+h9Jo8kE7izqsUkWShD4d0=",
"path": "github.com/prometheus/common/model",
"revision": "31bed53e4047fd6c510e43a941f90cb31be0972a",
"revisionTime": "2019-06-17T21:11:42Z"
},
{
"checksumSHA1": "L4ayL3hh390Bk5vmXgtNGo9d/rc=",
"path": "github.com/prometheus/procfs",
"revision": "90b65b6334013b0a1cfce27ce5c79feb7da2f77a",
"revisionTime": "2019-06-14T00:01:41Z"
},
{
"checksumSHA1": "Kmjs49lbjGmlgUPx3pks0tVDed0=",
"path": "github.com/prometheus/procfs/internal/fs",
"revision": "90b65b6334013b0a1cfce27ce5c79feb7da2f77a",
"revisionTime": "2019-06-14T00:01:41Z"
},
{
"checksumSHA1": "Tutue3nEgM/87jitUcYv6ODwyNE=",
"path": "github.com/satori/go.uuid",
@ -92,6 +170,5 @@
"revision": "a96e63847dc3c67d17befa69c303767e2f84e54f",
"revisionTime": "2017-05-31T16:03:50Z"
}
],
"rootPath": "/Users/vinllen-ali/code/redis-shake-inner/redis-shake/src"
]
}

Loading…
Cancel
Save