redis-shake工具
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

122 lines
3.4 KiB

package main
import (
"RedisShake/internal/config"
"RedisShake/internal/function"
"RedisShake/internal/log"
"RedisShake/internal/reader"
"RedisShake/internal/status"
"RedisShake/internal/utils"
"RedisShake/internal/writer"
"github.com/mcuadros/go-defaults"
_ "net/http/pprof"
)
func main() {
v := config.LoadConfig()
log.Init(config.Opt.Advanced.LogLevel, config.Opt.Advanced.LogFile, config.Opt.Advanced.Dir)
utils.ChdirAndAcquireFileLock()
utils.SetNcpu()
utils.SetPprofPort()
function.Init()
// create reader
var theReader reader.Reader
if v.IsSet("sync_reader") {
opts := new(reader.SyncReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("sync_reader", opts)
if err != nil {
log.Panicf("failed to read the SyncReader config entry. err: %v", err)
}
if opts.Cluster {
theReader = reader.NewSyncClusterReader(opts)
log.Infof("create SyncClusterReader: %v", opts.Address)
} else {
theReader = reader.NewSyncStandaloneReader(opts)
log.Infof("create SyncStandaloneReader: %v", opts.Address)
}
} else if v.IsSet("scan_reader") {
opts := new(reader.ScanReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("scan_reader", opts)
if err != nil {
log.Panicf("failed to read the ScanReader config entry. err: %v", err)
}
if opts.Cluster {
theReader = reader.NewScanClusterReader(opts)
log.Infof("create ScanClusterReader: %v", opts.Address)
} else {
theReader = reader.NewScanStandaloneReader(opts)
log.Infof("create ScanStandaloneReader: %v", opts.Address)
}
} else if v.IsSet("rdb_reader") {
opts := new(reader.RdbReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("rdb_reader", opts)
if err != nil {
log.Panicf("failed to read the RdbReader config entry. err: %v", err)
}
theReader = reader.NewRDBReader(opts)
log.Infof("create RdbReader: %v", opts.Filepath)
} else if v.IsSet("aof_reader") {
opts := new(reader.AOFReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("aof_reader", opts)
if err != nil {
log.Panicf("failed to read the AOFReader config entry. err: %v", err)
}
theReader = reader.NewAOFReader(opts)
log.Infof("create AOFReader: %v", opts.Filepath)
} else {
log.Panicf("no reader config entry found")
}
// create writer
var theWriter writer.Writer
if v.IsSet("redis_writer") {
opts := new(writer.RedisWriterOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("redis_writer", opts)
if err != nil {
log.Panicf("failed to read the RedisStandaloneWriter config entry. err: %v", err)
}
if opts.Cluster {
theWriter = writer.NewRedisClusterWriter(opts)
log.Infof("create RedisClusterWriter: %v", opts.Address)
} else {
theWriter = writer.NewRedisStandaloneWriter(opts)
log.Infof("create RedisStandaloneWriter: %v", opts.Address)
}
} else {
log.Panicf("no writer config entry found")
}
// create status
status.Init(theReader, theWriter)
log.Infof("start syncing...")
ch := theReader.StartRead()
for e := range ch {
// calc arguments
e.Parse()
status.AddReadCount(e.CmdName)
// filter
log.Debugf("function before: %v", e)
entries := function.RunFunction(e)
log.Debugf("function after: %v", entries)
for _, entry := range entries {
entry.Parse()
theWriter.Write(entry)
status.AddWriteCount(entry.CmdName)
}
}
theWriter.Close() // Wait for all writing operations to complete
utils.ReleaseFileLock() // Release file lock
log.Infof("all done")
}