Skip to content

Commit

Permalink
u
Browse files Browse the repository at this point in the history
  • Loading branch information
wxsms committed Jan 18, 2024
1 parent 700be3e commit 6dac431
Showing 1 changed file with 75 additions and 2 deletions.
77 changes: 75 additions & 2 deletions source/_posts/etcd-queue-problem.md
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,85 @@ func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
```

字面意义理解:如果 `cmp` 条件成立(`key``version``0`,即当前不存在这个 `key`),则执行 `req`(创建这个 `key`),然后提交(`Commit`)。
字面意义理解:如果 `cmp` 条件成立(`key``version``0`,即当前不存在这个 `key`),则执行 `req`(创建这个 `key`),然后提交(`Commit`)。结合 `newUniqueKV` 中的 `for` 循环来看,这其实是一把乐观锁。

入队函数到这里就结束了,一个看似简单的函数,里面包含了很多知识点!
`Enqueue` 函数到这里就结束了:该函数通过创建一个 `key` 来代表元素入队,同时通过乐观锁的方式,保证了多写的情况下一个 `key` 不会被重复入队。一个看似简单的函数,里面包含了很多知识点!

### Dequeue

`Dequeue` 是队列的另一个核心函数,同时也是本次实践中出现问题的函数。它的源码:

```go
// Dequeue returns Enqueue()'d elements in FIFO order. If the
// queue is empty, Dequeue blocks until elements are available.
func (q *Queue) Dequeue() (string, error) {
// TODO: fewer round trips by fetching more than one key
resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
if err != nil {
return "", err
}

kv, err := claimFirstKey(q.client, resp.Kvs)
if err != nil {
return "", err
} else if kv != nil {
return string(kv.Value), nil
} else if resp.More {
// missed some items, retry to read in more
return q.Dequeue()
}

// nothing yet; wait on elements
ev, err := WaitPrefixEvents(
q.client,
q.keyPrefix,
resp.Header.Revision,
[]mvccpb.Event_EventType{mvccpb.PUT})
if err != nil {
return "", err
}

ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
if err != nil {
return "", err
} else if !ok {
return q.Dequeue()
}
return string(ev.Kv.Value), err
}
```

先看注释:`Dequeue` 会将入队的元素按 FIFO (先入先出)的顺序出队。如果当前没有可出队的元素则会阻塞直到出队成功。

出队的流程总结:

1. 尝试通过 `keyPrefix` 为前缀获取一个元素,其中 `WithFirstRev` 指定获取最老的修订版本号(即最早入队的元素);
2. 尝试“claim”获取到的这个 key,“claim”的方式即删除:
1. 如果该客户端成功执行了删除,则认为出队成功,返回元素;
2. 如果没有成功删除(在多读的场景下,代表这个元素被其它客户端“claim”了),但还有更多元素可以获取,则递归调用自己,重复上述流程
3. 如果即没有成功“claim”,也没有更多元素可以获取了,则启动一个 watch 进程,开始监听新创建的元素,同时函数在这里阻塞
4. 监听到新元素后,同样的,尝试将该元素删除:
1. 如果成功删除,则认为出队成功,返回元素
2. 如果删除失败(在多读的场景下,代表这个元素被其它客户端出队了),则递归调用自己,重复上述流程

`claimFirstKey` 中调用了 `deleteRevKey` 来删除元素,该函数同样利用了事务来确保不重复删除(不重复出队):

```go
// deleteRevKey deletes a key by revision, returning false if key is missing
func deleteRevKey(kv v3.KV, key string, rev int64) (bool, error) {
cmp := v3.Compare(v3.ModRevision(key), "=", rev)
req := v3.OpDelete(key)
txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
if err != nil {
return false, err
} else if !txnresp.Succeeded {
return false, nil
}
return true, nil
}
```

同样的,由于 MVCC 的特殊性,这里需要通过在删除前比较 key 的 `Revision` (即版本号),确保执行删除时该版本号没有发生变化,然后才执行删除。因为在 etcd 中,删除一个 key 并不会真正将它物理意义上的删除,而是在版本数据库中插入一条“删除”的记录,并且修改版本号。如果不做事务判断,则有可能出现重复删除(重复出队)的情况。

## 参考

Expand Down

0 comments on commit 6dac431

Please sign in to comment.