diff --git a/src/redis-shake/metric/metric.go b/src/redis-shake/metric/metric.go index d085741..a5fbd35 100644 --- a/src/redis-shake/metric/metric.go +++ b/src/redis-shake/metric/metric.go @@ -1,15 +1,16 @@ package metric import ( - "time" - "sync/atomic" - "fmt" "encoding/json" + "fmt" + "strconv" + "sync" + "sync/atomic" + "time" + "pkg/libs/log" "redis-shake/base" "redis-shake/configure" - "pkg/libs/log" - "sync" ) const ( @@ -18,7 +19,7 @@ const ( var ( MetricMap = new(sync.Map) - runner base.Runner + runner base.Runner ) type Op interface { @@ -48,7 +49,7 @@ func (p *Percent) Get(returnString bool) interface{} { if returnString { return fmt.Sprintf("%.02f", float64(dividend)/float64(divisor)) } else { - return dividend / divisor + return float64(dividend) / float64(divisor) } } } @@ -131,7 +132,7 @@ func (m *Metric) run() { tick := 0 for range time.NewTicker(1 * time.Second).C { tick++ - if tick % updateInterval == 0 && conf.Options.MetricPrintLog { + if tick%updateInterval == 0 && conf.Options.MetricPrintLog { stat := NewMetricRest() if opts, err := json.Marshal(stat); err != nil { log.Infof("marshal metric stat error[%v]", err) @@ -145,8 +146,9 @@ func (m *Metric) run() { }() } -func (m *Metric) AddPullCmdCount(val uint64) { +func (m *Metric) AddPullCmdCount(dbSyncerID int, val uint64) { m.PullCmdCount.Set(val) + pullCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val)) } func (m *Metric) GetPullCmdCount() interface{} { @@ -157,8 +159,9 @@ func (m *Metric) GetPullCmdCountTotal() interface{} { return atomic.LoadUint64(&m.PullCmdCount.Total) } -func (m *Metric) AddBypassCmdCount(val uint64) { +func (m *Metric) AddBypassCmdCount(dbSyncerID int, val uint64) { m.BypassCmdCount.Set(val) + bypassCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val)) } func (m *Metric) GetBypassCmdCount() interface{} { @@ -169,8 +172,9 @@ func (m *Metric) GetBypassCmdCountTotal() interface{} { return atomic.LoadUint64(&m.BypassCmdCount.Total) } -func (m *Metric) AddPushCmdCount(val uint64) { +func (m *Metric) AddPushCmdCount(dbSyncerID int, val uint64) { m.PushCmdCount.Set(val) + pushCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val)) } func (m *Metric) GetPushCmdCount() interface{} { @@ -181,8 +185,9 @@ func (m *Metric) GetPushCmdCountTotal() interface{} { return atomic.LoadUint64(&m.PushCmdCount.Total) } -func (m *Metric) AddSuccessCmdCount(val uint64) { +func (m *Metric) AddSuccessCmdCount(dbSyncerID int, val uint64) { m.SuccessCmdCount.Set(val) + successCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val)) } func (m *Metric) GetSuccessCmdCount() interface{} { @@ -193,8 +198,9 @@ func (m *Metric) GetSuccessCmdCountTotal() interface{} { return atomic.LoadUint64(&m.SuccessCmdCount.Total) } -func (m *Metric) AddFailCmdCount(val uint64) { +func (m *Metric) AddFailCmdCount(dbSyncerID int, val uint64) { m.FailCmdCount.Set(val) + failCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val)) } func (m *Metric) GetFailCmdCount() interface{} { @@ -218,9 +224,14 @@ func (m *Metric) GetAvgDelay() interface{} { return m.AvgDelay.Get(true) } -func (m *Metric) AddNetworkFlow(val uint64) { +func (m *Metric) GetAvgDelayFloat64() float64 { + return float64(m.AvgDelay.Get(false).(int64)) +} + +func (m *Metric) AddNetworkFlow(dbSyncerID int, val uint64) { // atomic.AddUint64(&m.NetworkFlow.Value, val) m.NetworkFlow.Set(val) + networkFlowTotalInBytes.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val)) } func (m *Metric) GetNetworkFlow() interface{} { @@ -231,8 +242,9 @@ func (m *Metric) GetNetworkFlowTotal() interface{} { return atomic.LoadUint64(&m.NetworkFlow.Total) } -func (m *Metric) SetFullSyncProgress(val uint64) { +func (m *Metric) SetFullSyncProgress(dbSyncerID int, val uint64) { m.FullSyncProgress = val + fullSyncProcessPercent.WithLabelValues(strconv.Itoa(dbSyncerID)).Set(float64(val)) } func (m *Metric) GetFullSyncProgress() interface{} { diff --git a/src/redis-shake/metric/prometheus_metrics.go b/src/redis-shake/metric/prometheus_metrics.go new file mode 100644 index 0000000..573e3eb --- /dev/null +++ b/src/redis-shake/metric/prometheus_metrics.go @@ -0,0 +1,95 @@ +package metric + +import ( + "strconv" + + "redis-shake/common" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + metricNamespace = "redisshake" + dbSyncerLabelName = "db_syncer" +) + +var ( + pullCmdCountTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "pull_cmd_count_total", + Help: "RedisShake pull redis cmd count in total", + }, + []string{dbSyncerLabelName}, + ) + bypassCmdCountTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "bypass_cmd_count_total", + Help: "RedisShake bypass redis cmd count in total", + }, + []string{dbSyncerLabelName}, + ) + pushCmdCountTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "push_cmd_count_total", + Help: "RedisShake push redis cmd count in total", + }, + []string{dbSyncerLabelName}, + ) + successCmdCountTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "success_cmd_count_total", + Help: "RedisShake push redis cmd count in total", + }, + []string{dbSyncerLabelName}, + ) + failCmdCountTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "fail_cmd_count_total", + Help: "RedisShake push redis cmd count in total", + }, + []string{dbSyncerLabelName}, + ) + networkFlowTotalInBytes = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "network_flow_total_in_bytes", + Help: "RedisShake total network flow in total (byte)", + }, + []string{dbSyncerLabelName}, + ) + fullSyncProcessPercent = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "full_sync_process_percent", + Help: "RedisShake full sync process (%)", + }, + []string{dbSyncerLabelName}, + ) + averageDelayInMs = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: metricNamespace, + Name: "average_delay_in_ms", + Help: "RedisShake average delay (ms)", + }, + []string{dbSyncerLabelName}, + ) +) + +// CalcPrometheusMetrics calculates some prometheus metrics e.g. average delay. +func CalcPrometheusMetrics() { + total := utils.GetTotalLink() + for i := 0; i < total; i++ { + val, ok := MetricMap.Load(i) + if !ok { + continue + } + singleMetric := val.(*Metric) + averageDelayInMs.WithLabelValues(strconv.Itoa(i)).Set(singleMetric.GetAvgDelayFloat64()) + } +} diff --git a/src/redis-shake/restful/restful.go b/src/redis-shake/restful/restful.go index 06eb06f..ec3e87d 100644 --- a/src/redis-shake/restful/restful.go +++ b/src/redis-shake/restful/restful.go @@ -1,14 +1,18 @@ package restful import ( - "github.com/gugemichael/nimo4go" - "redis-shake/metric" + "net/http" "redis-shake/common" + "redis-shake/metric" + + "github.com/gugemichael/nimo4go" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // register all rest api func RestAPI() { - registerMetric() // register metric + registerMetric() // register metric + registerPrometheusMetric() // register prometheus metrics // add below if has more } @@ -17,3 +21,10 @@ func registerMetric() { return metric.NewMetricRest() }) } + +func registerPrometheusMetric() { + http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) { + metric.CalcPrometheusMetrics() + promhttp.Handler().ServeHTTP(w, req) + }) +} diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 90f51ab..f53a83c 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -464,7 +464,7 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type fmt.Fprintf(&b, " ignore=%-12d", stat.ignore) } log.Info(b.String()) - metric.GetMetric(ds.id).SetFullSyncProgress(uint64(100 * stat.rbytes / nsize)) + metric.GetMetric(ds.id).SetFullSyncProgress(ds.id, uint64(100*stat.rbytes/nsize)) } log.Infof("dbSyncer[%v] sync rdb done", ds.id) } @@ -535,9 +535,9 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type } if err == nil { - metric.GetMetric(ds.id).AddSuccessCmdCount(1) + metric.GetMetric(ds.id).AddSuccessCmdCount(ds.id, 1) } else { - metric.GetMetric(ds.id).AddFailCmdCount(1) + metric.GetMetric(ds.id).AddFailCmdCount(ds.id, 1) if utils.CheckHandleNetError(err) { log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s", ds.id, conf.Options.Id, err.Error()) @@ -589,7 +589,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type if scmd, argv, err = redis.ParseArgs(resp); err != nil { log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id) } else { - metric.GetMetric(ds.id).AddPullCmdCount(1) + metric.GetMetric(ds.id).AddPullCmdCount(ds.id, 1) // print debug log of send command if conf.Options.LogLevel == utils.LogLevelAll { @@ -619,7 +619,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type if bypass || ignorecmd { ds.nbypass.Incr() // ds.SyncStat.BypassCmdCount.Incr() - metric.GetMetric(ds.id).AddBypassCmdCount(1) + metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1) log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd) continue } @@ -654,7 +654,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type ds.sendBuf <- cmdDetail{Cmd: "SELECT", Args: [][]byte{[]byte(strconv.FormatInt(int64(lastdb), 10))}} } else { ds.nbypass.Incr() - metric.GetMetric(ds.id).AddBypassCmdCount(1) + metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1) } continue } @@ -682,8 +682,8 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type ds.forward.Incr() ds.wbytes.Add(int64(length)) - metric.GetMetric(ds.id).AddPushCmdCount(1) - metric.GetMetric(ds.id).AddNetworkFlow(uint64(length)) + metric.GetMetric(ds.id).AddPushCmdCount(ds.id, 1) + metric.GetMetric(ds.id).AddNetworkFlow(ds.id, uint64(length)) sendId.Incr() if conf.Options.Metric { diff --git a/src/vendor/vendor.json b/src/vendor/vendor.json index 2d4730a..82240fa 100644 --- a/src/vendor/vendor.json +++ b/src/vendor/vendor.json @@ -8,6 +8,12 @@ "revision": "76bd05e8e22f9f8f5e1dd6d3a85b7951da7cce57", "revisionTime": "2017-12-04T08:54:13Z" }, + { + "checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=", + "path": "github.com/beorn7/perks/quantile", + "revision": "4b2b341e8d7715fae06375aa633dbb6e91b3fb46", + "revisionTime": "2019-04-14T22:11:40Z" + }, { "checksumSHA1": "1/Kf0ihi6RtfNt0JjAtZLKvfqJY=", "path": "github.com/cupcake/rdb", @@ -50,12 +56,24 @@ "revision": "569eae59ada904ea8db7a225c2b47165af08735a", "revisionTime": "2018-04-04T16:07:26Z" }, + { + "checksumSHA1": "CGj8VcI/CpzxaNqlqpEVM7qElD4=", + "path": "github.com/golang/protobuf/proto", + "revision": "b285ee9cfc6c881bb20c0d8dc73370ea9b9ec90f", + "revisionTime": "2019-05-17T06:12:10Z" + }, { "checksumSHA1": "63olncsMU/K9MMKxt4zuKSyoNg0=", "path": "github.com/gugemichael/nimo4go", "revision": "cbcfac21339d78efaed9ed09dcba7296d9976118", "revisionTime": "2018-09-04T03:08:18Z" }, + { + "checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=", + "path": "github.com/matttproud/golang_protobuf_extensions/pbutil", + "revision": "c182affec369e30f25d3eb8cd8a478dee585ae7d", + "revisionTime": "2018-12-31T17:19:20Z" + }, { "checksumSHA1": "W7TFJ3aRSUenZvYVnzrOZPoIOjs=", "path": "github.com/nightlyone/lockfile", @@ -68,6 +86,66 @@ "revision": "5d4384ee4fb2527b0a1256a821ebfc92f91efefc", "revisionTime": "2018-12-26T10:54:42Z" }, + { + "checksumSHA1": "x5CuZuHwidIOrzy/g2NQMDJMIxQ=", + "path": "github.com/prometheus/client_golang/prometheus", + "revision": "3d8379da8fc2309dd563f593f15a739054c098bc", + "revisionTime": "2019-06-17T18:27:57Z" + }, + { + "checksumSHA1": "UBqhkyjCz47+S19MVTigxJ2VjVQ=", + "path": "github.com/prometheus/client_golang/prometheus/internal", + "revision": "3d8379da8fc2309dd563f593f15a739054c098bc", + "revisionTime": "2019-06-17T18:27:57Z" + }, + { + "checksumSHA1": "BFMAsj5z3cYaNKx9fwHrazQRFvI=", + "path": "github.com/prometheus/client_golang/prometheus/promauto", + "revision": "3d8379da8fc2309dd563f593f15a739054c098bc", + "revisionTime": "2019-06-17T18:27:57Z" + }, + { + "checksumSHA1": "V51yx4gq61QCD9clxnps792Eq2Y=", + "path": "github.com/prometheus/client_golang/prometheus/promhttp", + "revision": "3d8379da8fc2309dd563f593f15a739054c098bc", + "revisionTime": "2019-06-17T18:27:57Z" + }, + { + "checksumSHA1": "V8xkqgmP66sq2ZW4QO5wi9a4oZE=", + "path": "github.com/prometheus/client_model/go", + "revision": "fd36f4220a901265f90734c3183c5f0c91daa0b8", + "revisionTime": "2019-01-29T23:31:27Z" + }, + { + "checksumSHA1": "ljxJzXiQ7dNsmuRIUhqqP+qjRWc=", + "path": "github.com/prometheus/common/expfmt", + "revision": "31bed53e4047fd6c510e43a941f90cb31be0972a", + "revisionTime": "2019-06-17T21:11:42Z" + }, + { + "checksumSHA1": "1Mhfofk+wGZ94M0+Bd98K8imPD4=", + "path": "github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg", + "revision": "31bed53e4047fd6c510e43a941f90cb31be0972a", + "revisionTime": "2019-06-17T21:11:42Z" + }, + { + "checksumSHA1": "ccmMs+h9Jo8kE7izqsUkWShD4d0=", + "path": "github.com/prometheus/common/model", + "revision": "31bed53e4047fd6c510e43a941f90cb31be0972a", + "revisionTime": "2019-06-17T21:11:42Z" + }, + { + "checksumSHA1": "L4ayL3hh390Bk5vmXgtNGo9d/rc=", + "path": "github.com/prometheus/procfs", + "revision": "90b65b6334013b0a1cfce27ce5c79feb7da2f77a", + "revisionTime": "2019-06-14T00:01:41Z" + }, + { + "checksumSHA1": "Kmjs49lbjGmlgUPx3pks0tVDed0=", + "path": "github.com/prometheus/procfs/internal/fs", + "revision": "90b65b6334013b0a1cfce27ce5c79feb7da2f77a", + "revisionTime": "2019-06-14T00:01:41Z" + }, { "checksumSHA1": "Tutue3nEgM/87jitUcYv6ODwyNE=", "path": "github.com/satori/go.uuid", @@ -92,6 +170,5 @@ "revision": "a96e63847dc3c67d17befa69c303767e2f84e54f", "revisionTime": "2017-05-31T16:03:50Z" } - ], - "rootPath": "/Users/vinllen-ali/code/redis-shake-inner/redis-shake/src" + ] }