You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
3.6 KiB

package writer
import (
type RedisWriterOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
type redisStandaloneWriter struct {
address string
client *client.Redis
DbId int
chWaitReply chan *entry.Entry
chWg sync.WaitGroup
stat struct {
Name string `json:"name"`
UnansweredBytes int64 `json:"unanswered_bytes"`
UnansweredEntries int64 `json:"unanswered_entries"`
func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer {
rw := new(redisStandaloneWriter)
rw.address = opts.Address
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
go rw.processReply()
return rw
func (w *redisStandaloneWriter) Close() {
func (w *redisStandaloneWriter) Write(e *entry.Entry) {
// switch db if we need
if w.DbId != e.DbId {
// send
bytes := e.Serialize()
for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
time.Sleep(1 * time.Nanosecond)
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
log.Debugf("[%s] switch db to [%d]", w.stat.Name, newDbId)
w.client.Send("select", strconv.Itoa(newDbId))
w.DbId = newDbId
w.chWaitReply <- &entry.Entry{
Argv: []string{"select", strconv.Itoa(newDbId)},
CmdName: "select",
func (w *redisStandaloneWriter) processReply() {
for e := range w.chWaitReply {
reply, err := w.client.Receive()
log.Debugf("[%s] receive reply. reply=[%v], cmd=[%s]", w.stat.Name, reply, e.String())
if err == proto.Nil {
log.Warnf("[%s] receive nil reply. cmd=[%s]", w.stat.Name, e.String())
} else if err != nil {
if err.Error() == "BUSYKEY Target key name already exists." {
if config.Opt.Advanced.RDBRestoreCommandBehavior == "skip" {
log.Debugf("[%s] redisStandaloneWriter received BUSYKEY reply. cmd=[%s]", w.stat.Name, e.String())
} else if config.Opt.Advanced.RDBRestoreCommandBehavior == "panic" {
log.Panicf("[%s] redisStandaloneWriter received BUSYKEY reply. cmd=[%s]", w.stat.Name, e.String())
} else {
log.Panicf("[%s] receive reply failed. cmd=[%s], error=[%v]", w.stat.Name, e.String(), err)
if strings.EqualFold(e.CmdName, "select") { // skip select command
atomic.AddInt64(&w.stat.UnansweredBytes, -e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, -1)
func (w *redisStandaloneWriter) Status() interface{} {
return w.stat
func (w *redisStandaloneWriter) StatusString() string {
return fmt.Sprintf("[%s]: unanswered_entries=%d", w.stat.Name, atomic.LoadInt64(&w.stat.UnansweredEntries))
func (w *redisStandaloneWriter) StatusConsistent() bool {
return atomic.LoadInt64(&w.stat.UnansweredBytes) == 0 && atomic.LoadInt64(&w.stat.UnansweredEntries) == 0