diff --git a/ChangeLog b/ChangeLog index 6400947..27036d2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ +2019-03-11 Alibaba Cloud. + * version: 1.2.0 + * redis-shake: support 5.0. 2019-02-21 Alibaba Cloud. * version: 1.0.0 - * mongo-shake: initial release. + * redis-shake: initial release. diff --git a/conf/redis-shake.conf b/conf/redis-shake.conf index ed46eb3..fff9078 100644 --- a/conf/redis-shake.conf +++ b/conf/redis-shake.conf @@ -19,7 +19,7 @@ parallel = 4 # input RDB file. read from stdin, default is stdin ('/dev/stdin'). # used in `decode` and `restore`. # 如果是decode或者restore,这个参数表示读取的rdb文件 -input_rdb = local_dump +input_rdb = local # output RDB file. default is stdout ('/dev/stdout'). # used in `decode` and `dump`. @@ -35,8 +35,8 @@ source.address = 127.0.0.1:20441 source.password_raw = 123456 # auth type, don't modify it source.auth_type = auth -# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0) -source.version = 6 +# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0) +source.version = 9 # target redis configuration. used in `restore` and `sync`. # used in `restore` and `sync`. @@ -47,7 +47,7 @@ target.address = 127.0.0.1:20551 target.password_raw = 123456 # auth type, don't modify it target.auth_type = auth -# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0) +# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0, 9 for >= 5.0) target.version = 6 # all the data will come into this db. < 0 means disable. # used in `restore` and `sync`. @@ -94,7 +94,7 @@ psync = false metric = true # print in log # 是否将metric打印到log中 -metric.print_log = true +metric.print_log = false # heartbeat # send heartbeat to this url @@ -126,6 +126,12 @@ sender.count = 5000 # 用于metric统计时延的队列 sender.delay_channel_size = 65535 +# enable keep_alive option in TCP when connecting redis. +# the unit is second. +# 0 means disable. +# TCP keep-alive保活参数,单位秒,0表示不启用。 +keep_alive = 0 + # ----------------splitter---------------- # below variables are useless for current opensource version so don't set. diff --git a/src/pkg/rdb/loader.go b/src/pkg/rdb/loader.go index b174463..03997d9 100644 --- a/src/pkg/rdb/loader.go +++ b/src/pkg/rdb/loader.go @@ -154,12 +154,13 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) { default: var key []byte if l.remainMember == 0 { + // first time visit this key. rkey, err := l.ReadString() if err != nil { return nil, err } key = rkey - entry.NeedReadLen = 1 + entry.NeedReadLen = 1 // read value length when it's the first time. } else { key = l.lastEntry.Key } @@ -178,6 +179,7 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) { if l.lastReadCount == l.totMemberCount { entry.RealMemberCount = 0 } else { + // RealMemberCount > 0 means this is big entry which also is a split key. entry.RealMemberCount = l.lastReadCount } l.lastEntry = entry diff --git a/src/pkg/rdb/reader.go b/src/pkg/rdb/reader.go index 43cff17..f608b00 100644 --- a/src/pkg/rdb/reader.go +++ b/src/pkg/rdb/reader.go @@ -16,7 +16,7 @@ import ( // "libs/log" ) -var FromVersion int64 = 8 +var FromVersion int64 = 9 var ToVersion int64 = 6 const ( @@ -27,12 +27,13 @@ const ( RdbTypeHash = 4 RdbTypeZSet2 = 5 - RdbTypeHashZipmap = 9 - RdbTypeListZiplist = 10 - RdbTypeSetIntset = 11 - RdbTypeZSetZiplist = 12 - RdbTypeHashZiplist = 13 - RdbTypeQuicklist = 14 + RdbTypeHashZipmap = 9 + RdbTypeListZiplist = 10 + RdbTypeSetIntset = 11 + RdbTypeZSetZiplist = 12 + RdbTypeHashZiplist = 13 + RdbTypeQuicklist = 14 + RDBTypeStreamListPacks = 15 // stream rdbFlagOnlyValue = 0xf9 rdbFlagAUX = 0xfa @@ -46,7 +47,8 @@ const ( const ( rdb6bitLen = 0 rdb14bitLen = 1 - rdb32bitLen = 2 + rdb32bitLen = 0x80 + rdb64bitLen = 0x81 rdbEncVal = 3 rdbEncInt8 = 0 @@ -91,7 +93,7 @@ func (r *rdbReader) offset() int64 { func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) { var b bytes.Buffer - r = NewRdbReader(io.TeeReader(r, &b)) + r = NewRdbReader(io.TeeReader(r, &b)) // the result will be written into b when calls r.Read() lr := l.rdbReader switch t { default: @@ -181,6 +183,112 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) { if lr.lastReadCount == n { lr.remainMember = 0 } + case RDBTypeStreamListPacks: + // TODO, need to judge big key + lr.lastReadCount, lr.remainMember, lr.totMemberCount = 0, 0, 0 + // list pack length + nListPacks, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nListPacks); i++ { + // read twice + if _, err := r.ReadString(); err != nil { + return nil, err + } + if _, err := r.ReadString(); err != nil { + return nil, err + } + } + + // items + if _, err := r.ReadLength(); err != nil { + return nil, err + } + // last_entry_id timestamp second + if _, err := r.ReadLength(); err != nil { + return nil, err + } + // last_entry_id timestamp millisecond + if _, err := r.ReadLength(); err != nil { + return nil, err + } + + // cgroups length + nCgroups, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nCgroups); i++ { + // cname + if _, err := r.ReadString(); err != nil { + return nil, err + } + + // last_cg_entry_id timestamp second + if _, err := r.ReadLength(); err != nil { + return nil, err + } + // last_cg_entry_id timestamp millisecond + if _, err := r.ReadLength(); err != nil { + return nil, err + } + + // pending number + nPending, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nPending); i++ { + // eid, read 16 bytes + b := make([]byte, 16) + if err := r.readFull(b); err != nil { + return nil, err + } + + // seen_time + b = make([]byte, 8) + if err := r.readFull(b); err != nil { + return nil, err + } + + // delivery_count + if _, err := r.ReadLength(); err != nil { + return nil, err + } + } + + // consumers + nConsumers, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nConsumers); i++ { + // cname + if _, err := r.ReadString(); err != nil { + return nil, err + } + + // seen_time + b := make([]byte, 8) + if err := r.readFull(b); err != nil { + return nil, err + } + + // pending + nPending2, err := r.ReadLength() + if err != nil { + return nil, err + } + for i := 0; i < int(nPending2); i++ { + // eid, read 16 bytes + b := make([]byte, 16) + if err := r.readFull(b); err != nil { + return nil, err + } + } + } + } } return b.Bytes(), nil } @@ -226,16 +334,25 @@ func (r *rdbReader) readEncodedLength() (length uint32, encoded bool, err error) if err != nil { return } - length = uint32(u & 0x3f) switch u >> 6 { case rdb6bitLen: + length = uint32(u & 0x3f) case rdb14bitLen: - u, err = r.readUint8() - length = (length << 8) + uint32(u) + var u2 uint8 + u2, err = r.readUint8() + length = (uint32(u & 0x3f) << 8) + uint32(u2) case rdbEncVal: encoded = true + length = uint32(u & 0x3f) default: - length, err = r.readUint32BigEndian() + switch u { + case rdb32bitLen: + length, err = r.readUint32BigEndian() + case rdb64bitLen: + length, err = r.readUint64BigEndian() + default: + length, err = 0, fmt.Errorf("unknown encoding length[%v]", u) + } } return } @@ -325,6 +442,12 @@ func (r *rdbReader) readUint32BigEndian() (uint32, error) { return binary.BigEndian.Uint32(b), err } +func (r *rdbReader) readUint64BigEndian() (uint32, error) { + b := r.buf[:8] + err := r.readFull(b) + return binary.BigEndian.Uint32(b), err +} + func (r *rdbReader) readInt8() (int8, error) { u, err := r.readUint8() return int8(u), err diff --git a/src/redis-shake/common/utils.go b/src/redis-shake/common/utils.go index 98d5bd4..483a6ef 100644 --- a/src/redis-shake/common/utils.go +++ b/src/redis-shake/common/utils.go @@ -34,7 +34,10 @@ func OpenRedisConnWithTimeout(target, auth_type, passwd string, readTimeout, wri } func OpenNetConn(target, auth_type, passwd string) net.Conn { - c, err := net.Dial("tcp", target) + d := net.Dialer{ + KeepAlive: time.Duration(conf.Options.KeepAlive) * time.Second, + } + c, err := d.Dial("tcp", target) if err != nil { log.PanicErrorf(err, "cannot connect to '%s'", target) } @@ -301,15 +304,16 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) { if err != nil { log.PanicError(err, "read rdb ") } - //log.Info("restore quicklist key: ", string(e.Key), ", type: ", t) + // log.Info("restore quicklist key: ", string(e.Key), ", type: ", e.Type) count := 0 if n, err := r.ReadLength(); err != nil { log.PanicError(err, "read rdb ") } else { - //log.Info("quicklist item size: ", int(n)) + // log.Info("quicklist item size: ", int(n)) for i := 0; i < int(n); i++ { ziplist, err := r.ReadString() + // log.Info("zipList: ", ziplist) if err != nil { log.PanicError(err, "read rdb ") } @@ -317,12 +321,13 @@ func restoreQuicklistEntry(c redigo.Conn, e *rdb.BinEntry) { if zln, err := r.ReadZiplistLength(buf); err != nil { log.PanicError(err, "read rdb") } else { - //log.Info("ziplist one of quicklist, size: ", int(zln)) + // log.Info("ziplist one of quicklist, size: ", int(zln)) for i := int64(0); i < zln; i++ { entry, err := r.ReadZiplistEntry(buf) if err != nil { log.PanicError(err, "read rdb ") } + // log.Info("rpush key: ", e.Key, " value: ", entry) count++ c.Send("RPUSH", e.Key, entry) if count == 100 { @@ -674,7 +679,9 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { return } - if uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0 { + // TODO, need to judge big key + if e.Type != rdb.RDBTypeStreamListPacks && + (uint64(len(e.Value)) > conf.Options.BigKeyThreshold || e.RealMemberCount != 0) { //use command if conf.Options.Rewrite && e.NeedReadLen == 1 { if !conf.Options.Metric { @@ -694,6 +701,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) { } return } + + // fmt.Printf("kkkey: %v, value: %v\n", string(e.Key), e.Value) s, err := redigo.String(c.Do("restore", e.Key, ttlms, e.Value)) if err != nil { /*The reply value of busykey in 2.8 kernel is "target key name is busy", diff --git a/src/redis-shake/configure/configure.go b/src/redis-shake/configure/configure.go index ec16e8d..44437ad 100644 --- a/src/redis-shake/configure/configure.go +++ b/src/redis-shake/configure/configure.go @@ -39,6 +39,7 @@ type Configuration struct { SenderSize uint64 `config:"sender.size"` SenderCount uint `config:"sender.count"` SenderDelayChannelSize uint `config:"sender.delay_channel_size"` + KeepAlive uint `config:"keep_alive"` // inner variables ReplaceHashTag bool `config:"replace_hash_tag"` diff --git a/src/redis-shake/sync.go b/src/redis-shake/sync.go index 5c86495..6521d90 100644 --- a/src/redis-shake/sync.go +++ b/src/redis-shake/sync.go @@ -361,7 +361,7 @@ func (cmd *CmdSync) SyncCommand(reader *bufio.Reader, target, auth_type, passwd offset, err := utils.GetFakeSlaveOffset(srcConn) if err != nil { // log.PurePrintf("%s\n", NewLogItem("GetFakeSlaveOffsetFail", "WARN", NewErrorLogDetail("", err.Error()))) - log.Warnf("Event:GetFakeSlaveOffsetFail\tId:%s\tError:%s", conf.Options.Id, err.Error()) + log.Warnf("Event:GetFakeSlaveOffsetFail\tId:%s\tWarn:%s", conf.Options.Id, err.Error()) // Reconnect while network error happen if err == io.EOF {