docs: add function documents

v4
suxb201 1 year ago committed by suxb201
parent 47e5b74f77
commit 6b3e723bb3
  1. 1
      .gitignore
  2. 4
      docs/src/zh/function/best_practices.md
  3. 47
      docs/src/zh/function/introduction.md
  4. 13
      docs/src/zh/reader/scan_reader.md
  5. 9
      docs/src/zh/reader/sync_reader.md
  6. 13
      docs/src/zh/writer/redis_writer.md
  7. 4
      internal/commands/keys.go
  8. 8
      internal/commands/keys_test.go
  9. 11
      internal/entry/entry.go
  10. 6
      internal/function/function.go

1
.gitignore vendored

@ -7,6 +7,7 @@ __pycache__/
bin/
dist/
tmp/
data/
*.log
*.rdb
*.aof

@ -4,4 +4,8 @@ outline: deep
# 最佳实践
## 修改
### 修改 Key 的前缀
TODO

@ -4,5 +4,50 @@ outline: deep
# 什么是 function
TODO
RedisShake 通过提供 function 功能,实现了全面的 [ETL(提取-转换-加载)](https://en.wikipedia.org/wiki/Extract,_transform,_load) 功能,以便对数据进行处理。通过利用 function 功能,可以实现以下几种操作:
* 更改数据所属的 db,比如将源端的 `db` 0 写入到目的端的 `db` 1。
* 对数据进行筛选,例如,只将 key 以 `user:` 开头的源数据写入到目标端。
* 改变 Key 的前缀,例如,将源端的 key `prefix_old_key` 写入到目标端的 key `prefix_new_key`
* ...
要使用 function 功能,只需编写一份 lua 脚本。RedisShake 在从源端获取数据后,会将数据转换为 Redis 命令。然后,它会处理这些命令,从中解析出 `KEYS`、`ARGV`、`SLOTS`、`GROUP` 等信息,并将这些信息传递给 lua 脚本。lua 脚本会处理这些数据,并返回处理后的命令。最后,RedisShake 会将处理后的数据写入到目标端。
以下是一个具体的例子:
```toml
function = """
shake.log(DB)
if DB == 0
then
return
end
shake.call(DB, ARGV)
"""
[sync_reader]
address = "127.0.0.1:6379"
[redis_writer]
address = "127.0.0.1:6380"
```
`DB` 是 RedisShake 提供的信息,表示当前数据所属的 db。`shake.log` 用于打印日志,`shake.call` 用于调用 Redis 命令。上述脚本的目的是丢弃源端 `db` 0 的数据,将其他 `db` 的数据写入到目标端。
除了 `DB`,还有其他信息如 `KEYS`、`ARGV`、`SLOTS`、`GROUP` 等,可供调用的函数有 `shake.log``shake.call`,具体请参考 [function API](#function-api)。
关于更多的示例,可以参考 [最佳实践](./best_practices.md)。
## function API
### 变量
| 变量 | 类型 | 示例 | 描述 |
|-|-|-|-----|
| DB | number | 1 | 命令所属的 `db` |
| GROUP | string | "LIST" | 命令所属的 `group`,符合 [Command key specifications](https://redis.io/docs/reference/key-specs/),可以在 [commands](https://github.com/tair-opensource/RedisShake/tree/v4/scripts/commands) 中查询每个命令的 `group` 字段 |
| CMD | string | "XGROUP-DELCONSUMER" | 命令的名称 |
| KEYS | table | \{"key1", "key2"\} | 命令的所有 Key |
| KEY_INDEXES | table | \{2, 4\} | 命令的所有 Key 在 `ARGV` 中的索引 |
| SLOTS | table | \{9189, 4998\} | 当前命令的所有 Key 所属的 [slot](https://redis.io/docs/reference/cluster-spec/#key-distribution-model) |
| ARGV | table | \{"mset", "key1", "value1", "key2", "value2"\} | 命令的所有参数 |
### 函数
* `shake.call(DB, ARGV)`:返回一个 Redis 命令,RedisShake 会将该命令写入目标端。
* `shake.log(DB, ARGV)`:打印日志。

@ -22,13 +22,20 @@ cluster = false # set to true if source is a redis cluster
address = "127.0.0.1:6379" # when cluster is true, set address to one of the cluster node
username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription
tls = false
ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription
```
* 当源端为集群时,配置 cluster 为 true,address 为集群中的任意一个节点即可。`scan_reader` 会通过 `cluster nodes` 命令自动获取集群中的所有节点,并建立连接获取数据。
* 开启 `ksn` 参数后 RedisShake 会在 `SCAN` 之前使用 [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/)
* `cluster`:源端是否为集群
* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可
* 鉴权:
* 当源端使用 ACL 账号时,配置 `username``password`
* 当源端使用传统账号时,仅配置 `password`
* 当源端无鉴权时,不配置 `username``password`
* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书
* `ksn`:开启 `ksn` 参数后 RedisShake 会在 `SCAN` 之前使用 [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/)
能力来订阅 Key 的变化。当 Key 发生变化时,RedisShake 会使用 `DUMP``RESTORE` 命令来从源端读取 Key 的内容,并写入目标端。
::: warning
Redis keyspace notifications 不会感知到 `FLUSHALL``FLUSHDB` 命令,因此在使用 `ksn` 参数时,需要确保源端数据库不会执行这两个命令。
:::

@ -22,5 +22,10 @@ password = "" # keep empty if no authentication is required
tls = false
```
* 当源端为集群时,配置 `cluster` 为 true,`address` 为集群中的任意一个节点即可。`sync_reader` 会通过 `cluster nodes` 命令获取集群中的所有节点信息,并建立连接获取数据。
* `cluster`:源端是否为集群
* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可
* 鉴权:
* 当源端使用 ACL 账号时,配置 `username``password`
* 当源端使用传统账号时,仅配置 `password`
* 当源端无鉴权时,不配置 `username``password`
* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书

@ -15,5 +15,14 @@ password = "" # keep empty if no authentication is required
tls = false
```
* 当目的端为集群时,配置 cluster 为 true,address 为集群中的任意一个节点即可。`redis_writer` 会通过 `cluster nodes` 命令获取集群中的所有节点,并建立连接。
* 当目的端为集群时,应保证源端发过来的命令满足 [Key 的哈希值属于同一个 slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset)。
* `cluster`:源端是否为集群
* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可
* 鉴权:
* 当源端使用 ACL 账号时,配置 `username``password`
* 当源端使用传统账号时,仅配置 `password`
* 当源端无鉴权时,不配置 `username``password`
* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书
注意事项:
1. 当目的端为集群时,应保证源端发过来的命令满足 [Key 的哈希值属于同一个 slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset)。
2. 应尽量保证目的端版本大于等于源端版本,否则可能会出现不支持的命令。如确实需要降低版本,可以设置 `target_redis_proto_max_bulk_len` 为 0,来避免使用 `restore` 命令恢复数据。

@ -10,7 +10,7 @@ import (
)
// CalcKeys https://redis.io/docs/reference/key-specs/
func CalcKeys(argv []string) (cmaName string, group string, keys []string) {
func CalcKeys(argv []string) (cmaName string, group string, keys []string, keysIndexes []int) {
argc := len(argv)
group = "unknown"
cmaName = strings.ToUpper(argv[0])
@ -64,6 +64,7 @@ func CalcKeys(argv []string) (cmaName string, group string, keys []string) {
keyStep := spec.findKeysRangeKeyStep
for inx := begin; inx <= lastKeyInx && limitCount > 0; inx += keyStep {
keys = append(keys, argv[inx])
keysIndexes = append(keysIndexes, inx+1)
limitCount -= 1
}
case "keynum":
@ -79,6 +80,7 @@ func CalcKeys(argv []string) (cmaName string, group string, keys []string) {
step := spec.findKeysKeynumKeyStep
for inx := begin + firstKey; keyCount > 0; inx += step {
keys = append(keys, argv[inx])
keysIndexes = append(keysIndexes, inx+1)
keyCount -= 1
}
default:

@ -18,25 +18,25 @@ func testEq(a, b []string) bool {
func TestCalcKeys(t *testing.T) {
// SET
cmd, group, keys := CalcKeys([]string{"SET", "key", "value"})
cmd, group, keys, _ := CalcKeys([]string{"SET", "key", "value"})
if cmd != "SET" || group != "STRING" || !testEq(keys, []string{"key"}) {
t.Errorf("CalcKeys(SET key value) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}
// MSET
cmd, group, keys = CalcKeys([]string{"MSET", "key1", "value1", "key2", "value2"})
cmd, group, keys, _ = CalcKeys([]string{"MSET", "key1", "value1", "key2", "value2"})
if cmd != "MSET" || group != "STRING" || !testEq(keys, []string{"key1", "key2"}) {
t.Errorf("CalcKeys(MSET key1 value1 key2 value2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}
// XADD
cmd, group, keys = CalcKeys([]string{"XADD", "key", "*", "field1", "value1", "field2", "value2"})
cmd, group, keys, _ = CalcKeys([]string{"XADD", "key", "*", "field1", "value1", "field2", "value2"})
if cmd != "XADD" || group != "STREAM" || !testEq(keys, []string{"key"}) {
t.Errorf("CalcKeys(XADD key * field1 value1 field2 value2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}
// ZUNIONSTORE
cmd, group, keys = CalcKeys([]string{"ZUNIONSTORE", "key", "2", "key1", "key2"})
cmd, group, keys, _ = CalcKeys([]string{"ZUNIONSTORE", "key", "2", "key1", "key2"})
if cmd != "ZUNIONSTORE" || group != "SORTED_SET" || !testEq(keys, []string{"key", "key1", "key2"}) {
t.Errorf("CalcKeys(ZUNIONSTORE key 2 key1 key2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}

@ -12,10 +12,11 @@ type Entry struct {
DbId int // required
Argv []string // required
CmdName string
Group string
Keys []string
Slots []int
CmdName string
Group string
Keys []string
KeyIndexes []int
Slots []int
// for stat
SerializedSize int64
@ -51,6 +52,6 @@ func (e *Entry) Serialize() []byte {
}
func (e *Entry) Parse() {
e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv)
e.CmdName, e.Group, e.Keys, e.KeyIndexes = commands.CalcKeys(e.Argv)
e.Slots = commands.CalcSlots(e.Keys)
}

@ -23,6 +23,7 @@ func Init() {
// GROUP
// CMD
// KEYS
// KEY_INDEXES
// SLOTS
// ARGV
@ -49,6 +50,11 @@ func RunFunction(e *entry.Entry) []*entry.Entry {
for _, slot := range e.Slots {
slots.Append(lua.LNumber(slot))
}
keyIndexes := L.NewTable()
for _, keyIndex := range e.KeyIndexes {
keyIndexes.Append(lua.LNumber(keyIndex))
}
L.SetGlobal("KEY_INDEXES", keyIndexes)
L.SetGlobal("SLOTS", slots)
argv := L.NewTable()
for _, arg := range e.Argv {

Loading…
Cancel
Save