@ -5,40 +5,45 @@
package main
package main
import (
import (
"encoding/json"
"flag"
"flag"
"fmt"
"fmt"
"math"
_ "net/http/pprof"
"os"
"os"
"os/signal"
"os/signal"
"syscall"
"reflect"
"runtime/debug"
"time"
"runtime"
"runtime"
"math"
"runtime/debug"
_ "net/http/pprof"
"strings"
"strconv"
"strconv"
"encoding/json"
"strings"
"reflect"
"syscall"
"time"
"pkg/libs/log"
"redis-shake"
"redis-shake/base"
"redis-shake/common"
"redis-shake/common"
"redis-shake/configure"
"redis-shake/configure"
"redis-shake/metric"
"redis-shake/metric"
"redis-shake"
"redis-shake/base"
"redis-shake/restful"
"redis-shake/restful"
"pkg/libs/log"
"github.com/gugemichael/nimo4go"
"github.com/gugemichael/nimo4go"
logRotate "gopkg.in/natefinch/lumberjack.v2"
logRotate "gopkg.in/natefinch/lumberjack.v2"
)
)
type Exit struct { Code int }
type Exit struct { Code int }
const (
const (
TypeDecode = "decode"
TypeDecode = "decode"
TypeRestore = "restore"
TypeRestore = "restore"
TypeDump = "dump"
TypeDump = "dump"
TypeSync = "sync"
TypeSync = "sync"
defaultHttpPort = 20881
defaultSystemPort = 20882
defaultSenderSize = 65535
defaultSenderCount = 1024
)
)
func main ( ) {
func main ( ) {
@ -83,7 +88,7 @@ func main() {
utils . Welcome ( )
utils . Welcome ( )
utils . StartTime = fmt . Sprintf ( "%v" , time . Now ( ) . Format ( utils . GolangSecurityTime ) )
utils . StartTime = fmt . Sprintf ( "%v" , time . Now ( ) . Format ( utils . GolangSecurityTime ) )
if err = utils . WritePidById ( conf . Options . Id ) ; err != nil {
if err = utils . WritePidById ( conf . Options . Id , conf . Options . PidPath ) ; err != nil {
crash ( fmt . Sprintf ( "write pid failed. %v" , err ) , - 5 )
crash ( fmt . Sprintf ( "write pid failed. %v" , err ) , - 5 )
}
}
@ -172,7 +177,9 @@ func sanitizeOptions(tp string) error {
runtime . GOMAXPROCS ( conf . Options . NCpu )
runtime . GOMAXPROCS ( conf . Options . NCpu )
}
}
if conf . Options . Parallel == 0 || conf . Options . Parallel > 1024 {
if conf . Options . Parallel == 0 { // not set
conf . Options . Parallel = 1
} else if conf . Options . Parallel > 1024 {
return fmt . Errorf ( "parallel[%v] should in (0, 1024]" , conf . Options . Parallel )
return fmt . Errorf ( "parallel[%v] should in (0, 1024]" , conf . Options . Parallel )
} else {
} else {
conf . Options . Parallel = int ( math . Max ( float64 ( conf . Options . Parallel ) , float64 ( conf . Options . NCpu ) ) )
conf . Options . Parallel = int ( math . Max ( float64 ( conf . Options . Parallel ) , float64 ( conf . Options . NCpu ) ) )
@ -204,7 +211,7 @@ func sanitizeOptions(tp string) error {
}
}
if conf . Options . LogFile != "" {
if conf . Options . LogFile != "" {
conf . Options . LogFile = fmt . Sprintf ( "%s.log" , conf . Options . Id )
//conf.Options.LogFile = fmt.Sprintf("%s.log", conf.Options.Id)
utils . LogRotater = & logRotate . Logger {
utils . LogRotater = & logRotate . Logger {
Filename : conf . Options . LogFile ,
Filename : conf . Options . LogFile ,
@ -215,9 +222,9 @@ func sanitizeOptions(tp string) error {
log . StdLog = log . New ( utils . LogRotater , "" )
log . StdLog = log . New ( utils . LogRotater , "" )
}
}
// heartbeat
// heartbeat, 86400 = 1 day
if conf . Options . HeartbeatInterval <= 0 || conf . Options . HeartbeatInterval > 86400 {
if conf . Options . HeartbeatInterval > 86400 {
return fmt . Errorf ( "HeartbeatInterval[%v] should in ( 0, 86400]" , conf . Options . HeartbeatInterval )
return fmt . Errorf ( "HeartbeatInterval[%v] should in [ 0, 86400]" , conf . Options . HeartbeatInterval )
}
}
if conf . Options . HeartbeatNetworkInterface == "" {
if conf . Options . HeartbeatNetworkInterface == "" {
conf . Options . HeartbeatIp = "127.0.0.1"
conf . Options . HeartbeatIp = "127.0.0.1"
@ -240,7 +247,7 @@ func sanitizeOptions(tp string) error {
if n , err := strconv . ParseInt ( conf . Options . FakeTime [ 1 : ] , 10 , 64 ) ; err != nil {
if n , err := strconv . ParseInt ( conf . Options . FakeTime [ 1 : ] , 10 , 64 ) ; err != nil {
return fmt . Errorf ( "parse fake_time failed[%v]" , err )
return fmt . Errorf ( "parse fake_time failed[%v]" , err )
} else {
} else {
conf . Options . ShiftTime = time . Duration ( n * int64 ( time . Millisecond ) - time . Now ( ) . UnixNano ( ) )
conf . Options . ShiftTime = time . Duration ( n * int64 ( time . Millisecond ) - time . Now ( ) . UnixNano ( ) )
}
}
default :
default :
if t , err := time . Parse ( "2006-01-02 15:04:05" , conf . Options . FakeTime ) ; err != nil {
if t , err := time . Parse ( "2006-01-02 15:04:05" , conf . Options . FakeTime ) ; err != nil {
@ -273,25 +280,38 @@ func sanitizeOptions(tp string) error {
// pass, >= 0 means enable
// pass, >= 0 means enable
}
}
if conf . Options . HttpProfile <= 0 || conf . Options . HttpProfile > 65535 {
if conf . Options . HttpProfile < 0 || conf . Options . HttpProfile > 65535 {
return fmt . Errorf ( "HttpProfile[%v] should in (0, 65535]" , conf . Options . HttpProfile )
return fmt . Errorf ( "HttpProfile[%v] should in [0, 65535]" , conf . Options . HttpProfile )
} else if conf . Options . HttpProfile == 0 {
// set to default when not set
conf . Options . HttpProfile = defaultHttpPort
}
}
if conf . Options . SystemProfile <= 0 || conf . Options . SystemProfile > 65535 {
return fmt . Errorf ( "SystemProfile[%v] should in (0, 65535]" , conf . Options . SystemProfile )
if conf . Options . SystemProfile < 0 || conf . Options . SystemProfile > 65535 {
return fmt . Errorf ( "SystemProfile[%v] should in [0, 65535]" , conf . Options . SystemProfile )
} else if conf . Options . SystemProfile == 0 {
// set to default when not set
conf . Options . SystemProfile = defaultSystemPort
}
}
if conf . Options . SenderSize <= 0 || conf . Options . SenderSize >= 1073741824 {
if conf . Options . SenderSize < 0 || conf . Options . SenderSize >= 1073741824 {
return fmt . Errorf ( "SenderSize[%v] should in (0, 1073741824]" , conf . Options . SenderSize )
return fmt . Errorf ( "SenderSize[%v] should in [0, 1073741824]" , conf . Options . SenderSize )
} else if conf . Options . SenderSize == 0 {
// set to default when not set
conf . Options . SenderSize = defaultSenderSize
}
}
if conf . Options . SenderCount <= 0 || conf . Options . SenderCount >= 100000 {
if conf . Options . SenderCount < 0 || conf . Options . SenderCount >= 100000 {
return fmt . Errorf ( "SenderCount[%v] should in (0, 100000]" , conf . Options . SenderCount )
return fmt . Errorf ( "SenderCount[%v] should in [0, 100000]" , conf . Options . SenderCount )
} else if conf . Options . SenderCount == 0 {
// set to default when not set
conf . Options . SenderCount = defaultSenderCount
}
}
if tp == TypeRestore || tp == TypeSync {
if tp == TypeRestore || tp == TypeSync {
// get target redis version and set TargetReplace.
// get target redis version and set TargetReplace.
if conf . Options . TargetRedisVersion , err = utils . GetRedisVersion ( conf . Options . TargetAddress ,
if conf . Options . TargetRedisVersion , err = utils . GetRedisVersion ( conf . Options . TargetAddress ,
conf . Options . TargetAuthType , conf . Options . TargetPasswordRaw ) ; err != nil {
conf . Options . TargetAuthType , conf . Options . TargetPasswordRaw ) ; err != nil {
return fmt . Errorf ( "get target redis version failed[%v]" , err )
return fmt . Errorf ( "get target redis version failed[%v]" , err )
} else {
} else {
if strings . HasPrefix ( conf . Options . TargetRedisVersion , "4." ) ||
if strings . HasPrefix ( conf . Options . TargetRedisVersion , "4." ) ||