Skip to content

Commit

Permalink
u
Browse files Browse the repository at this point in the history
  • Loading branch information
wxsms committed Jan 18, 2024
1 parent 6dac431 commit 2e61a24
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions source/_posts/etcd-queue-problem.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
title: etcd 队列引发的问题一则
title: etcd 队列引发的问题一则及源码解析
tags:
- go
- etcd
Expand Down Expand Up @@ -375,7 +375,7 @@ cli, err := clientv3.New(clientv3.Config{

## etcd 队列源码赏析

抛开上面遇到的问题不谈,etcd clientv3 中实现的多读多写分布式队列还是很值得学习的。队列源码文件位于 `client/v3/experimental/recipes/queue.go`,其暴露的接口如下:
抛开上面遇到的问题不谈,etcd clientv3 中实现的“多读多写”分布式队列还是很值得学习的。队列源码文件位于 `client/v3/experimental/recipes/queue.go`,其暴露的接口如下:

```go
package recipe // import "go.etcd.io/etcd/client/v3/experimental/recipes"
Expand All @@ -390,7 +390,7 @@ func (q *Queue) Dequeue() (string, error)
func (q *Queue) Enqueue(val string) error
```

该 package 非常简洁,只暴露了三个函数:创建一个队列,出队和入队。下面一一解读。
该 package 非常简洁,只暴露了三个函数:创建队列,出队和入队。下面一一解读。

### NewQueue

Expand All @@ -400,7 +400,7 @@ func NewQueue(client *v3.Client, keyPrefix string) *Queue {
}
```

`NewQueue` 函数接收一个 client 实例指针和一个 `keyPrefix` 字符串(可以认为是队列名),返回一个队列实例指针。队列实例还额外创建了一个 `context` 实例,但是并没有允许使用者自定义该参数。这里貌似是一个值得商榷的点`context.TODO` 也可以看出。正常来说,`context` 实例应该在调用 `Enqueue``Dequeue` 的时候单独传入,这样才能发挥它的预期作用(配置超时时间等)。
`NewQueue` 函数接收一个 client 实例指针和一个 `keyPrefix` 字符串(可以认为是队列名),返回一个队列实例指针。队列实例还额外创建了一个 `context` 实例,但是并没有允许使用者自定义该参数。这里貌似是一个值得商榷的点`context.TODO` 也可以看出。正常来说,`context` 实例应该在调用 `Enqueue``Dequeue` 的时候单独传入,这样才能发挥它的预期作用(配置超时时间等)。

### Enqueue

Expand All @@ -411,7 +411,7 @@ func (q *Queue) Enqueue(val string) error {
}
```

`Enqueue` 函数的实现非常简单:从 `newUniqueKV` 这个函数来看,是根据 `keyPrefix``value` 来创建了一个唯一的键值对。然而 etcd 要怎么在多读多写的环境下保证键的唯一性呢?深入 `newUniqueKV` 函数:
`Enqueue` 函数的实现非常简单:从 `newUniqueKV` 这个函数来看,是根据 `keyPrefix``value` 来创建了一个唯一的键值对。然而 etcd 要怎么在“多写”的环境下保证键的唯一性呢?继续深入 `newUniqueKV` 函数:

```go
func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
Expand All @@ -432,7 +432,7 @@ func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {

1. 如果创建成功了,则返回键值对;
2. 如果出错了:
1. 出现了 `ErrKeyExists` 错误,回到循环开始处,继续尝试创建
1. 出现了 `ErrKeyExists` 错误,代表这个 key 已经被使用了,因此回到循环开始处继续尝试创建
2. 出现了其它任意错误,返回错误信息。

这里其实还有一个知识点:etcd 的储存模式是 MVCC (Multi-Version Concurrency Control,即多版本并发控制)。也就是说,如果对同一个键进行多次写,etcd 本身并不会报 `ErrKeyExists` 错,而是会为这个 key 创建一个新的版本号和值。那么这里 `putNewKV` 要如何保证创建该 key 时它必须是不存在的呢?继续深入源码:
Expand All @@ -454,12 +454,14 @@ func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
}
```

首先看注释`putNewKV` 尝试创建一个 key,且只有在这个 key 不存在的情况下才会成功。
先看注释`putNewKV` 尝试创建一个 key,且只有在这个 key 不存在的情况下才会成功。

这里其实利用了 etcd 提供的事务(`kv.Txn`)功能,etcd 设计了一套非常易读且强大的事务接口
这里其实利用了 etcd 提供的事务(`kv.Txn`)功能,etcd 设计了一套易读且强大的事务接口

```go
kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
cmp := v3.Compare(v3.Version(key), "=", 0)
req := v3.OpPut(key, val, v3.WithLease(leaseID))
txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
```

字面意义理解:如果 `cmp` 条件成立(`key``version``0`,即当前不存在这个 `key`),则执行 `req`(创建这个 `key`),然后提交(`Commit`)。结合 `newUniqueKV` 中的 `for` 循环来看,这其实是一把乐观锁。
Expand Down Expand Up @@ -510,18 +512,18 @@ func (q *Queue) Dequeue() (string, error) {
}
```

先看注释:`Dequeue` 会将入队的元素按 FIFO (先入先出)的顺序出队如果当前没有可出队的元素则会阻塞直到出队成功。
先看注释:`Dequeue` 会将入队的元素按 FIFO (先入先出)的顺序出队如果当前没有可出队的元素则会阻塞直到出队成功。

出队的流程总结:

1. 尝试通过 `keyPrefix` 为前缀获取一个元素,其中 `WithFirstRev` 指定获取最老的修订版本号(即最早入队的元素);
2. 尝试“claim”获取到的这个 key,“claim”的方式即删除:
1. 如果该客户端成功执行了删除,则认为出队成功,返回元素;
2. 如果没有成功删除(在多读的场景下,代表这个元素被其它客户端“claim”了),但还有更多元素可以获取,则递归调用自己,重复上述流程
2. 如果没有成功删除(在“多读”的场景下,代表这个元素被其它客户端“claim”了),但还有更多元素可以获取,则递归调用自己,重复上述流程
3. 如果即没有成功“claim”,也没有更多元素可以获取了,则启动一个 watch 进程,开始监听新创建的元素,同时函数在这里阻塞
4. 监听到新元素后,同样的,尝试将该元素删除:
1. 如果成功删除,则认为出队成功,返回元素
2. 如果删除失败(在多读的场景下,代表这个元素被其它客户端出队了),则递归调用自己,重复上述流程
2. 如果删除失败(在“多读”的场景下,代表这个元素被其它客户端出队了),则递归调用自己,重复上述流程

`claimFirstKey` 中调用了 `deleteRevKey` 来删除元素,该函数同样利用了事务来确保不重复删除(不重复出队):

Expand All @@ -540,11 +542,12 @@ func deleteRevKey(kv v3.KV, key string, rev int64) (bool, error) {
}
```

同样的,由于 MVCC 的特殊性,这里需要通过在删除前比较 key 的 `Revision`即版本号),确保执行删除时该版本号没有发生变化,然后才执行删除。因为在 etcd 中,删除一个 key 并不会真正将它物理意义上的删除,而是在版本数据库中插入一条“删除”的记录,并且修改版本号。如果不做事务判断,则有可能出现重复删除(重复出队)的情况。
同样的,由于 MVCC 的特殊性,这里需要通过在删除前比较 key 的 `ModRevision`即最新版本号),确保执行删除时该版本号没有发生变化,然后才执行删除。这是因为在 etcd 中,删除一个 key 并不会真正将它物理意义上的删除,而是在版本数据库中插入一条“删除”的记录,并且修改版本号。如果不做事务判断,则有可能出现重复删除(重复出队)的情况。

## 参考

* https://etcd.io/docs/v3.2/learning/auth_design/
* https://github.com/etcd-io/etcd/issues/12385
* https://github.com/etcd-io/etcd/pull/14322
* https://etcd.io/docs/v3.2/op-guide/security/
* https://etcd.io/docs/v3.2/op-guide/security/
* https://github.com/etcd-io/etcd/blob/main/client/v3/experimental/recipes/queue.go

0 comments on commit 2e61a24

Please sign in to comment.