@@ -14,9 +14,15 @@ import (
 )
 
 type producer struct {
-	bufferedRecords atomicI64
-	bufferedBytes   atomicI64
-	inflight        atomicI64 // high 16: # waiters, low 48: # inflight
+	inflight atomicI64 // high 16: # waiters, low 48: # inflight
+
+	// mu and c are used for flush and drain notifications; mu is used for
+	// a few other tight locks.
+	mu sync.Mutex
+	c  *sync.Cond
+
+	bufferedRecords int64
+	bufferedBytes   int64
 
 	cl *Client
 
@@ -45,19 +51,14 @@ type producer struct {
 	// We must have a producer field for flushing; we cannot just have a
 	// field on recBufs that is toggled on flush. If we did, then a new
 	// recBuf could be created and records sent to while we are flushing.
-	flushing atomicI32 // >0 if flushing, can Flush many times concurrently
-	blocked  atomicI32 // >0 if over max recs or bytes
+	flushing     atomicI32 // >0 if flushing, can Flush many times concurrently
+	blocked      atomicI32 // >0 if over max recs or bytes
+	blockedBytes int64
 
 	aborting atomicI32 // >0 if aborting, can abort many times concurrently
 
-	idMu       sync.Mutex
-	idVersion  int16
-	waitBuffer chan struct{}
-
-	// mu and c are used for flush and drain notifications; mu is used for
-	// a few other tight locks.
-	mu sync.Mutex
-	c  *sync.Cond
+	idMu      sync.Mutex
+	idVersion int16
 
 	batchPromises ringBatchPromise
 	promisesMu    sync.Mutex
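
The struct change above replaces the atomic counters and the waitBuffer channel with plain int64 counters guarded by mu and signaled through the *sync.Cond c, plus blocked/blockedBytes for records parked at the limit. This is essentially a bounded-counter pattern; below is a minimal, self-contained sketch of that pattern, with illustrative names (boundedCount, add, done) that are not part of the library.

// Sketch only: a bounded counter guarded by a mutex and signaled through a
// sync.Cond, mirroring the mu/c/bufferedRecords fields added above.
package main

import (
	"fmt"
	"sync"
)

type boundedCount struct {
	mu  sync.Mutex
	c   *sync.Cond
	n   int64
	max int64
}

func newBoundedCount(max int64) *boundedCount {
	b := &boundedCount{max: max}
	b.c = sync.NewCond(&b.mu)
	return b
}

// add blocks until there is room for one more unit, then claims it.
func (b *boundedCount) add() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.n+1 > b.max {
		b.c.Wait() // releases mu while waiting, reacquires on wake
	}
	b.n++
}

// done releases one unit and wakes every waiter so each can re-check.
func (b *boundedCount) done() {
	b.mu.Lock()
	b.n--
	b.mu.Unlock()
	b.c.Broadcast()
}

func main() {
	b := newBoundedCount(2)
	b.add()
	b.add()
	go b.done()
	b.add() // blocks until the goroutine above frees a slot
	fmt.Println("acquired after a slot freed up")
}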
@@ -86,14 +87,18 @@ type producer struct {
 // flushing records produced by your client (which can help determine network /
 // cluster health).
 func (cl *Client) BufferedProduceRecords() int64 {
-	return cl.producer.bufferedRecords.Load()
+	cl.producer.mu.Lock()
+	defer cl.producer.mu.Unlock()
+	return cl.producer.bufferedRecords + int64(cl.producer.blocked.Load())
 }
 
 // BufferedProduceBytes returns the number of bytes currently buffered for
 // producing within the client. This is the sum of all keys, values, and header
 // keys/values. See the related [BufferedProduceRecords] for more information.
 func (cl *Client) BufferedProduceBytes() int64 {
-	return cl.producer.bufferedBytes.Load()
+	cl.producer.mu.Lock()
+	defer cl.producer.mu.Unlock()
+	return cl.producer.bufferedBytes + cl.producer.blockedBytes
 }
 
 type unknownTopicProduces struct {
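
With this change the two gauges also count records that are still blocked waiting for buffer space, not only records already sitting in partition buffers. Below is a hedged usage sketch, assuming the public kgo API as of this change; the broker address, topic, and limit are placeholders.

// Sketch only: polling the buffered-produce gauges from application code.
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.MaxBufferedRecords(10_000),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for i := 0; i < 1000; i++ {
		cl.Produce(ctx, &kgo.Record{Topic: "events", Value: []byte("x")}, nil)
	}

	// Both gauges now include records blocked on the buffer limits,
	// not just records already buffered for partitions.
	for cl.BufferedProduceRecords() > 0 {
		log.Printf("buffered: %d records, %d bytes",
			cl.BufferedProduceRecords(), cl.BufferedProduceBytes())
		time.Sleep(100 * time.Millisecond)
	}
}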
@@ -106,7 +111,6 @@ func (p *producer) init(cl *Client) {
 	p.cl = cl
 	p.topics = newTopicsPartitions()
 	p.unknownTopics = make(map[string]*unknownTopicProduces)
-	p.waitBuffer = make(chan struct{}, math.MaxInt32)
 	p.idVersion = -1
 	p.id.Store(&producerID{
 		id:    -1,
@@ -397,58 +401,93 @@ func (cl *Client) produce(
 		}
 	}
 
-	var (
-		userSize     = r.userSize()
-		bufRecs      = p.bufferedRecords.Add(1)
-		bufBytes     = p.bufferedBytes.Add(userSize)
-		overMaxRecs  = bufRecs > cl.cfg.maxBufferedRecords
-		overMaxBytes bool
-	)
-	if cl.cfg.maxBufferedBytes > 0 {
-		if userSize > cl.cfg.maxBufferedBytes {
-			p.promiseRecord(promisedRec{ctx, promise, r}, kerr.MessageTooLarge)
-			return
-		}
-		overMaxBytes = bufBytes > cl.cfg.maxBufferedBytes
-	}
-
+	// We can now fail the rec after the buffered hook.
 	if r.Topic == "" {
-		p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic)
+		p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, errNoTopic)
 		return
 	}
 	if cl.cfg.txnID != nil && !p.producingTxn.Load() {
-		p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction)
+		p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, errNotInTransaction)
 		return
 	}
 
+	userSize := r.userSize()
+	if cl.cfg.maxBufferedBytes > 0 && userSize > cl.cfg.maxBufferedBytes {
+		p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, kerr.MessageTooLarge)
+		return
+	}
+
+	// We have to grab the produce lock to check if this record will exceed
+	// configured limits. We try to keep the logic tight since this is
+	// effectively a global lock around producing.
+	var (
+		nextBufRecs, nextBufBytes int64
+		overMaxRecs, overMaxBytes bool
+
+		calcNums = func() {
+			nextBufRecs = p.bufferedRecords + 1
+			nextBufBytes = p.bufferedBytes + userSize
+			overMaxRecs = nextBufRecs > cl.cfg.maxBufferedRecords
+			overMaxBytes = cl.cfg.maxBufferedBytes > 0 && nextBufBytes > cl.cfg.maxBufferedBytes
+		}
+	)
+	p.mu.Lock()
+	calcNums()
 	if overMaxRecs || overMaxBytes {
+		if !block || cl.cfg.manualFlushing {
+			p.mu.Unlock()
+			p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, ErrMaxBuffered)
+			return
+		}
+
+		// Before we potentially unlinger, add that we are blocked to
+		// ensure we do NOT start a linger anymore. We THEN wakeup
+		// anything that is actively lingering. Note that blocked is
+		// also used when finishing promises to see if we need to be
+		// notified.
+		p.blocked.Add(1)
+		p.blockedBytes += userSize
+		p.mu.Unlock()
+
 		cl.cfg.logger.Log(LogLevelDebug, "blocking Produce because we are either over max buffered records or max buffered bytes",
 			"over_max_records", overMaxRecs,
 			"over_max_bytes", overMaxBytes,
 		)
-		// Before we potentially unlinger, add that we are blocked.
-		// Lingering always checks blocked, so we will not start a
-		// linger while we are blocked. We THEN wakeup anything that
-		// is actively lingering.
-		cl.producer.blocked.Add(1)
+
 		cl.unlingerDueToMaxRecsBuffered()
-		// If the client ctx cancels or the produce ctx cancels, we
-		// need to un-count our buffering of this record. We also need
-		// to drain a slot from the waitBuffer chan, which could be
-		// sent to right when we are erroring.
+
+		// We keep the lock when we exit. If we are flushing, we want
+		// this blocked record to be produced before we return from
+		// flushing. This blocked record will be accounted for in the
+		// bufferedRecords addition below, after being removed from
+		// blocked in the goroutine.
+		wait := make(chan struct{})
+		var quit bool
+		go func() {
+			defer close(wait)
+			p.mu.Lock()
+			calcNums()
+			for !quit && (overMaxRecs || overMaxBytes) {
+				p.c.Wait()
+				calcNums()
+			}
+			p.blocked.Add(-1)
+			p.blockedBytes -= userSize
+		}()
+
 		drainBuffered := func(err error) {
-			p.promiseRecord(promisedRec{ctx, promise, r}, err)
-			<-p.waitBuffer
-			cl.producer.blocked.Add(-1)
-		}
-		if !block || cl.cfg.manualFlushing {
-			drainBuffered(ErrMaxBuffered)
-			return
+			p.mu.Lock()
+			quit = true
+			p.mu.Unlock()
+			p.c.Broadcast() // wake the goroutine above
+			<-wait
+			p.mu.Unlock() // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked)
+			p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, err)
 		}
+
 		select {
-		case <-p.waitBuffer:
-			cl.cfg.logger.Log(LogLevelDebug, "Produce block signaled, continuing to produce")
-			cl.producer.blocked.Add(-1)
+		case <-wait:
+			cl.cfg.logger.Log(LogLevelDebug, "Produce block awoken, we now have space to produce, continuing to partition and produce")
 		case <-cl.ctx.Done():
 			drainBuffered(ErrClientClosed)
 			cl.cfg.logger.Log(LogLevelDebug, "client ctx canceled while blocked in Produce, returning")
@@ -459,6 +498,9 @@ func (cl *Client) produce(
 			return
 		}
 	}
+	p.bufferedRecords = nextBufRecs
+	p.bufferedBytes = nextBufBytes
+	p.mu.Unlock()
 
 	cl.partitionRecord(promisedRec{ctx, promise, r})
 }
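
The distinctive piece of the new blocking path is that sync.Cond cannot be selected against a context, so the cond wait is parked in a goroutine that closes a channel once space frees up (or once quit is set), and the caller selects between that channel and the two contexts. Below is a self-contained sketch of that shape with an illustrative name (waitUntil); unlike the diff above it releases the mutex when the goroutine exits, whereas the real code deliberately keeps it held so the caller can record nextBufRecs/nextBufBytes atomically with the wake-up.

// Sketch only: waiting on a sync.Cond while still honoring a context.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func waitUntil(ctx context.Context, mu *sync.Mutex, c *sync.Cond, ready func() bool) error {
	wait := make(chan struct{})
	var quit bool
	go func() {
		defer close(wait)
		mu.Lock()
		defer mu.Unlock()
		for !quit && !ready() {
			c.Wait()
		}
	}()

	select {
	case <-wait:
		return nil
	case <-ctx.Done():
		mu.Lock()
		quit = true
		mu.Unlock()
		c.Broadcast() // wake the goroutine so it observes quit and exits
		<-wait
		return ctx.Err()
	}
}

func main() {
	var mu sync.Mutex
	c := sync.NewCond(&mu)
	n := 0

	go func() { // simulate space freeing up a little later
		time.Sleep(50 * time.Millisecond)
		mu.Lock()
		n = 1
		mu.Unlock()
		c.Broadcast()
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(waitUntil(ctx, &mu, c, func() bool { return n > 0 })) // <nil>
}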
@@ -468,6 +510,7 @@ type batchPromise struct {
 	pid       int64
 	epoch     int16
 	attrs     RecordAttrs
+	beforeBuf bool
 	partition int32
 	recs      []promisedRec
 	err       error
@@ -483,6 +526,10 @@ func (p *producer) promiseRecord(pr promisedRec, err error) {
 	p.promiseBatch(batchPromise{recs: []promisedRec{pr}, err: err})
 }
 
+func (p *producer) promiseRecordBeforeBuf(pr promisedRec, err error) {
+	p.promiseBatch(batchPromise{recs: []promisedRec{pr}, beforeBuf: true, err: err})
+}
+
 func (p *producer) finishPromises(b batchPromise) {
 	cl := p.cl
 	var more bool
@@ -495,7 +542,7 @@ start:
 		pr.ProducerID = b.pid
 		pr.ProducerEpoch = b.epoch
 		pr.Attrs = b.attrs
-		cl.finishRecordPromise(pr, b.err)
+		cl.finishRecordPromise(pr, b.err, b.beforeBuf)
 		b.recs[i] = promisedRec{}
 	}
 	p.promisesMu.Unlock()
@@ -509,7 +556,7 @@ start:
 	}
 }
 
-func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
+func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) {
 	p := &cl.producer
 
 	if p.hooks != nil && len(p.hooks.unbuffered) > 0 {
@@ -519,22 +566,27 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
 	}
 
 	// Capture user size before potential modification by the promise.
+	//
+	// We call the promise before finishing the flush notification,
+	// allowing users of Flush to know all buf recs are done by the
+	// time we notify flush below.
 	userSize := pr.userSize()
-	nowBufBytes := p.bufferedBytes.Add(-userSize)
-	nowBufRecs := p.bufferedRecords.Add(-1)
-	wasOverMaxRecs := nowBufRecs >= cl.cfg.maxBufferedRecords
-	wasOverMaxBytes := cl.cfg.maxBufferedBytes > 0 && nowBufBytes+userSize > cl.cfg.maxBufferedBytes
-
-	// We call the promise before finishing the record; this allows users
-	// of Flush to know that all buffered records are completely done
-	// before Flush returns.
 	pr.promise(pr.Record, err)
 
-	if wasOverMaxRecs || wasOverMaxBytes {
-		p.waitBuffer <- struct{}{}
-	} else if nowBufRecs == 0 && p.flushing.Load() > 0 {
-		p.mu.Lock()
-		p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
+	// If this record was never buffered, its size was never accounted
+	// for on any p field: return early.
+	if beforeBuffering {
+		return
+	}
+
+	// Keep the lock as tight as possible: the broadcast can come after.
+	p.mu.Lock()
+	p.bufferedBytes -= userSize
+	p.bufferedRecords--
+	broadcast := p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0
+	p.mu.Unlock()
+
+	if broadcast {
 		p.c.Broadcast()
 	}
 }
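
The release side keeps mu only long enough to update the counters and decide whether a wake-up can matter (a producer blocked on the limits, or a flusher waiting for the buffer to drain to zero); the broadcast itself happens after unlocking. A minimal sketch of that decide-under-the-lock, signal-outside-it shape, with illustrative names (gauge, release) that are not part of the library:

// Sketch only: update counters under the lock, decide there whether a
// wake-up is needed, then broadcast after unlocking.
package main

import (
	"fmt"
	"sync"
)

type gauge struct {
	mu       sync.Mutex
	c        *sync.Cond
	records  int64
	waiters  int64 // producers blocked on limits
	flushers int64 // callers blocked in a Flush-like wait
}

func (g *gauge) release(n int64) {
	g.mu.Lock()
	g.records -= n
	// Only wake someone if it can matter: a blocked producer, or a
	// flusher waiting for the buffer to fully drain.
	broadcast := g.waiters > 0 || (g.records == 0 && g.flushers > 0)
	g.mu.Unlock()

	if broadcast {
		g.c.Broadcast() // legal even if nobody is currently waiting
	}
}

func main() {
	g := &gauge{records: 3, flushers: 1}
	g.c = sync.NewCond(&g.mu)
	g.release(3)
	fmt.Println("records now:", g.records)
}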
@@ -1021,7 +1073,7 @@ func (cl *Client) Flush(ctx context.Context) error {
 		defer p.mu.Unlock()
 		defer close(done)
 
-		for !quit && p.bufferedRecords.Load() > 0 {
+		for !quit && p.bufferedRecords+int64(p.blocked.Load()) > 0 {
 			p.c.Wait()
 		}
 	}()
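
Flush's wait loop now also counts blocked records, so a flush cannot return while a record is still parked waiting for buffer space. A hedged usage sketch, assuming the public kgo API; the broker address and topic are placeholders.

// Sketch only: flushing with a deadline.
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	cl.Produce(context.Background(), &kgo.Record{Topic: "events", Value: []byte("x")}, nil)

	// Flush waits until nothing is buffered *and* nothing is blocked
	// waiting for buffer space, or until the context is canceled.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := cl.Flush(ctx); err != nil {
		log.Printf("flush did not complete: %v", err)
	}
}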