|
|
@ -1,8 +1,10 @@ |
|
|
|
package utils |
|
|
|
package utils |
|
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
import ( |
|
|
|
|
|
|
|
|
|
|
|
redigoCluster "github.com/vinllen/redis-go-cluster" |
|
|
|
redigoCluster "github.com/vinllen/redis-go-cluster" |
|
|
|
redigo "github.com/garyburd/redigo/redis" |
|
|
|
redigo "github.com/garyburd/redigo/redis" |
|
|
|
|
|
|
|
"pkg/libs/log" |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
/* implement redigo.Conn(https://github.com/garyburd/redigo)
|
|
|
|
/* implement redigo.Conn(https://github.com/garyburd/redigo)
|
|
|
@ -52,12 +54,37 @@ func (cc *ClusterConn) Send(commandName string, args ...interface{}) error { |
|
|
|
// send batcher and put the return into recvChan
|
|
|
|
// send batcher and put the return into recvChan
|
|
|
|
func (cc *ClusterConn) Flush() error { |
|
|
|
func (cc *ClusterConn) Flush() error { |
|
|
|
ret, err := cc.client.RunBatch(cc.batcher) |
|
|
|
ret, err := cc.client.RunBatch(cc.batcher) |
|
|
|
|
|
|
|
defer func() { |
|
|
|
cc.batcher = nil // reset batcher
|
|
|
|
cc.batcher = nil // reset batcher
|
|
|
|
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
cc.recvChan <- reply{ |
|
|
|
cc.recvChan <- reply{ |
|
|
|
answer: ret, |
|
|
|
answer: nil, |
|
|
|
err: err, |
|
|
|
err: err, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// for redis-go-cluster driver, "Receive" function returns all the replies once flushed.
|
|
|
|
|
|
|
|
// However, this action is different with redigo driver that "Receive" only returns 1
|
|
|
|
|
|
|
|
// reply each time.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
retLength := len(ret) |
|
|
|
|
|
|
|
availableSize := cap(cc.recvChan) - len(cc.recvChan) |
|
|
|
|
|
|
|
if availableSize < retLength { |
|
|
|
|
|
|
|
log.Warnf("available channel size[%v] less than current returned batch size[%v]", availableSize, retLength) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
log.Debugf("cluster flush batch with size[%v], return replies size[%v]", cc.batcher.GetBatchSize(), retLength) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for _, ele := range ret { |
|
|
|
|
|
|
|
cc.recvChan <- reply{ |
|
|
|
|
|
|
|
answer: ele, |
|
|
|
|
|
|
|
err: err, |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|