Skip to content

Commit 76430a8

Browse files
committed Nov 14, 2022
kgo: add option to consume preferring laggy partitions
This adds three new APIs: func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32) func PreferLagAt(preferLagAt int64) PreferLagFn These functions allow an end user to adjust the order of partitions that are being fetched. Ideally, an end user will only need: kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50)) But, PreferLagFn exists to allow for more advanced use cases. Closes #222
1 parent cf392a3 commit 76430a8

File tree

3 files changed

+209
-6
lines changed

3 files changed

+209
-6
lines changed
 

‎pkg/kgo/config.go

+22
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ type cfg struct {
138138
isolationLevel int8
139139
keepControl bool
140140
rack string
141+
preferLagFn PreferLagFn
141142

142143
maxConcurrentFetches int
143144
disableFetchSessions bool
@@ -1306,6 +1307,27 @@ func DisableFetchSessions() ConsumerOpt {
13061307
return consumerOpt{func(cfg *cfg) { cfg.disableFetchSessions = true }}
13071308
}
13081309

1310+
// ConsumePreferringLagFn allows you to re-order partitions before they are
1311+
// fetched, given each partition's current lag.
1312+
//
1313+
// By default, the client rotates partitions fetched by one after every fetch
1314+
// request. Kafka answers fetch requests in the order that partitions are
1315+
// requested, filling the fetch response until FetchMaxBytes and
1316+
// FetchMaxPartitionBytes are hit. All partitions eventually rotate to the
1317+
// front, ensuring no partition is starved.
1318+
//
1319+
// With this option, you can return topic order and per-topic partition
1320+
// ordering. These orders will sort to the front (first by topic, then by
1321+
// partition). Any topic or partitions that you do not return are added to the
1322+
// end, preserving their original ordering.
1323+
//
1324+
// For a simple lag preference that sorts the laggiest topics and partitions
1325+
// first, use `kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50))` (or some other
1326+
// similar lag number).
1327+
func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt {
1328+
return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }}
1329+
}
1330+
13091331
//////////////////////////////////
13101332
// CONSUMER GROUP CONFIGURATION //
13111333
//////////////////////////////////

‎pkg/kgo/group_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
125125
ConsumeTopics(c.consumeFrom),
126126
Balancers(c.balancer),
127127
MaxBufferedRecords(10000),
128+
ConsumePreferringLagFn(PreferLagAt(10)),
128129

129130
// Even with autocommitting, autocommitting does not commit
130131
// *the latest* when being revoked. We always want to commit

‎pkg/kgo/source.go

+186-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"fmt"
77
"hash/crc32"
8+
"sort"
89
"sync"
910
"sync/atomic"
1011
"time"
@@ -149,6 +150,10 @@ type cursorOffset struct {
149150
// See kmsg.OffsetForLeaderEpochResponseTopicPartition for more
150151
// details.
151152
lastConsumedEpoch int32
153+
154+
// The current high watermark of the partition. Uninitialized (0) means
155+
// we do not know the HWM, or there is no lag.
156+
hwm int64
152157
}
153158

154159
// use, for fetch requests, freezes a view of the cursorOffset.
@@ -172,6 +177,7 @@ func (c *cursor) unset() {
172177
c.setOffset(cursorOffset{
173178
offset: -1,
174179
lastConsumedEpoch: -1,
180+
hwm: 0,
175181
})
176182
}
177183

@@ -395,6 +401,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
395401
pCursor.from.setOffset(cursorOffset{
396402
offset: lastReturnedRecord.Offset + 1,
397403
lastConsumedEpoch: lastReturnedRecord.LeaderEpoch,
404+
hwm: p.HighWatermark,
398405
})
399406
}
400407

@@ -433,6 +440,7 @@ func (s *source) createReq() *fetchRequest {
433440
maxPartBytes: s.cl.cfg.maxPartBytes,
434441
rack: s.cl.cfg.rack,
435442
isolationLevel: s.cl.cfg.isolationLevel,
443+
preferLagFn: s.cl.cfg.preferLagFn,
436444

437445
// We copy a view of the session for the request, which allows
438446
// modify source while the request may be reading its copy.
@@ -958,6 +966,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
958966
LastStableOffset: rp.LastStableOffset,
959967
LogStartOffset: rp.LogStartOffset,
960968
}
969+
if rp.ErrorCode == 0 {
970+
o.hwm = rp.HighWatermark
971+
}
961972

