redis-shake工具
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

103 lines
2.3 KiB

package writer
import (
"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
)
// KeySlots is the length of the slot routing table held by RedisClusterWriter.
// loadClusterNodes requires every slot index in [0, KeySlots) to be owned by
// exactly one writer. The value matches the number of hash slots defined by
// Redis Cluster.
const KeySlots = 16384
// RedisClusterWriter is a Writer that fans entries out to a set of per-node
// writers, choosing the target by the slot recorded on each entry.
type RedisClusterWriter struct {
// addresses holds the node addresses returned by utils.GetRedisClusterNodes.
addresses []string
// writers holds one writer per entry of addresses, in the same order.
writers []Writer
// router maps a slot index to the writer that owns that slot.
router [KeySlots]Writer
// stat holds the slice built by the most recent call to Status.
stat []interface{}
}
// NewRedisClusterWriter builds a RedisClusterWriter from opts and returns it
// as a Writer. All setup is done by loadClusterNodes; once that has returned,
// the discovered node addresses are logged.
func NewRedisClusterWriter(opts *RedisWriterOptions) Writer {
	clusterWriter := &RedisClusterWriter{}
	clusterWriter.loadClusterNodes(opts)

	log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", clusterWriter.addresses)
	return clusterWriter
}
// Close closes every per-node writer, in the order they are stored in
// r.writers.
func (r *RedisClusterWriter) Close() {
	for i := range r.writers {
		r.writers[i].Close()
	}
}
// loadClusterNodes asks utils.GetRedisClusterNodes for the node addresses and
// their slots, creates one standalone writer per address and fills the slot
// routing table.
//
// Each node writer is created from a copy of opts in which only Address is
// replaced. log.Panicf is called if a slot is claimed by more than one node,
// or if any slot in [0, KeySlots) has no writer after all nodes are processed.
func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) {
	nodeAddrs, nodeSlots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
	r.addresses = nodeAddrs

	for idx := range nodeAddrs {
		// Work on a copy so the caller's options keep their original Address.
		nodeOpts := *opts
		nodeOpts.Address = nodeAddrs[idx]

		nodeWriter := NewRedisStandaloneWriter(&nodeOpts)
		r.writers = append(r.writers, nodeWriter)

		// Register this node as the owner of each of its slots.
		for _, slot := range nodeSlots[idx] {
			if r.router[slot] != nil {
				log.Panicf("redisClusterWriter: slot %d already occupied", slot)
			}
			r.router[slot] = nodeWriter
		}
	}

	// Every slot must have an owner, otherwise Write could hit a nil writer.
	for slot := range r.router {
		if r.router[slot] == nil {
			log.Panicf("redisClusterWriter: slot %d not occupied", slot)
		}
	}
}
// Write forwards entry to the writer(s) that should receive it.
//
// An entry without slots is written to every node writer. Otherwise all of the
// entry's slots must be identical; if they differ, log.Panicf is called with a
// CROSSSLOT message. The entry is then written to the writer that the routing
// table holds for that slot.
func (r *RedisClusterWriter) Write(entry *entry.Entry) {
	if len(entry.Slots) == 0 {
		// No slot information: send the entry to all nodes.
		for i := range r.writers {
			r.writers[i].Write(entry)
		}
		return
	}

	// -1 means "no slot recorded yet"; the first slot seen replaces it and
	// every later slot is compared against that value.
	target := -1
	for _, slot := range entry.Slots {
		switch {
		case target == -1:
			target = slot
		case slot != target:
			log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
		}
	}

	r.router[target].Write(entry)
}
// Consistent reports whether every per-node writer reports a consistent
// status. It performs the same check as StatusConsistent and simply delegates
// to it.
func (r *RedisClusterWriter) Consistent() bool {
	return r.StatusConsistent()
}
// Status collects the Status() value of every per-node writer, in the order
// of r.writers, and returns them as a []interface{} wrapped in interface{}.
// The same slice is stored in r.stat. With no writers the result is an empty,
// non-nil slice.
func (r *RedisClusterWriter) Status() interface{} {
	// Exactly one element is appended per writer, so allocate the full
	// capacity once instead of growing the slice repeatedly.
	stats := make([]interface{}, 0, len(r.writers))
	for _, writer := range r.writers {
		stats = append(stats, writer.Status())
	}

	// Publish the finished slice in one step so r.stat never holds a
	// partially filled result.
	r.stat = stats
	return r.stat
}
// StatusString returns a fixed description of this writer; it does not depend
// on the writer's state.
func (r *RedisClusterWriter) StatusString() string {
	const description = "[redis_cluster_writer] writing to redis cluster"
	return description
}
// StatusConsistent reports whether every per-node writer reports a consistent
// status. It stops at the first writer that does not, and returns true when
// there are no writers.
func (r *RedisClusterWriter) StatusConsistent() bool {
	for i := range r.writers {
		if r.writers[i].StatusConsistent() {
			continue
		}
		return false
	}
	return true
}