merge github & bugfix: GetAvgDelayFloat64

v4
vinllen 6 years ago
commit e5280c4e83
1. src/redis-shake/metric/metric.go (42 changed lines)
2. src/redis-shake/metric/prometheus_metrics.go (95 changed lines)
3. src/redis-shake/restful/restful.go (17 changed lines)
4. src/redis-shake/sync.go (16 changed lines)
5. src/vendor/vendor.json (81 changed lines)

src/redis-shake/metric/metric.go

@@ -1,15 +1,16 @@
 package metric
 
 import (
-    "time"
-    "sync/atomic"
-    "fmt"
     "encoding/json"
+    "fmt"
+    "strconv"
+    "sync"
+    "sync/atomic"
+    "time"
 
+    "pkg/libs/log"
     "redis-shake/base"
     "redis-shake/configure"
-    "pkg/libs/log"
-    "sync"
 )
 
 const (
@@ -18,7 +19,7 @@ const (
 
 var (
     MetricMap = new(sync.Map)
     runner base.Runner
 )
 
 type Op interface {
@@ -48,7 +49,7 @@ func (p *Percent) Get(returnString bool) interface{} {
        if returnString {
            return fmt.Sprintf("%.02f", float64(dividend)/float64(divisor))
        } else {
-           return dividend / divisor
+           return float64(dividend) / float64(divisor)
        }
    }
 }
@@ -131,7 +132,7 @@ func (m *Metric) run() {
        tick := 0
        for range time.NewTicker(1 * time.Second).C {
            tick++
-           if tick % updateInterval == 0 && conf.Options.MetricPrintLog {
+           if tick%updateInterval == 0 && conf.Options.MetricPrintLog {
                stat := NewMetricRest()
                if opts, err := json.Marshal(stat); err != nil {
                    log.Infof("marshal metric stat error[%v]", err)
@@ -145,8 +146,9 @@ func (m *Metric) run() {
    }()
 }
 
-func (m *Metric) AddPullCmdCount(val uint64) {
+func (m *Metric) AddPullCmdCount(dbSyncerID int, val uint64) {
    m.PullCmdCount.Set(val)
+   pullCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
 }
 
 func (m *Metric) GetPullCmdCount() interface{} {
@@ -157,8 +159,9 @@ func (m *Metric) GetPullCmdCountTotal() interface{} {
    return atomic.LoadUint64(&m.PullCmdCount.Total)
 }
 
-func (m *Metric) AddBypassCmdCount(val uint64) {
+func (m *Metric) AddBypassCmdCount(dbSyncerID int, val uint64) {
    m.BypassCmdCount.Set(val)
+   bypassCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
 }
 
 func (m *Metric) GetBypassCmdCount() interface{} {
@@ -169,8 +172,9 @@ func (m *Metric) GetBypassCmdCountTotal() interface{} {
    return atomic.LoadUint64(&m.BypassCmdCount.Total)
 }
 
-func (m *Metric) AddPushCmdCount(val uint64) {
+func (m *Metric) AddPushCmdCount(dbSyncerID int, val uint64) {
    m.PushCmdCount.Set(val)
+   pushCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
 }
 
 func (m *Metric) GetPushCmdCount() interface{} {
@@ -181,8 +185,9 @@ func (m *Metric) GetPushCmdCountTotal() interface{} {
    return atomic.LoadUint64(&m.PushCmdCount.Total)
 }
 
-func (m *Metric) AddSuccessCmdCount(val uint64) {
+func (m *Metric) AddSuccessCmdCount(dbSyncerID int, val uint64) {
    m.SuccessCmdCount.Set(val)
+   successCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
 }
 
 func (m *Metric) GetSuccessCmdCount() interface{} {
@@ -193,8 +198,9 @@ func (m *Metric) GetSuccessCmdCountTotal() interface{} {
    return atomic.LoadUint64(&m.SuccessCmdCount.Total)
 }
 
-func (m *Metric) AddFailCmdCount(val uint64) {
+func (m *Metric) AddFailCmdCount(dbSyncerID int, val uint64) {
    m.FailCmdCount.Set(val)
+   failCmdCountTotal.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
 }
 
 func (m *Metric) GetFailCmdCount() interface{} {
@@ -218,9 +224,14 @@ func (m *Metric) GetAvgDelay() interface{} {
    return m.AvgDelay.Get(true)
 }
 
-func (m *Metric) AddNetworkFlow(val uint64) {
+func (m *Metric) GetAvgDelayFloat64() float64 {
+   return float64(m.AvgDelay.Get(false).(int64))
+}
+
+func (m *Metric) AddNetworkFlow(dbSyncerID int, val uint64) {
    // atomic.AddUint64(&m.NetworkFlow.Value, val)
    m.NetworkFlow.Set(val)
+   networkFlowTotalInBytes.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
 }
 
 func (m *Metric) GetNetworkFlow() interface{} {
@@ -231,8 +242,9 @@ func (m *Metric) GetNetworkFlowTotal() interface{} {
    return atomic.LoadUint64(&m.NetworkFlow.Total)
 }
 
-func (m *Metric) SetFullSyncProgress(val uint64) {
+func (m *Metric) SetFullSyncProgress(dbSyncerID int, val uint64) {
    m.FullSyncProgress = val
+   fullSyncProcessPercent.WithLabelValues(strconv.Itoa(dbSyncerID)).Set(float64(val))
 }
 
 func (m *Metric) GetFullSyncProgress() interface{} {
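Every Add*/Set* method above now follows the same pattern: resolve the per-syncer child of a metric vector with WithLabelValues(strconv.Itoa(dbSyncerID)) and bump it. A minimal, self-contained sketch of that pattern (illustrative only, not part of the commit; the counter here is unregistered and its name is made up):

package main

import (
    "fmt"
    "strconv"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // One counter vector, one child time series per db_syncer id.
    c := prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Namespace: "redisshake",
            Name:      "example_cmd_count_total", // hypothetical name, not a real RedisShake metric
            Help:      "illustrative counter",
        },
        []string{"db_syncer"},
    )

    // This mirrors what AddPullCmdCount(dbSyncerID, val) now does internally.
    for dbSyncerID, val := range []uint64{2, 5} {
        c.WithLabelValues(strconv.Itoa(dbSyncerID)).Add(float64(val))
    }

    fmt.Println(testutil.ToFloat64(c.WithLabelValues("1"))) // prints 5
}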

src/redis-shake/metric/prometheus_metrics.go (new file)

@@ -0,0 +1,95 @@
+package metric
+
+import (
+    "strconv"
+
+    "redis-shake/common"
+
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+    metricNamespace   = "redisshake"
+    dbSyncerLabelName = "db_syncer"
+)
+
+var (
+    pullCmdCountTotal = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: metricNamespace,
+            Name:      "pull_cmd_count_total",
+            Help:      "RedisShake pull redis cmd count in total",
+        },
+        []string{dbSyncerLabelName},
+    )
+    bypassCmdCountTotal = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: metricNamespace,
+            Name:      "bypass_cmd_count_total",
+            Help:      "RedisShake bypass redis cmd count in total",
+        },
+        []string{dbSyncerLabelName},
+    )
+    pushCmdCountTotal = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: metricNamespace,
+            Name:      "push_cmd_count_total",
+            Help:      "RedisShake push redis cmd count in total",
+        },
+        []string{dbSyncerLabelName},
+    )
+    successCmdCountTotal = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: metricNamespace,
+            Name:      "success_cmd_count_total",
+            Help:      "RedisShake push redis cmd count in total",
+        },
+        []string{dbSyncerLabelName},
+    )
+    failCmdCountTotal = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: metricNamespace,
+            Name:      "fail_cmd_count_total",
+            Help:      "RedisShake push redis cmd count in total",
+        },
+        []string{dbSyncerLabelName},
+    )
+    networkFlowTotalInBytes = promauto.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: metricNamespace,
+            Name:      "network_flow_total_in_bytes",
+            Help:      "RedisShake total network flow in total (byte)",
+        },
+        []string{dbSyncerLabelName},
+    )
+    fullSyncProcessPercent = promauto.NewGaugeVec(
+        prometheus.GaugeOpts{
+            Namespace: metricNamespace,
+            Name:      "full_sync_process_percent",
+            Help:      "RedisShake full sync process (%)",
+        },
+        []string{dbSyncerLabelName},
+    )
+    averageDelayInMs = promauto.NewGaugeVec(
+        prometheus.GaugeOpts{
+            Namespace: metricNamespace,
+            Name:      "average_delay_in_ms",
+            Help:      "RedisShake average delay (ms)",
+        },
+        []string{dbSyncerLabelName},
+    )
+)
+
+// CalcPrometheusMetrics calculates some prometheus metrics e.g. average delay.
+func CalcPrometheusMetrics() {
+    total := utils.GetTotalLink()
+    for i := 0; i < total; i++ {
+        val, ok := MetricMap.Load(i)
+        if !ok {
+            continue
+        }
+        singleMetric := val.(*Metric)
+        averageDelayInMs.WithLabelValues(strconv.Itoa(i)).Set(singleMetric.GetAvgDelayFloat64())
+    }
+}
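Because these collectors are created with promauto, they register themselves with the Prometheus default registry at package init time, so nothing else in RedisShake has to call prometheus.MustRegister; the /metrics handler added in restful.go can serve them directly. A small, self-contained sketch of that auto-registration behaviour (illustrative only, not RedisShake code; the gauge name is made up):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// Created with promauto, so it is registered with prometheus.DefaultRegisterer
// as a side effect -- the same mechanism the metrics above rely on.
var demoGauge = promauto.NewGaugeVec(
    prometheus.GaugeOpts{
        Namespace: "redisshake",
        Name:      "demo_gauge", // hypothetical name, not a real RedisShake metric
        Help:      "demo gauge",
    },
    []string{"db_syncer"},
)

func main() {
    demoGauge.WithLabelValues("0").Set(12.5)

    // Gather from the default registry to confirm the metric is there.
    mfs, _ := prometheus.DefaultGatherer.Gather()
    for _, mf := range mfs {
        fmt.Println(mf.GetName()) // includes redisshake_demo_gauge (plus Go runtime metrics)
    }
}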

src/redis-shake/restful/restful.go

@@ -1,14 +1,18 @@
 package restful
 
 import (
-    "github.com/gugemichael/nimo4go"
+    "net/http"
 
-    "redis-shake/metric"
     "redis-shake/common"
+    "redis-shake/metric"
+
+    "github.com/gugemichael/nimo4go"
+    "github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
 // register all rest api
 func RestAPI() {
    registerMetric() // register metric
+   registerPrometheusMetric() // register prometheus metrics
 
    // add below if has more
 }
@@ -17,3 +21,10 @@ func registerMetric() {
        return metric.NewMetricRest()
    })
 }
+
+func registerPrometheusMetric() {
+   http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
+       metric.CalcPrometheusMetrics()
+       promhttp.Handler().ServeHTTP(w, req)
+   })
+}
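The new handler recomputes the derived gauges via metric.CalcPrometheusMetrics on every request before delegating to the stock promhttp handler, so average_delay_in_ms is refreshed at scrape time. A rough, self-contained sketch of that wrap-then-delegate pattern against a throwaway test server (refreshDerivedMetrics is a hypothetical stand-in for metric.CalcPrometheusMetrics; not RedisShake code):

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "net/http/httptest"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// refreshDerivedMetrics stands in for metric.CalcPrometheusMetrics here.
func refreshDerivedMetrics() { /* recompute gauges from internal state */ }

func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
        refreshDerivedMetrics()              // refresh first ...
        promhttp.Handler().ServeHTTP(w, req) // ... then serve the default registry
    })

    srv := httptest.NewServer(mux)
    defer srv.Close()

    resp, _ := http.Get(srv.URL + "/metrics")
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    fmt.Println(string(body)[:200]) // Prometheus text exposition format
}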

src/redis-shake/sync.go

@@ -464,7 +464,7 @@ func (ds *dbSyncer) syncRDBFile(reader *bufio.Reader, target []string, auth_type
            fmt.Fprintf(&b, " ignore=%-12d", stat.ignore)
        }
        log.Info(b.String())
-       metric.GetMetric(ds.id).SetFullSyncProgress(uint64(100 * stat.rbytes / nsize))
+       metric.GetMetric(ds.id).SetFullSyncProgress(ds.id, uint64(100*stat.rbytes/nsize))
    }
    log.Infof("dbSyncer[%v] sync rdb done", ds.id)
 }
@@ -535,9 +535,9 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
            }
 
            if err == nil {
-               metric.GetMetric(ds.id).AddSuccessCmdCount(1)
+               metric.GetMetric(ds.id).AddSuccessCmdCount(ds.id, 1)
            } else {
-               metric.GetMetric(ds.id).AddFailCmdCount(1)
+               metric.GetMetric(ds.id).AddFailCmdCount(ds.id, 1)
                if utils.CheckHandleNetError(err) {
                    log.Panicf("dbSyncer[%v] Event:NetErrorWhileReceive\tId:%s\tError:%s",
                        ds.id, conf.Options.Id, err.Error())
@@ -589,7 +589,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
        if scmd, argv, err = redis.ParseArgs(resp); err != nil {
            log.PanicErrorf(err, "dbSyncer[%v] parse command arguments failed", ds.id)
        } else {
-           metric.GetMetric(ds.id).AddPullCmdCount(1)
+           metric.GetMetric(ds.id).AddPullCmdCount(ds.id, 1)
 
            // print debug log of send command
            if conf.Options.LogLevel == utils.LogLevelAll {
@@ -619,7 +619,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
        if bypass || ignorecmd {
            ds.nbypass.Incr()
            // ds.SyncStat.BypassCmdCount.Incr()
-           metric.GetMetric(ds.id).AddBypassCmdCount(1)
+           metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
            log.Debugf("dbSyncer[%v] ignore command[%v]", ds.id, scmd)
            continue
        }
@@ -654,7 +654,7 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
                ds.sendBuf <- cmdDetail{Cmd: "SELECT", Args: [][]byte{[]byte(strconv.FormatInt(int64(lastdb), 10))}}
            } else {
                ds.nbypass.Incr()
-               metric.GetMetric(ds.id).AddBypassCmdCount(1)
+               metric.GetMetric(ds.id).AddBypassCmdCount(ds.id, 1)
            }
            continue
        }
@@ -682,8 +682,8 @@ func (ds *dbSyncer) syncCommand(reader *bufio.Reader, target []string, auth_type
        ds.forward.Incr()
        ds.wbytes.Add(int64(length))
-       metric.GetMetric(ds.id).AddPushCmdCount(1)
-       metric.GetMetric(ds.id).AddNetworkFlow(uint64(length))
+       metric.GetMetric(ds.id).AddPushCmdCount(ds.id, 1)
+       metric.GetMetric(ds.id).AddNetworkFlow(ds.id, uint64(length))
 
        sendId.Incr()
 
        if conf.Options.Metric {

src/vendor/vendor.json

@@ -8,6 +8,12 @@
            "revision": "76bd05e8e22f9f8f5e1dd6d3a85b7951da7cce57",
            "revisionTime": "2017-12-04T08:54:13Z"
        },
+       {
+           "checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=",
+           "path": "github.com/beorn7/perks/quantile",
+           "revision": "4b2b341e8d7715fae06375aa633dbb6e91b3fb46",
+           "revisionTime": "2019-04-14T22:11:40Z"
+       },
        {
            "checksumSHA1": "1/Kf0ihi6RtfNt0JjAtZLKvfqJY=",
            "path": "github.com/cupcake/rdb",
@@ -50,12 +56,24 @@
            "revision": "569eae59ada904ea8db7a225c2b47165af08735a",
            "revisionTime": "2018-04-04T16:07:26Z"
        },
+       {
+           "checksumSHA1": "CGj8VcI/CpzxaNqlqpEVM7qElD4=",
+           "path": "github.com/golang/protobuf/proto",
+           "revision": "b285ee9cfc6c881bb20c0d8dc73370ea9b9ec90f",
+           "revisionTime": "2019-05-17T06:12:10Z"
+       },
        {
            "checksumSHA1": "63olncsMU/K9MMKxt4zuKSyoNg0=",
            "path": "github.com/gugemichael/nimo4go",
            "revision": "cbcfac21339d78efaed9ed09dcba7296d9976118",
            "revisionTime": "2018-09-04T03:08:18Z"
        },
+       {
+           "checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=",
+           "path": "github.com/matttproud/golang_protobuf_extensions/pbutil",
+           "revision": "c182affec369e30f25d3eb8cd8a478dee585ae7d",
+           "revisionTime": "2018-12-31T17:19:20Z"
+       },
        {
            "checksumSHA1": "W7TFJ3aRSUenZvYVnzrOZPoIOjs=",
            "path": "github.com/nightlyone/lockfile",
@@ -68,6 +86,66 @@
            "revision": "5d4384ee4fb2527b0a1256a821ebfc92f91efefc",
            "revisionTime": "2018-12-26T10:54:42Z"
        },
+       {
+           "checksumSHA1": "x5CuZuHwidIOrzy/g2NQMDJMIxQ=",
+           "path": "github.com/prometheus/client_golang/prometheus",
+           "revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
+           "revisionTime": "2019-06-17T18:27:57Z"
+       },
+       {
+           "checksumSHA1": "UBqhkyjCz47+S19MVTigxJ2VjVQ=",
+           "path": "github.com/prometheus/client_golang/prometheus/internal",
+           "revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
+           "revisionTime": "2019-06-17T18:27:57Z"
+       },
+       {
+           "checksumSHA1": "BFMAsj5z3cYaNKx9fwHrazQRFvI=",
+           "path": "github.com/prometheus/client_golang/prometheus/promauto",
+           "revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
+           "revisionTime": "2019-06-17T18:27:57Z"
+       },
+       {
+           "checksumSHA1": "V51yx4gq61QCD9clxnps792Eq2Y=",
+           "path": "github.com/prometheus/client_golang/prometheus/promhttp",
+           "revision": "3d8379da8fc2309dd563f593f15a739054c098bc",
+           "revisionTime": "2019-06-17T18:27:57Z"
+       },
+       {
+           "checksumSHA1": "V8xkqgmP66sq2ZW4QO5wi9a4oZE=",
+           "path": "github.com/prometheus/client_model/go",
+           "revision": "fd36f4220a901265f90734c3183c5f0c91daa0b8",
+           "revisionTime": "2019-01-29T23:31:27Z"
+       },
+       {
+           "checksumSHA1": "ljxJzXiQ7dNsmuRIUhqqP+qjRWc=",
+           "path": "github.com/prometheus/common/expfmt",
+           "revision": "31bed53e4047fd6c510e43a941f90cb31be0972a",
+           "revisionTime": "2019-06-17T21:11:42Z"
+       },
+       {
+           "checksumSHA1": "1Mhfofk+wGZ94M0+Bd98K8imPD4=",
+           "path": "github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg",
+           "revision": "31bed53e4047fd6c510e43a941f90cb31be0972a",
+           "revisionTime": "2019-06-17T21:11:42Z"
+       },
+       {
+           "checksumSHA1": "ccmMs+h9Jo8kE7izqsUkWShD4d0=",
+           "path": "github.com/prometheus/common/model",
+           "revision": "31bed53e4047fd6c510e43a941f90cb31be0972a",
+           "revisionTime": "2019-06-17T21:11:42Z"
+       },
+       {
+           "checksumSHA1": "L4ayL3hh390Bk5vmXgtNGo9d/rc=",
+           "path": "github.com/prometheus/procfs",
+           "revision": "90b65b6334013b0a1cfce27ce5c79feb7da2f77a",
+           "revisionTime": "2019-06-14T00:01:41Z"
+       },
+       {
+           "checksumSHA1": "Kmjs49lbjGmlgUPx3pks0tVDed0=",
+           "path": "github.com/prometheus/procfs/internal/fs",
+           "revision": "90b65b6334013b0a1cfce27ce5c79feb7da2f77a",
+           "revisionTime": "2019-06-14T00:01:41Z"
+       },
        {
            "checksumSHA1": "Tutue3nEgM/87jitUcYv6ODwyNE=",
            "path": "github.com/satori/go.uuid",
@@ -92,6 +170,5 @@
            "revision": "a96e63847dc3c67d17befa69c303767e2f84e54f",
            "revisionTime": "2017-05-31T16:03:50Z"
        }
-   ],
-   "rootPath": "/Users/vinllen-ali/code/redis-shake-inner/redis-shake/src"
+   ]
 }