962973
aborter := buildAborter(rp)
963974

@@ -1530,6 +1541,7 @@ type fetchRequest struct {
15301541
rack string
15311542

15321543
isolationLevel int8
1544+
preferLagFn PreferLagFn
15331545

15341546
numOffsets int
15351547
usedOffsets usedOffsets
@@ -1566,19 +1578,185 @@ func (f *fetchRequest) addCursor(c *cursor) {
15661578
f.numOffsets++
15671579
}
15681580

1569-
// If the end user prefers to consume lag, we
1581+
// PreferLagFn accepts topic and partition lag, the previously determined topic
1582+
// order, and the previously determined per-topic partition order, and returns
1583+
// a new topic and per-topic partition order.
1584+
//
1585+
// Most use cases will not need to look at the prior orders, but they exist
1586+
// if you want to get fancy.
1587+
//
1588+
// You can return partial results: if you only return topics, partitions within
1589+
// each topic keep their prior ordering. If you only return some topics but not
1590+
// all, the topics you do not return / the partitions you do not return will
1591+
// retain their original ordering *after* your given ordering.
1592+
type PreferLagFn func(lag map[string]map[int32]int64, torderPrior []string, porderPrior map[string][]int32) ([]string, map[string][]int32)
1593+
1594+
// PreferLagAt is a simple PreferLagFn that orders the largest lag first, for
1595+
// any topic that is collectively lagging more than preferLagAt, and for any
1596+
// partition that is lagging more than preferLagAt.
1597+
//
1598+
// The function does not prescribe any ordering for topics that have the same
1599+
// lag. It is recommended to use a number more than 0 or 1: if you use 0, you
1600+
// may just always undo client ordering when there is no actual lag.
1601+
func PreferLagAt(preferLagAt int64) PreferLagFn {
1602+
if preferLagAt < 0 {
1603+
return nil
1604+
}
1605+
return func(lag map[string]map[int32]int64, _ []string, _ map[string][]int32) ([]string, map[string][]int32) {
1606+
type plag struct {
1607+
p int32
1608+
lag int64
1609+
}
1610+
type tlag struct {
1611+
t string
1612+
lag int64
1613+
ps []plag
1614+
}
1615+
1616+
// First, collect all partition lag into per-topic lag.
1617+
tlags := make(map[string]tlag, len(lag))
1618+
for t, ps := range lag {
1619+
for p, lag := range ps {
1620+
prior := tlags[t]
1621+
tlags[t] = tlag{
1622+
t: t,
1623+
lag: prior.lag + lag,
1624+
ps: append(prior.ps, plag{p, lag}),
1625+
}
1626+
}
1627+
}
1628+
1629+
// We now remove topics and partitions that are not lagging
1630+
// enough. Collectively, the topic could be lagging too much,
1631+
// but individually, no partition is lagging that much: we will
1632+
// sort the topic first and keep the old partition ordering.
1633+
for t, tlag := range tlags {
1634+
if tlag.lag < preferLagAt {
1635+
delete(tlags, t)
1636+
continue
1637+
}
1638+
for i := 0; i < len(tlag.ps); i++ {
1639+
plag := tlag.ps[i]
1640+
if plag.lag < preferLagAt {
1641+
tlag.ps[i] = tlag.ps[len(tlag.ps)-1]
1642+
tlag.ps = tlag.ps[:len(tlag.ps)-1]
1643+
i--
1644+
}
1645+
}
1646+
}
1647+
if len(tlags) == 0 {
1648+
return nil, nil
1649+
}
1650+
1651+
var sortedLags []tlag
1652+
for _, tlag := range tlags {
1653+
sort.Slice(tlag.ps, func(i, j int) bool { return tlag.ps[i].lag > tlag.ps[j].lag })
1654+
sortedLags = append(sortedLags, tlag)
1655+
}
1656+
sort.Slice(sortedLags, func(i, j int) bool { return sortedLags[i].lag > sortedLags[j].lag })
1657+
1658+
// We now return our laggy topics and partitions, and let the
1659+
// caller add back any missing topics / partitions in their
1660+
// prior order.
1661+
torder := make([]string, 0, len(sortedLags))
1662+
for _, t := range sortedLags {
1663+
torder = append(torder, t.t)
1664+
}
1665+
porder := make(map[string][]int32, len(sortedLags))
1666+
for _, tlag := range sortedLags {
1667+
ps := make([]int32, 0, len(tlag.ps))
1668+
for _, p := range tlag.ps {
1669+
ps = append(ps, p.p)
1670+
}
1671+
porder[tlag.t] = ps
1672+
}
1673+
return torder, porder
1674+
}
1675+
}
1676+
1677+
// If the end user prefers to consume lag, we reorder our previously ordered
1678+
// partitions, preferring first the laggiest topics, and then within those, the
1679+
// laggiest partitions.
15701680
func (f *fetchRequest) adjustPreferringLag() {
1571-
if f.preferLagAt < 0 {
1681+
if f.preferLagFn == nil {
15721682
return
15731683
}
1684+
1685+
tall := make(map[string]struct{}, len(f.torder))
1686+
for _, t := range f.torder {
1687+
tall[t] = struct{}{}
1688+
}
1689+
pall := make(map[string][]int32, len(f.porder))
1690+
for t, ps := range f.porder {
1691+
pall[t] = append([]int32(nil), ps...)
1692+
}
1693+
1694+
lag := make(map[string]map[int32]int64, len(f.torder))
15741695
for t, ps := range f.usedOffsets {
1696+
plag := make(map[int32]int64, len(ps))
1697+
lag[t] = plag
15751698
for p, c := range ps {
1576-
lag := c.hwm - c.offset
1577-
if lag < f.preferLagAt {
1578-
continue
1699+
hwm := c.hwm
1700+
if c.hwm < 0 {
1701+
hwm = 0
1702+
}
1703+
lag := hwm - c.offset
1704+
if c.offset <= 0 {
1705+
lag = hwm
1706+
}
1707+
plag[p] = lag
1708+
}
1709+
}
1710+
1711+
torder, porder := f.preferLagFn(lag, f.torder, f.porder)
1712+
if torder == nil && porder == nil {
1713+
return
1714+
}
1715+
if torder == nil {
1716+
torder = f.torder
1717+
}
1718+
if porder == nil {
1719+
porder = f.porder
1720+
}
1721+
defer func() { f.torder, f.porder = torder, porder }()
1722+
1723+
// Remove any extra topics the user returned that we were not
1724+
// consuming, and add all topics they did not give back.
1725+
for i := 0; i < len(torder); i++ {
1726+
t := torder[i]
1727+
if _, exists := tall[t]; !exists {
1728+
torder = append(torder[:i], torder[i+1:]...) // user gave topic we were not fetching
1729+
i--
1730+
}
1731+
delete(tall, t)
1732+
}
1733+
for t := range tall {
1734+
torder = append(torder, t) // user did not return topic we were fetching
1735+
}
1736+
1737+
// Now, same thing for partitions.
1738+
pused := make(map[int32]struct{})
1739+
for t, ps := range pall {
1740+
order, exists := porder[t]
1741+
if !exists {
1742+
porder[t] = ps // shortcut: user did not define this partition's oorder, keep old order
1743+
continue
1744+
}
1745+
for _, p := range ps {
1746+
pused[p] = struct{}{}
1747+
}
1748+
for i := 0; i < len(order); i++ {
1749+
p := order[i]
1750+
if _, exists := pused[p]; !exists {
1751+
order = append(order[:i], order[i+1:]...)
1752+
i--
15791753
}
1580-
_, _, _ = t, p, c
1754+
delete(pused, p)
1755+
}
1756+
for p := range pused {
1757+
order = append(order, p)
15811758
}
1759+
porder[t] = order
15821760
}
15831761
}
15841762

@@ -1607,6 +1785,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
16071785
sessionUsed = make(map[string]map[int32]struct{}, len(f.usedOffsets))
16081786
}
16091787

1788+
f.adjustPreferringLag()
1789+
16101790
for _, topic := range f.torder {
16111791
partitions := f.usedOffsets[topic]
16121792

0 commit comments

Comments
 (0)
Please sign in to comment.