5
5
"encoding/binary"
6
6
"fmt"
7
7
"hash/crc32"
8
+ "sort"
8
9
"sync"
9
10
"sync/atomic"
10
11
"time"
@@ -149,6 +150,10 @@ type cursorOffset struct {
149
150
// See kmsg.OffsetForLeaderEpochResponseTopicPartition for more
150
151
// details.
151
152
lastConsumedEpoch int32
153
+
154
+ // The current high watermark of the partition. Uninitialized (0) means
155
+ // we do not know the HWM, or there is no lag.
156
+ hwm int64
152
157
}
153
158
154
159
// use, for fetch requests, freezes a view of the cursorOffset.
@@ -172,6 +177,7 @@ func (c *cursor) unset() {
172
177
c .setOffset (cursorOffset {
173
178
offset : - 1 ,
174
179
lastConsumedEpoch : - 1 ,
180
+ hwm : 0 ,
175
181
})
176
182
}
177
183
@@ -395,6 +401,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
395
401
pCursor .from .setOffset (cursorOffset {
396
402
offset : lastReturnedRecord .Offset + 1 ,
397
403
lastConsumedEpoch : lastReturnedRecord .LeaderEpoch ,
404
+ hwm : p .HighWatermark ,
398
405
})
399
406
}
400
407
@@ -433,6 +440,7 @@ func (s *source) createReq() *fetchRequest {
433
440
maxPartBytes : s .cl .cfg .maxPartBytes ,
434
441
rack : s .cl .cfg .rack ,
435
442
isolationLevel : s .cl .cfg .isolationLevel ,
443
+ preferLagFn : s .cl .cfg .preferLagFn ,
436
444
437
445
// We copy a view of the session for the request, which allows
438
446
// modify source while the request may be reading its copy.
@@ -958,6 +966,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
958
966
LastStableOffset : rp .LastStableOffset ,
959
967
LogStartOffset : rp .LogStartOffset ,
960
968
}
969
+ if rp .ErrorCode == 0 {
970
+ o .hwm = rp .HighWatermark
971
+ }
961
972
962
973
aborter := buildAborter (rp )
963
974
@@ -1530,6 +1541,7 @@ type fetchRequest struct {
1530
1541
rack string
1531
1542
1532
1543
isolationLevel int8
1544
+ preferLagFn PreferLagFn
1533
1545
1534
1546
numOffsets int
1535
1547
usedOffsets usedOffsets
@@ -1566,19 +1578,185 @@ func (f *fetchRequest) addCursor(c *cursor) {
1566
1578
f .numOffsets ++
1567
1579
}
1568
1580
1569
- // If the end user prefers to consume lag, we
1581
+ // PreferLagFn accepts topic and partition lag, the previously determined topic
1582
+ // order, and the previously determined per-topic partition order, and returns
1583
+ // a new topic and per-topic partition order.
1584
+ //
1585
+ // Most use cases will not need to look at the prior orders, but they exist
1586
+ // if you want to get fancy.
1587
+ //
1588
+ // You can return partial results: if you only return topics, partitions within
1589
+ // each topic keep their prior ordering. If you only return some topics but not
1590
+ // all, the topics you do not return / the partitions you do not return will
1591
+ // retain their original ordering *after* your given ordering.
1592
+ type PreferLagFn func (lag map [string ]map [int32 ]int64 , torderPrior []string , porderPrior map [string ][]int32 ) ([]string , map [string ][]int32 )
1593
+
1594
+ // PreferLagAt is a simple PreferLagFn that orders the largest lag first, for
1595
+ // any topic that is collectively lagging more than preferLagAt, and for any
1596
+ // partition that is lagging more than preferLagAt.
1597
+ //
1598
+ // The function does not prescribe any ordering for topics that have the same
1599
+ // lag. It is recommended to use a number more than 0 or 1: if you use 0, you
1600
+ // may just always undo client ordering when there is no actual lag.
1601
+ func PreferLagAt (preferLagAt int64 ) PreferLagFn {
1602
+ if preferLagAt < 0 {
1603
+ return nil
1604
+ }
1605
+ return func (lag map [string ]map [int32 ]int64 , _ []string , _ map [string ][]int32 ) ([]string , map [string ][]int32 ) {
1606
+ type plag struct {
1607
+ p int32
1608
+ lag int64
1609
+ }
1610
+ type tlag struct {
1611
+ t string
1612
+ lag int64
1613
+ ps []plag
1614
+ }
1615
+
1616
+ // First, collect all partition lag into per-topic lag.
1617
+ tlags := make (map [string ]tlag , len (lag ))
1618
+ for t , ps := range lag {
1619
+ for p , lag := range ps {
1620
+ prior := tlags [t ]
1621
+ tlags [t ] = tlag {
1622
+ t : t ,
1623
+ lag : prior .lag + lag ,
1624
+ ps : append (prior .ps , plag {p , lag }),
1625
+ }
1626
+ }
1627
+ }
1628
+
1629
+ // We now remove topics and partitions that are not lagging
1630
+ // enough. Collectively, the topic could be lagging too much,
1631
+ // but individually, no partition is lagging that much: we will
1632
+ // sort the topic first and keep the old partition ordering.
1633
+ for t , tlag := range tlags {
1634
+ if tlag .lag < preferLagAt {
1635
+ delete (tlags , t )
1636
+ continue
1637
+ }
1638
+ for i := 0 ; i < len (tlag .ps ); i ++ {
1639
+ plag := tlag .ps [i ]
1640
+ if plag .lag < preferLagAt {
1641
+ tlag .ps [i ] = tlag .ps [len (tlag .ps )- 1 ]
1642
+ tlag .ps = tlag .ps [:len (tlag .ps )- 1 ]
1643
+ i --
1644
+ }
1645
+ }
1646
+ }
1647
+ if len (tlags ) == 0 {
1648
+ return nil , nil
1649
+ }
1650
+
1651
+ var sortedLags []tlag
1652
+ for _ , tlag := range tlags {
1653
+ sort .Slice (tlag .ps , func (i , j int ) bool { return tlag .ps [i ].lag > tlag .ps [j ].lag })
1654
+ sortedLags = append (sortedLags , tlag )
1655
+ }
1656
+ sort .Slice (sortedLags , func (i , j int ) bool { return sortedLags [i ].lag > sortedLags [j ].lag })
1657
+
1658
+ // We now return our laggy topics and partitions, and let the
1659
+ // caller add back any missing topics / partitions in their
1660
+ // prior order.
1661
+ torder := make ([]string , 0 , len (sortedLags ))
1662
+ for _ , t := range sortedLags {
1663
+ torder = append (torder , t .t )
1664
+ }
1665
+ porder := make (map [string ][]int32 , len (sortedLags ))
1666
+ for _ , tlag := range sortedLags {
1667
+ ps := make ([]int32 , 0 , len (tlag .ps ))
1668
+ for _ , p := range tlag .ps {
1669
+ ps = append (ps , p .p )
1670
+ }
1671
+ porder [tlag .t ] = ps
1672
+ }
1673
+ return torder , porder
1674
+ }
1675
+ }
1676
+
1677
+ // If the end user prefers to consume lag, we reorder our previously ordered
1678
+ // partitions, preferring first the laggiest topics, and then within those, the
1679
+ // laggiest partitions.
1570
1680
func (f * fetchRequest ) adjustPreferringLag () {
1571
- if f .preferLagAt < 0 {
1681
+ if f .preferLagFn == nil {
1572
1682
return
1573
1683
}
1684
+
1685
+ tall := make (map [string ]struct {}, len (f .torder ))
1686
+ for _ , t := range f .torder {
1687
+ tall [t ] = struct {}{}
1688
+ }
1689
+ pall := make (map [string ][]int32 , len (f .porder ))
1690
+ for t , ps := range f .porder {
1691
+ pall [t ] = append ([]int32 (nil ), ps ... )
1692
+ }
1693
+
1694
+ lag := make (map [string ]map [int32 ]int64 , len (f .torder ))
1574
1695
for t , ps := range f .usedOffsets {
1696
+ plag := make (map [int32 ]int64 , len (ps ))
1697
+ lag [t ] = plag
1575
1698
for p , c := range ps {
1576
- lag := c .hwm - c .offset
1577
- if lag < f .preferLagAt {
1578
- continue
1699
+ hwm := c .hwm
1700
+ if c .hwm < 0 {
1701
+ hwm = 0
1702
+ }
1703
+ lag := hwm - c .offset
1704
+ if c .offset <= 0 {
1705
+ lag = hwm
1706
+ }
1707
+ plag [p ] = lag
1708
+ }
1709
+ }
1710
+
1711
+ torder , porder := f .preferLagFn (lag , f .torder , f .porder )
1712
+ if torder == nil && porder == nil {
1713
+ return
1714
+ }
1715
+ if torder == nil {
1716
+ torder = f .torder
1717
+ }
1718
+ if porder == nil {
1719
+ porder = f .porder
1720
+ }
1721
+ defer func () { f .torder , f .porder = torder , porder }()
1722
+
1723
+ // Remove any extra topics the user returned that we were not
1724
+ // consuming, and add all topics they did not give back.
1725
+ for i := 0 ; i < len (torder ); i ++ {
1726
+ t := torder [i ]
1727
+ if _ , exists := tall [t ]; ! exists {
1728
+ torder = append (torder [:i ], torder [i + 1 :]... ) // user gave topic we were not fetching
1729
+ i --
1730
+ }
1731
+ delete (tall , t )
1732
+ }
1733
+ for t := range tall {
1734
+ torder = append (torder , t ) // user did not return topic we were fetching
1735
+ }
1736
+
1737
+ // Now, same thing for partitions.
1738
+ pused := make (map [int32 ]struct {})
1739
+ for t , ps := range pall {
1740
+ order , exists := porder [t ]
1741
+ if ! exists {
1742
+ porder [t ] = ps // shortcut: user did not define this partition's oorder, keep old order
1743
+ continue
1744
+ }
1745
+ for _ , p := range ps {
1746
+ pused [p ] = struct {}{}
1747
+ }
1748
+ for i := 0 ; i < len (order ); i ++ {
1749
+ p := order [i ]
1750
+ if _ , exists := pused [p ]; ! exists {
1751
+ order = append (order [:i ], order [i + 1 :]... )
1752
+ i --
1579
1753
}
1580
- _ , _ , _ = t , p , c
1754
+ delete (pused , p )
1755
+ }
1756
+ for p := range pused {
1757
+ order = append (order , p )
1581
1758
}
1759
+ porder [t ] = order
1582
1760
}
1583
1761
}
1584
1762
@@ -1607,6 +1785,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
1607
1785
sessionUsed = make (map [string ]map [int32 ]struct {}, len (f .usedOffsets ))
1608
1786
}
1609
1787
1788
+ f .adjustPreferringLag ()
1789
+
1610
1790
for _ , topic := range f .torder {
1611
1791
partitions := f .usedOffsets [topic ]
1612
1792
0 commit comments