Skip to content

Commit 8ab8074

Browse files
committedNov 14, 2022
kgo connection: always allow one request after SASL
Previously, if `lifetime - pessimism ` resulted in immediate SASL expiry, we would immediately re-SASL. We now remove the loop so that at least one request always goes through. The next request will see expiry and will again re-auth. We also drop our min pessimism from 2s to 1s. Hopefully, this addresses #249.
1 parent b5a9a6b commit 8ab8074

File tree

1 file changed

+20
-49
lines changed

1 file changed

+20
-49
lines changed
 

‎pkg/kgo/broker.go

+20-49
Original file line numberDiff line numberDiff line change
@@ -349,19 +349,7 @@ func (b *broker) handleReq(pr promisedReq) {
349349
return
350350
}
351351

352-
for reauthentications := 1; !cxn.expiry.IsZero() && time.Now().After(cxn.expiry); reauthentications++ {
353-
// We allow 15 reauths, which is a lot. If a new lifetime is
354-
// <2.5s, we sleep 100ms and try again. Retrying 15x puts us at
355-
// <1s compared to the original lifetime. A broker should not
356-
// reply with a <1s lifetime, but if we end up here, then we
357-
// kill the connection ourselves and retry on a new connection.
358-
if reauthentications > 15 {
359-
cxn.cl.cfg.logger.Log(LogLevelError, "the broker has repeatedly given us short sasl lifetimes, we are forcefully killing our own connection to retry on a new connection ", "broker", logID(cxn.b.meta.NodeID))
360-
pr.promise(nil, errSaslReauthLoop)
361-
cxn.die()
362-
return
363-
}
364-
352+
if !cxn.expiry.IsZero() && time.Now().After(cxn.expiry) {
365353
// If we are after the reauth time, try to reauth. We
366354
// can only have an expiry if we went the authenticate
367355
// flow, so we know we are authenticating again.
@@ -916,60 +904,43 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
916904
if lifetimeMillis > 0 {
917905
// Lifetime is problematic. We need to be a bit pessimistic.
918906
//
919-
// We want a lowerbound: we use 2s (arbitrary), but if 1.1x our
920-
// e2e sasl latency is more than 2s, we use the latency.
907+
// We want a lowerbound: we use 1s (arbitrary), but if 1.1x our
908+
// e2e sasl latency is more than 1s, we use the latency.
921909
//
922910
// We do not want to reauthenticate too close to the lifetime
923911
// especially for larger lifetimes due to clock issues (#205).
924912
// We take 95% to 98% of the lifetime.
925-
minPessimismMillis := float64(2 * time.Second.Milliseconds())
913+
minPessimismMillis := float64(time.Second.Milliseconds())
926914
latencyMillis := 1.1 * float64(time.Since(prereq).Milliseconds())
927915
if latencyMillis > minPessimismMillis {
928916
minPessimismMillis = latencyMillis
929917
}
930918
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)
931919

932-
// Our minimum lifetime is always 2s (or latency, if larger).
933-
//
934-
// If rng is 0, we begin using max lifetime at 40s:
935-
//
936-
// maxLifetime = 40s - (40s * 0.05) = 38s
937-
// minLifetime = 40s - 2s = 38s
938-
//
939-
// If rng is 1, we begin using max lifetime at 25s:
940-
//
941-
// maxLifetime = 25s - (25s * 0.08) = 23s
942-
// minLifetime = 25s - 2s = 23s
943-
//
944-
// Every second after, we add between 0.05s or 0.08s to our
945-
// backoff:
946-
//
947-
// rng@0: maxLifetime = 41s - (41s * 0.05) = 38.95
948-
// rng@1: maxLifetime = 26s - (26s * 0.08) = 23.92
949-
//
950-
// At 12hr, we reauth ~24 to 28min before the lifetime.
920+
// Our minimum lifetime is always 1s (or latency, if larger).
921+
// When our max pessimism becomes more than min pessimism,
922+
// every second after, we add between 0.05s or 0.08s to our
923+
// backoff. At 12hr, we reauth ~24 to 28min before the
924+
// lifetime.
951925
usePessimismMillis := maxPessimismMillis
952926
if minPessimismMillis > maxPessimismMillis {
953927
usePessimismMillis = minPessimismMillis
954928
}
955929
useLifetimeMillis := lifetimeMillis - int64(usePessimismMillis)
956930

957-
// If our lifetime is <0 (broker said our lifetime is less than
958-
// our client picked min), we sleep for 100ms and retry.
959-
// Brokers should give us longer lifetimes, but that may not
960-
// always happen (see #136). We sleep to avoid spin loop
961-
// reauthenticating.
931+
// Subtracting our min pessimism may result in our connection
932+
// immediately expiring. We always accept this one reauth to
933+
// issue our one request, and our next request will again
934+
// reauth. Brokers should give us longer lifetimes, but that
935+
// may not always happen (see #136, #249).
962936
now := time.Now()
963937
cxn.expiry = now.Add(time.Duration(useLifetimeMillis) * time.Millisecond)
964-
cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now))
965-
if useLifetimeMillis < 0 {
966-
cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop",
967-
"broker", logID(cxn.b.meta.NodeID),
968-
"session_lifetime", time.Duration(lifetimeMillis)*time.Millisecond,
969-
"latency_lower_bound", time.Duration(latencyMillis)*time.Millisecond,
970-
)
971-
time.Sleep(100 * time.Millisecond)
972-
}
938+
cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime",
939+
"broker", logID(cxn.b.meta.NodeID),
940+
"session_lifetime", time.Duration(lifetimeMillis)*time.Millisecond,
941+
"lifetime_pessimism", time.Duration(usePessimismMillis)*time.Millisecond,
942+
"reauthenticate_in", cxn.expiry.Sub(now),
943+
)
973944
}
974945
return nil
975946
}

0 commit comments

Comments
 (0)
Please sign in to comment.