add http metrics

v4
suxb201 2 years ago committed by suxb201
parent b1e861a7a9
commit 83b84739d3
  1. 15
      cmd/redis-shake/main.go
  2. 4
      internal/config/config.go
  3. 2
      internal/rdb/rdb.go
  4. 6
      internal/reader/psync.go
  5. 138
      internal/statistics/statistics.go
  6. 3
      internal/writer/redis.go
  7. 3
      restore.toml
  8. 3
      sync.toml
  9. 3
      test/assets/empty.toml

@ -51,6 +51,19 @@ func main() {
}()
}
// start statistics
if config.Config.Advanced.MetricsPort != 0 {
go func() {
log.Infof("metrics url: http://localhost:%d", config.Config.Advanced.MetricsPort)
mux := http.NewServeMux()
mux.HandleFunc("/", statistics.Handler)
err := http.ListenAndServe(fmt.Sprintf("localhost:%d", config.Config.Advanced.MetricsPort), mux)
if err != nil {
log.PanicError(err)
}
}()
}
// create writer
var theWriter writer.Writer
target := &config.Config.Target
@ -79,6 +92,7 @@ func main() {
statistics.Init()
id := uint64(0)
for e := range ch {
statistics.UpdateInQueueEntriesCount(uint64(len(ch)))
// calc arguments
e.Id = id
id++
@ -87,6 +101,7 @@ func main() {
// filter
code := filter.Filter(e)
statistics.UpdateEntryId(e.Id)
if code == filter.Allow {
theWriter.Write(e)
statistics.AddAllowEntriesCount()

@ -34,7 +34,8 @@ type tomlAdvanced struct {
Ncpu int `toml:"ncpu"`
PprofPort int `toml:"pprof_port"`
PprofPort int `toml:"pprof_port"`
MetricsPort int `toml:"metrics_port"`
// log
LogFile string `toml:"log_file"`
@ -80,6 +81,7 @@ func init() {
Config.Advanced.Dir = "data"
Config.Advanced.Ncpu = 4
Config.Advanced.PprofPort = 0
Config.Advanced.MetricsPort = 0
Config.Advanced.LogFile = "redis-shake.log"
Config.Advanced.LogLevel = "info"
Config.Advanced.LogInterval = 5

@ -93,7 +93,7 @@ func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
if err != nil {
log.PanicError(err)
}
statistics.UpdateRDBSentSize(offset)
statistics.UpdateRDBSentSize(uint64(offset))
}
defer UpdateRDBSentSize()
// read one entry

@ -150,7 +150,7 @@ func (r *psyncReader) saveRDB() {
log.PanicError(err)
}
log.Infof("received rdb length. length=[%d]", length)
statistics.SetRDBFileSize(length)
statistics.SetRDBFileSize(uint64(length))
// create rdb file
rdbFilePath := "dump.rdb"
@ -174,7 +174,7 @@ func (r *psyncReader) saveRDB() {
log.PanicError(err)
}
remainder -= int64(n)
statistics.UpdateRDBReceivedSize(length - remainder)
statistics.UpdateRDBReceivedSize(uint64(length - remainder))
_, err = rdbFileHandle.Write(buf[:n])
if err != nil {
log.PanicError(err)
@ -199,7 +199,7 @@ func (r *psyncReader) saveAOF(rd io.Reader) {
log.PanicError(err)
}
r.receivedOffset += int64(n)
statistics.UpdateAOFReceivedOffset(r.receivedOffset)
statistics.UpdateAOFReceivedOffset(uint64(r.receivedOffset))
aofWriter.Write(buf[:n])
}
}

@ -1,26 +1,42 @@
package statistics
import (
"encoding/json"
"github.com/alibaba/RedisShake/internal/config"
"github.com/alibaba/RedisShake/internal/log"
"net/http"
"time"
)
var (
// ID
entryId uint64
type metrics struct {
// entries
EntryId uint64 `json:"entry_id"`
AllowEntriesCount uint64 `json:"allow_entries_count"`
DisallowEntriesCount uint64 `json:"disallow_entries_count"`
// rdb
rdbFileSize int64
rdbReceivedSize int64
rdbSendSize int64
RdbFileSize uint64 `json:"rdb_file_size"`
RdbReceivedSize uint64 `json:"rdb_received_size"`
RdbSendSize uint64 `json:"rdb_send_size"`
// aof
aofReceivedOffset int64
aofAppliedOffset int64
// ops
allowEntriesCount int64
disallowEntriesCount int64
unansweredBytesCount uint64
)
AofReceivedOffset uint64 `json:"aof_received_offset"`
AofAppliedOffset uint64 `json:"aof_applied_offset"`
// for performance debug
InQueueEntriesCount uint64 `json:"in_queue_entries_count"`
UnansweredBytesCount uint64 `json:"unanswered_bytes_count"`
}
var Metrics = &metrics{}
func Handler(w http.ResponseWriter, _ *http.Request) {
w.Header().Add("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(Metrics)
if err != nil {
log.PanicError(err)
}
}
func Init() {
go func() {
@ -29,64 +45,84 @@ func Init() {
log.Infof("statistics disabled. seconds=[%d]", seconds)
}
lastAllowEntriesCount := Metrics.AllowEntriesCount
lastDisallowEntriesCount := Metrics.DisallowEntriesCount
for range time.Tick(time.Duration(seconds) * time.Second) {
if rdbFileSize == 0 {
if Metrics.RdbFileSize == 0 {
continue
}
if rdbFileSize > rdbReceivedSize {
if Metrics.RdbSendSize > Metrics.RdbReceivedSize {
log.Infof("receiving rdb. percent=[%.2f]%%, rdbFileSize=[%.3f]G, rdbReceivedSize=[%.3f]G",
float64(rdbReceivedSize)/float64(rdbFileSize)*100,
float64(rdbFileSize)/1024/1024/1024,
float64(rdbReceivedSize)/1024/1024/1024)
} else if rdbFileSize > rdbSendSize {
log.Infof("syncing rdb. percent=[%.2f]%%, allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], unansweredBytesCount=[%d]bytes, rdbFileSize=[%.3f]G, rdbSendSize=[%.3f]G",
float64(rdbSendSize)*100/float64(rdbFileSize),
float32(allowEntriesCount)/float32(seconds),
float32(disallowEntriesCount)/float32(seconds),
entryId,
unansweredBytesCount,
float64(rdbFileSize)/1024/1024/1024,
float64(rdbSendSize)/1024/1024/1024)
float64(Metrics.RdbReceivedSize)/float64(Metrics.RdbFileSize)*100,
float64(Metrics.RdbFileSize)/1024/1024/1024,
float64(Metrics.RdbReceivedSize)/1024/1024/1024)
} else if Metrics.RdbFileSize > Metrics.RdbSendSize {
log.Infof("syncing rdb. percent=[%.2f]%%, allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], InQueueEntriesCount=[%d], unansweredBytesCount=[%d]bytes, rdbFileSize=[%.3f]G, rdbSendSize=[%.3f]G",
float64(Metrics.RdbSendSize)*100/float64(Metrics.RdbFileSize),
float32(Metrics.AllowEntriesCount-lastAllowEntriesCount)/float32(seconds),
float32(Metrics.DisallowEntriesCount-lastDisallowEntriesCount)/float32(seconds),
Metrics.EntryId,
Metrics.InQueueEntriesCount,
Metrics.UnansweredBytesCount,
float64(Metrics.RdbFileSize)/1024/1024/1024,
float64(Metrics.RdbSendSize)/1024/1024/1024)
} else {
log.Infof("syncing aof. allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], unansweredBytesCount=[%d]bytes, diff=[%d], aofReceivedOffset=[%d], aofAppliedOffset=[%d]",
float32(allowEntriesCount)/float32(seconds),
float32(disallowEntriesCount)/float32(seconds),
entryId,
unansweredBytesCount,
aofReceivedOffset-aofAppliedOffset,
aofReceivedOffset,
aofAppliedOffset)
log.Infof("syncing aof. allowOps=[%.2f], disallowOps=[%.2f], entryId=[%d], InQueueEntriesCount=[%d], unansweredBytesCount=[%d]bytes, diff=[%d], aofReceivedOffset=[%d], aofAppliedOffset=[%d]",
float32(Metrics.AllowEntriesCount)/float32(seconds),
float32(Metrics.DisallowEntriesCount)/float32(seconds),
Metrics.EntryId,
Metrics.InQueueEntriesCount,
Metrics.UnansweredBytesCount,
Metrics.AofReceivedOffset-Metrics.AofAppliedOffset,
Metrics.AofReceivedOffset,
Metrics.AofAppliedOffset)
}
allowEntriesCount = 0
disallowEntriesCount = 0
lastAllowEntriesCount = Metrics.AllowEntriesCount
lastDisallowEntriesCount = Metrics.DisallowEntriesCount
}
}()
}
// entry id
func UpdateEntryId(id uint64) {
entryId = id
Metrics.EntryId = id
}
func AddAllowEntriesCount() {
allowEntriesCount++
Metrics.AllowEntriesCount++
}
func AddDisallowEntriesCount() {
disallowEntriesCount++
Metrics.DisallowEntriesCount++
}
func SetRDBFileSize(size int64) {
rdbFileSize = size
// rdb
func SetRDBFileSize(size uint64) {
Metrics.RdbFileSize = size
}
func UpdateRDBReceivedSize(size int64) {
rdbReceivedSize = size
func UpdateRDBReceivedSize(size uint64) {
Metrics.RdbReceivedSize = size
}
func UpdateRDBSentSize(offset int64) {
rdbSendSize = offset
func UpdateRDBSentSize(offset uint64) {
Metrics.RdbSendSize = offset
}
func UpdateAOFReceivedOffset(offset int64) {
aofReceivedOffset = offset
// aof
func UpdateAOFReceivedOffset(offset uint64) {
Metrics.AofReceivedOffset = offset
}
func UpdateAOFAppliedOffset(offset uint64) {
Metrics.AofAppliedOffset = offset
}
func UpdateAOFAppliedOffset(offset int64) {
aofAppliedOffset = offset
// for debug
func UpdateInQueueEntriesCount(count uint64) {
Metrics.InQueueEntriesCount = count
}
func UpdateUnansweredBytesCount(count uint64) {
unansweredBytesCount = count
Metrics.UnansweredBytesCount = count
}

@ -75,8 +75,7 @@ func (w *redisWriter) flushInterval() {
}
}
atomic.AddUint64(&w.UpdateUnansweredBytesCount, ^(e.EncodedSize - 1))
statistics.UpdateEntryId(e.Id)
statistics.UpdateAOFAppliedOffset(e.Offset)
statistics.UpdateAOFAppliedOffset(uint64(e.Offset))
statistics.UpdateUnansweredBytesCount(atomic.LoadUint64(&w.UpdateUnansweredBytesCount))
}
}

@ -23,6 +23,9 @@ ncpu = 3
# pprof port, 0 means disable
pprof_port = 0
# metric port, 0 means disable
metrics_port = 0
# log
log_file = "redis-shake.log"
log_level = "info" # debug, info or warn

@ -25,6 +25,9 @@ ncpu = 4
# pprof port, 0 means disable
pprof_port = 0
# metric port, 0 means disable
metrics_port = 0
# log
log_file = "redis-shake.log"
log_level = "info" # debug, info or warn

@ -25,6 +25,9 @@ ncpu = 3
# pprof port, 0 means disable
pprof_port = 0
# metric port, 0 means disable
metrics_port = 0
# log
log_file = "redis-shake.log"
log_level = "info" # debug, info or warn

Loading…
Cancel
Save