redis-shake工具
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

106 lines
3.3 KiB

import time
import typing

import pybbt
import requests
import toml

from helpers.constant import PATH_REDIS_SHAKE
from helpers.redis import Redis
from helpers.utils.filesystem import create_empty_dir
from helpers.utils.network import get_free_port
from helpers.utils.timer import Timer
class ShakeOpts:
    """Factories for redis-shake configuration dictionaries.

    Each factory returns a mapping with exactly two sections: a reader
    section (``sync_reader``, ``scan_reader``, ``rdb_reader`` or
    ``aof_reader``) followed by a ``redis_writer`` section for the target.
    """

    @staticmethod
    def _endpoint(redis: Redis) -> typing.Dict:
        """Return the ``cluster``/``address`` pair describing *redis*."""
        return dict(cluster=redis.is_cluster(), address=redis.get_address())

    @staticmethod
    def create_sync_opts(src: Redis, dst: Redis) -> typing.Dict:
        """Build options that read from *src* via ``sync_reader`` and write to *dst*."""
        reader = ShakeOpts._endpoint(src)
        writer = ShakeOpts._endpoint(dst)
        return {"sync_reader": reader, "redis_writer": writer}

    @staticmethod
    def create_scan_opts(src: Redis, dst: Redis) -> typing.Dict:
        """Build options that read from *src* via ``scan_reader`` and write to *dst*."""
        reader = ShakeOpts._endpoint(src)
        writer = ShakeOpts._endpoint(dst)
        return {"scan_reader": reader, "redis_writer": writer}

    @staticmethod
    def create_rdb_opts(rdb_path: str, dts: Redis) -> typing.Dict:
        """Build options that read the file at *rdb_path* via ``rdb_reader`` and write to *dts*."""
        reader = dict(filepath=rdb_path)
        return {"rdb_reader": reader, "redis_writer": ShakeOpts._endpoint(dts)}

    @staticmethod
    def create_aof_opts(aof_path: str, dts: Redis, timestamp: int = 0) -> typing.Dict:
        """Build options that read the file at *aof_path* via ``aof_reader`` and write to *dts*.

        *timestamp* is stored as-is under ``aof_reader.timestamp`` (default 0).
        """
        reader = dict(filepath=aof_path, timestamp=timestamp)
        return {"aof_reader": reader, "redis_writer": ShakeOpts._endpoint(dts)}
class Shake:
    """Run redis-shake for the current pybbt test case and query its status endpoint.

    Constructing an instance writes ``shake.toml`` into a fresh per-instance
    directory under the case directory, launches redis-shake there via
    ``pybbt.Launcher`` and waits until the HTTP status endpoint responds.
    """

    def __init__(self, opts: typing.Dict):
        """Start redis-shake with *opts* and wait for its status endpoint.

        Args:
            opts: redis-shake configuration (for example built by ``ShakeOpts``).
                The mapping is not modified; a copy is written whose
                ``advanced`` section has ``status_port`` and ``log_level``
                forced, while any other caller-supplied ``advanced`` keys
                are kept.

        Raises:
            TimeoutError: if the status endpoint does not answer in time.
        """
        self.case_ctx = pybbt.get_case_context()
        self.status_port = get_free_port()
        self.status_url = f"http://localhost:{self.status_port}"
        self.dir = f"{self.case_ctx.dir}/shake{self.status_port}"
        self._write_config(self.dir, self._with_status_opts(opts, self.status_port))
        self.server = pybbt.Launcher(args=[PATH_REDIS_SHAKE, "shake.toml"], work_dir=self.dir)
        # Register the stop hook before waiting so the launcher is stopped at
        # case exit even when start-up times out.
        self.case_ctx.add_exit_hook(lambda: self.server.stop())
        self._wait_start()

    @staticmethod
    def _with_status_opts(opts: typing.Dict, status_port: int) -> typing.Dict:
        """Return a copy of *opts* with the status port and debug logging set under ``advanced``."""
        merged = dict(opts)
        # Copy the nested section too, so the caller's dict is never touched.
        advanced = dict(opts.get("advanced") or {})
        advanced.update({"status_port": status_port, "log_level": "debug"})
        merged["advanced"] = advanced
        return merged

    @staticmethod
    def _write_config(run_dir: str, opts: typing.Dict) -> None:
        """Create an empty *run_dir* and write *opts* to ``shake.toml`` inside it."""
        create_empty_dir(run_dir)
        # TOML documents must be UTF-8; do not rely on the platform default encoding.
        with open(f"{run_dir}/shake.toml", "w", encoding="utf-8") as f:
            toml.dump(opts, f)

    @staticmethod
    def run_once(opts: typing.Dict):
        """Run redis-shake with *opts* in a fresh directory and wait for it to stop.

        *opts* is written unchanged. The free port is used only to derive a
        unique directory name; it is not added to the configuration.
        """
        status_port = get_free_port()
        run_dir = f"{pybbt.get_case_context().dir}/shake{status_port}"
        Shake._write_config(run_dir, opts)
        server = pybbt.Launcher(args=[PATH_REDIS_SHAKE, "shake.toml"], work_dir=run_dir)
        server.wait_stop()

    def get_status(self, timeout: float = 5.0):
        """Fetch the status endpoint and return its decoded JSON body.

        Args:
            timeout: seconds to wait for the HTTP request before
                ``requests`` raises a timeout error, so a hung endpoint
                cannot block the test forever.
        """
        ret = requests.get(self.status_url, timeout=timeout)
        return ret.json()

    def _wait_start(self, timeout=5):
        """Poll the status endpoint until it answers.

        Args:
            timeout: limit compared against ``Timer.elapsed()`` (reported as
                seconds in the error message).

        Raises:
            TimeoutError: if no successful response arrives within *timeout*.
        """
        timer = Timer()
        while True:
            try:
                # Short per-request timeout keeps the overall deadline meaningful.
                self.get_status(timeout=1)
                return
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Endpoint not reachable yet; retry until the deadline passes.
                pass
            if timer.elapsed() > timeout:
                raise TimeoutError(f"Shake server not started in {timeout} seconds")
            # Brief pause so the loop does not spin at full CPU while waiting.
            time.sleep(0.05)

    def is_consistent(self):
        """Return the ``consistent`` field of the current status report."""
        return self.get_status()["consistent"]