You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
113 lines
3.1 KiB
113 lines
3.1 KiB
package main
|
|
|
|
import (
|
|
"RedisShake/internal/config"
|
|
"RedisShake/internal/function"
|
|
"RedisShake/internal/log"
|
|
"RedisShake/internal/reader"
|
|
"RedisShake/internal/status"
|
|
"RedisShake/internal/utils"
|
|
"RedisShake/internal/writer"
|
|
"github.com/mcuadros/go-defaults"
|
|
_ "net/http/pprof"
|
|
)
|
|
|
|
func main() {
|
|
v := config.LoadConfig()
|
|
|
|
log.Init(config.Opt.Advanced.LogLevel, config.Opt.Advanced.LogFile, config.Opt.Advanced.Dir)
|
|
utils.ChdirAndAcquireFileLock()
|
|
utils.SetNcpu()
|
|
utils.SetPprofPort()
|
|
function.Init()
|
|
|
|
// create reader
|
|
var theReader reader.Reader
|
|
if v.IsSet("sync_reader") {
|
|
opts := new(reader.SyncReaderOptions)
|
|
defaults.SetDefaults(opts)
|
|
err := v.UnmarshalKey("sync_reader", opts)
|
|
if err != nil {
|
|
log.Panicf("failed to read the SyncReader config entry. err: %v", err)
|
|
}
|
|
if opts.Cluster {
|
|
theReader = reader.NewSyncClusterReader(opts)
|
|
log.Infof("create SyncClusterReader: %v", opts.Address)
|
|
} else {
|
|
theReader = reader.NewSyncStandaloneReader(opts)
|
|
log.Infof("create SyncStandaloneReader: %v", opts.Address)
|
|
}
|
|
} else if v.IsSet("scan_reader") {
|
|
opts := new(reader.ScanReaderOptions)
|
|
defaults.SetDefaults(opts)
|
|
err := v.UnmarshalKey("scan_reader", opts)
|
|
if err != nil {
|
|
log.Panicf("failed to read the ScanReader config entry. err: %v", err)
|
|
}
|
|
if opts.Cluster {
|
|
theReader = reader.NewScanClusterReader(opts)
|
|
log.Infof("create ScanClusterReader: %v", opts.Address)
|
|
} else {
|
|
theReader = reader.NewScanStandaloneReader(opts)
|
|
log.Infof("create ScanStandaloneReader: %v", opts.Address)
|
|
}
|
|
} else if v.IsSet("rdb_reader") {
|
|
opts := new(reader.RdbReaderOptions)
|
|
defaults.SetDefaults(opts)
|
|
err := v.UnmarshalKey("rdb_reader", opts)
|
|
if err != nil {
|
|
log.Panicf("failed to read the RdbReader config entry. err: %v", err)
|
|
}
|
|
theReader = reader.NewRDBReader(opts)
|
|
log.Infof("create RdbReader: %v", opts.Filepath)
|
|
} else {
|
|
log.Panicf("no reader config entry found")
|
|
}
|
|
|
|
// create writer
|
|
var theWriter writer.Writer
|
|
if v.IsSet("redis_writer") {
|
|
opts := new(writer.RedisWriterOptions)
|
|
defaults.SetDefaults(opts)
|
|
err := v.UnmarshalKey("redis_writer", opts)
|
|
if err != nil {
|
|
log.Panicf("failed to read the RedisStandaloneWriter config entry. err: %v", err)
|
|
}
|
|
if opts.Cluster {
|
|
theWriter = writer.NewRedisClusterWriter(opts)
|
|
log.Infof("create RedisClusterWriter: %v", opts.Address)
|
|
} else {
|
|
theWriter = writer.NewRedisStandaloneWriter(opts)
|
|
log.Infof("create RedisStandaloneWriter: %v", opts.Address)
|
|
}
|
|
} else {
|
|
log.Panicf("no writer config entry found")
|
|
}
|
|
|
|
// create status
|
|
status.Init(theReader, theWriter)
|
|
|
|
log.Infof("start syncing...")
|
|
|
|
ch := theReader.StartRead()
|
|
for e := range ch {
|
|
// calc arguments
|
|
e.Parse()
|
|
status.AddReadCount(e.CmdName)
|
|
|
|
// filter
|
|
log.Debugf("function before: %v", e)
|
|
entries := function.RunFunction(e)
|
|
log.Debugf("function after: %v", entries)
|
|
|
|
for _, entry := range entries {
|
|
entry.Parse()
|
|
theWriter.Write(entry)
|
|
status.AddWriteCount(entry.CmdName)
|
|
}
|
|
}
|
|
|
|
theWriter.Close() // Wait for all writing operations to complete
|
|
utils.ReleaseFileLock() // Release file lock
|
|
log.Infof("all done")
|
|
}
|
|
|