From 83b84739d315c91326c42f8eac9ebc3f829896a2 Mon Sep 17 00:00:00 2001
From: suxb201
Date: Thu, 22 Sep 2022 16:27:18 +0800
Subject: [PATCH] add http metrics

Expose the statistics counters as JSON over HTTP. When
advanced.metrics_port is non-zero, main starts an HTTP server on
localhost:<metrics_port> whose "/" handler encodes statistics.Metrics.

The package-level counters move into the exported Metrics struct and
become uint64. The allow/disallow counters are now cumulative instead of
being reset on every tick, so the periodic log line reports the delta
since the previous tick in both the "syncing rdb" and "syncing aof"
branches. The "receiving rdb" branch is selected while the received size
is still below the rdb file size.

A new in_queue_entries_count value records len(ch) in the main loop, and
UpdateEntryId is now called from the main loop instead of the redis
writer.
---
 cmd/redis-shake/main.go           |  15 ++++
 internal/config/config.go         |   4 +-
 internal/rdb/rdb.go               |   2 +-
 internal/reader/psync.go          |   6 +-
 internal/statistics/statistics.go | 138 +++++++++++++++++++-----------
 internal/writer/redis.go          |   3 +-
 restore.toml                      |   3 +
 sync.toml                         |   3 +
 test/assets/empty.toml            |   3 +
 9 files changed, 119 insertions(+), 58 deletions(-)

diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go
index 5f7b7c7..7d0bcdb 100644
--- a/cmd/redis-shake/main.go
+++ b/cmd/redis-shake/main.go
@@ -51,6 +51,19 @@ func main() {
 		}()
 	}
 
+	// start statistics
+	if config.Config.Advanced.MetricsPort != 0 {
+		go func() {
+			log.Infof("metrics url: http://localhost:%d", config.Config.Advanced.MetricsPort)
+			mux := http.NewServeMux()
+			mux.HandleFunc("/", statistics.Handler)
+			err := http.ListenAndServe(fmt.Sprintf("localhost:%d", config.Config.Advanced.MetricsPort), mux)
+			if err != nil {
+				log.PanicError(err)
+			}
+		}()
+	}
+
 	// create writer
 	var theWriter writer.Writer
 	target := &config.Config.Target
@@ -79,6 +92,7 @@ func main() {
 	statistics.Init()
 	id := uint64(0)
 	for e := range ch {
+		statistics.UpdateInQueueEntriesCount(uint64(len(ch)))
 		// calc arguments
 		e.Id = id
 		id++
@@ -87,6 +101,7 @@ func main() {
 
 		// filter
 		code := filter.Filter(e)
+		statistics.UpdateEntryId(e.Id)
 		if code == filter.Allow {
 			theWriter.Write(e)
 			statistics.AddAllowEntriesCount()
diff --git a/internal/config/config.go b/internal/config/config.go
index 6ce919d..653d963 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -34,7 +34,8 @@ type tomlAdvanced struct {
 
 	Ncpu int `toml:"ncpu"`
 
-	PprofPort int `toml:"pprof_port"`
+	PprofPort   int `toml:"pprof_port"`
+	MetricsPort int `toml:"metrics_port"`
 
 	// log
 	LogFile     string `toml:"log_file"`
@@ -80,6 +81,7 @@ func init() {
 	Config.Advanced.Dir = "data"
 	Config.Advanced.Ncpu = 4
 	Config.Advanced.PprofPort = 0
+	Config.Advanced.MetricsPort = 0
 	Config.Advanced.LogFile = "redis-shake.log"
 	Config.Advanced.LogLevel = "info"
 	Config.Advanced.LogInterval = 5
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index 0ac7dd0..fabad5a 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -93,7 +93,7 @@ func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
 		if err != nil {
 			log.PanicError(err)
 		}
-		statistics.UpdateRDBSentSize(offset)
+		statistics.UpdateRDBSentSize(uint64(offset))
 	}
 	defer UpdateRDBSentSize()
 	// read one entry
diff --git a/internal/reader/psync.go b/internal/reader/psync.go
index 27b8903..1be94c6 100644
--- a/internal/reader/psync.go
+++ b/internal/reader/psync.go
@@ -150,7 +150,7 @@ func (r *psyncReader) saveRDB() {
 		log.PanicError(err)
 	}
 	log.Infof("received rdb length. length=[%d]", length)
-	statistics.SetRDBFileSize(length)
+	statistics.SetRDBFileSize(uint64(length))
 
 	// create rdb file
 	rdbFilePath := "dump.rdb"
@@ -174,7 +174,7 @@ func (r *psyncReader) saveRDB() {
 			log.PanicError(err)
 		}
 		remainder -= int64(n)
-		statistics.UpdateRDBReceivedSize(length - remainder)
+		statistics.UpdateRDBReceivedSize(uint64(length - remainder))
 		_, err = rdbFileHandle.Write(buf[:n])
 		if err != nil {
 			log.PanicError(err)
@@ -199,7 +199,7 @@ func (r *psyncReader) saveAOF(rd io.Reader) {
 			log.PanicError(err)
 		}
 		r.receivedOffset += int64(n)
-		statistics.UpdateAOFReceivedOffset(r.receivedOffset)
+		statistics.UpdateAOFReceivedOffset(uint64(r.receivedOffset))
 		aofWriter.Write(buf[:n])
 	}
 }
diff --git a/internal/statistics/statistics.go b/internal/statistics/statistics.go
index 1d22a8e..6735d7f 100644
--- a/internal/statistics/statistics.go
+++ b/internal/statistics/statistics.go
@@ -1,26 +1,42 @@
 package statistics
 
 import (
+	"encoding/json"
 	"github.com/alibaba/RedisShake/internal/config"
 	"github.com/alibaba/RedisShake/internal/log"
+	"net/http"
 	"time"
 )
 
-var (
-	// ID
-	entryId uint64
+type metrics struct {
+	// entries
+	EntryId              uint64 `json:"entry_id"`
+	AllowEntriesCount    uint64 `json:"allow_entries_count"`
+	DisallowEntriesCount uint64 `json:"disallow_entries_count"`
+
 	// rdb
-	rdbFileSize     int64
-	rdbReceivedSize int64
-	rdbSendSize     int64
+	RdbFileSize     uint64 `json:"rdb_file_size"`
+	RdbReceivedSize uint64 `json:"rdb_received_size"`
+	RdbSendSize     uint64 `json:"rdb_send_size"`
+
 	// aof
-	aofReceivedOffset int64
-	aofAppliedOffset  int64
-	// ops
-	allowEntriesCount    int64
-	disallowEntriesCount int64
-	unansweredBytesCount uint64
-)
+	AofReceivedOffset uint64 `json:"aof_received_offset"`
+	AofAppliedOffset  uint64 `json:"aof_applied_offset"`
+
+	// for performance debug
+	InQueueEntriesCount  uint64 `json:"in_queue_entries_count"`
+	UnansweredBytesCount uint64 `json:"unanswered_bytes_count"`
+}
+
+var Metrics = &metrics{}
+
+func Handler(w http.ResponseWriter, _ *http.Request) {
+	w.Header().Add("Content-Type", "application/json")
+	err := json.NewEncoder(w).Encode(Metrics)
+	if err != nil {
+		log.PanicError(err)
+	}
+}
 
 func Init() {
 	go func() {
@@ -29,64 +45,84 @@ func Init() {
 			log.Infof("statistics disabled. seconds=[%d]", seconds)
 		}
 
+		lastAllowEntriesCount := Metrics.AllowEntriesCount
+		lastDisallowEntriesCount := Metrics.DisallowEntriesCount
+
 		for range time.Tick(time.Duration(seconds) * time.Second) {
-			if rdbFileSize == 0 {
+			if Metrics.RdbFileSize == 0 {
 				continue
 			}
-			if rdbFileSize > rdbReceivedSize {
+			if Metrics.RdbFileSize > Metrics.RdbReceivedSize {
 				log.Infof("receiving rdb. percent=[%.2f]%%, rdbFileSize=[%.3f]G, rdbReceivedSize=[%.3f]G",
-					float64(rdbReceivedSize)/float64(rdbFileSize)*100,
-					float64(rdbFileSize)/1024/1024/1024,
-					float64(rdbReceivedSize)/1024/1024/1024)
-			} else if rdbFileSize > rdbSendSize {
-				log.Infof("syncing rdb. percent=[%.2f]%%, allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], unansweredBytesCount=[%d]bytes, rdbFileSize=[%.3f]G, rdbSendSize=[%.3f]G",
-					float64(rdbSendSize)*100/float64(rdbFileSize),
-					float32(allowEntriesCount)/float32(seconds),
-					float32(disallowEntriesCount)/float32(seconds),
-					entryId,
-					unansweredBytesCount,
-					float64(rdbFileSize)/1024/1024/1024,
-					float64(rdbSendSize)/1024/1024/1024)
+					float64(Metrics.RdbReceivedSize)/float64(Metrics.RdbFileSize)*100,
+					float64(Metrics.RdbFileSize)/1024/1024/1024,
+					float64(Metrics.RdbReceivedSize)/1024/1024/1024)
+			} else if Metrics.RdbFileSize > Metrics.RdbSendSize {
+				log.Infof("syncing rdb. percent=[%.2f]%%, allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], InQueueEntriesCount=[%d], unansweredBytesCount=[%d]bytes, rdbFileSize=[%.3f]G, rdbSendSize=[%.3f]G",
+					float64(Metrics.RdbSendSize)*100/float64(Metrics.RdbFileSize),
+					float32(Metrics.AllowEntriesCount-lastAllowEntriesCount)/float32(seconds),
+					float32(Metrics.DisallowEntriesCount-lastDisallowEntriesCount)/float32(seconds),
+					Metrics.EntryId,
+					Metrics.InQueueEntriesCount,
+					Metrics.UnansweredBytesCount,
+					float64(Metrics.RdbFileSize)/1024/1024/1024,
+					float64(Metrics.RdbSendSize)/1024/1024/1024)
 			} else {
-				log.Infof("syncing aof. allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], unansweredBytesCount=[%d]bytes, diff=[%d], aofReceivedOffset=[%d], aofAppliedOffset=[%d]",
-					float32(allowEntriesCount)/float32(seconds),
-					float32(disallowEntriesCount)/float32(seconds),
-					entryId,
-					unansweredBytesCount,
-					aofReceivedOffset-aofAppliedOffset,
-					aofReceivedOffset,
-					aofAppliedOffset)
+				log.Infof("syncing aof. allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], InQueueEntriesCount=[%d], unansweredBytesCount=[%d]bytes, diff=[%d], aofReceivedOffset=[%d], aofAppliedOffset=[%d]",
+					float32(Metrics.AllowEntriesCount-lastAllowEntriesCount)/float32(seconds),
+					float32(Metrics.DisallowEntriesCount-lastDisallowEntriesCount)/float32(seconds),
+					Metrics.EntryId,
+					Metrics.InQueueEntriesCount,
+					Metrics.UnansweredBytesCount,
+					Metrics.AofReceivedOffset-Metrics.AofAppliedOffset,
+					Metrics.AofReceivedOffset,
+					Metrics.AofAppliedOffset)
 			}
 
-			allowEntriesCount = 0
-			disallowEntriesCount = 0
+			lastAllowEntriesCount = Metrics.AllowEntriesCount
+			lastDisallowEntriesCount = Metrics.DisallowEntriesCount
 		}
 	}()
 }
+
+// entry id
+
 func UpdateEntryId(id uint64) {
-	entryId = id
+	Metrics.EntryId = id
 }
 func AddAllowEntriesCount() {
-	allowEntriesCount++
+	Metrics.AllowEntriesCount++
 }
 func AddDisallowEntriesCount() {
-	disallowEntriesCount++
+	Metrics.DisallowEntriesCount++
 }
-func SetRDBFileSize(size int64) {
-	rdbFileSize = size
+
+// rdb
+
+func SetRDBFileSize(size uint64) {
+	Metrics.RdbFileSize = size
 }
-func UpdateRDBReceivedSize(size int64) {
-	rdbReceivedSize = size
+func UpdateRDBReceivedSize(size uint64) {
+	Metrics.RdbReceivedSize = size
 }
-func UpdateRDBSentSize(offset int64) {
-	rdbSendSize = offset
+func UpdateRDBSentSize(offset uint64) {
+	Metrics.RdbSendSize = offset
 }
-func UpdateAOFReceivedOffset(offset int64) {
-	aofReceivedOffset = offset
+
+// aof
+
+func UpdateAOFReceivedOffset(offset uint64) {
+	Metrics.AofReceivedOffset = offset
+}
+func UpdateAOFAppliedOffset(offset uint64) {
+	Metrics.AofAppliedOffset = offset
 }
-func UpdateAOFAppliedOffset(offset int64) {
-	aofAppliedOffset = offset
+
+// for debug
+
+func UpdateInQueueEntriesCount(count uint64) {
+	Metrics.InQueueEntriesCount = count
 }
 func UpdateUnansweredBytesCount(count uint64) {
-	unansweredBytesCount = count
+	Metrics.UnansweredBytesCount = count
 }
diff --git a/internal/writer/redis.go b/internal/writer/redis.go
index 50aca66..6321ebb 100644
--- a/internal/writer/redis.go
+++ b/internal/writer/redis.go
@@ -75,8 +75,7 @@ func (w *redisWriter) flushInterval() {
 			}
 		}
 		atomic.AddUint64(&w.UpdateUnansweredBytesCount, ^(e.EncodedSize - 1))
-		statistics.UpdateEntryId(e.Id)
-		statistics.UpdateAOFAppliedOffset(e.Offset)
+		statistics.UpdateAOFAppliedOffset(uint64(e.Offset))
 		statistics.UpdateUnansweredBytesCount(atomic.LoadUint64(&w.UpdateUnansweredBytesCount))
 	}
 }
diff --git a/restore.toml b/restore.toml
index dbf6e63..3cb406d 100644
--- a/restore.toml
+++ b/restore.toml
@@ -23,6 +23,9 @@ ncpu = 3
 # pprof port, 0 means disable
 pprof_port = 0
 
+# metric port, 0 means disable
+metrics_port = 0
+
 # log
 log_file = "redis-shake.log"
 log_level = "info" # debug, info or warn
diff --git a/sync.toml b/sync.toml
index 79eb676..c3bbf8c 100644
--- a/sync.toml
+++ b/sync.toml
@@ -25,6 +25,9 @@ ncpu = 4
 # pprof port, 0 means disable
 pprof_port = 0
 
+# metric port, 0 means disable
+metrics_port = 0
+
 # log
 log_file = "redis-shake.log"
 log_level = "info" # debug, info or warn
diff --git a/test/assets/empty.toml b/test/assets/empty.toml
index 9144564..6ac5ea6 100644
--- a/test/assets/empty.toml
+++ b/test/assets/empty.toml
@@ -25,6 +25,9 @@ ncpu = 3
 # pprof port, 0 means disable
 pprof_port = 0
 
+# metric port, 0 means disable
+metrics_port = 0
+
 # log
 log_file = "redis-shake.log"
 log_level = "info" # debug, info or warn