CASSANDRA-19534 (5.0 patch): Unbounded queues in native transport requests lead to node instability #3274
base: cassandra-5.0
Conversation
@@ -143,7 +144,7 @@ public ColumnFilter columnFilter()
      * @param state client state
      * @return the result of the query.
      */
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState state, long queryStartNanoTime) throws RequestExecutionException;
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState state, Dispatcher.RequestTime requestTime) throws RequestExecutionException;
nit: JavaDoc update while we're here?
src/java/org/apache/cassandra/service/paxos/v1/AbstractPaxosCallback.java (outdated, resolved)
@@ -34,6 +34,7 @@
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
+import org.apache.cassandra.transport.Dispatcher;
nit: A bunch of imports out of place?
 public volatile CQLStartTime cql_start_time = CQLStartTime.REQUEST;

+public boolean native_transport_throw_on_overload = false;
+public double native_transport_queue_max_item_age_threshold = Double.MAX_VALUE;
nit: I want to try to work the token "ratio" into the name of this one, but haven't been able to come up w/ something concrete :D
@@ -19,15 +19,19 @@

 import io.airlift.airline.Command;

+import io.airlift.airline.Option;
nit: Move up to be next to the Command import.
@@ -178,7 +182,8 @@ protected boolean processOneContainedMessage(ShareableBytes bytes, Limit endpoin

     // max CQL message size defaults to 256mb, so should be safe to downcast
     int messageSize = Ints.checkedCast(header.bodySizeInBytes);
Not related to your patch, but I think I forgot to throw an @Override annotation on processOneContainedMessage().
 if (delay > 0)
 {
     assert backpressure != Overload.NONE;
nit: Should this be just asserting that backpressure isn't NONE, or should it be asserting that it's REQUESTS or QUEUE_TIME, which are the only things that would have an associated delay?
Right, so backpressure != Overload.NONE is tautological at that spot. Changed to:
assert backpressure == Overload.REQUESTS || backpressure == Overload.QUEUE_TIME : backpressure;
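To make the narrowed check above concrete, here is a minimal, self-contained sketch. The Overload constants NONE, REQUESTS, and QUEUE_TIME are taken from this thread; the enclosing class, the carriesDelay helper, and the applyDelay method are hypothetical stand-ins for the real dispatch code, not Cassandra's API.

```java
// Illustrative sketch only: constants come from the review thread above;
// the class and method names are invented for this example.
public class OverloadCheck
{
    enum Overload { NONE, REQUESTS, QUEUE_TIME }

    // Only these two overload states carry an associated delay.
    static boolean carriesDelay(Overload backpressure)
    {
        return backpressure == Overload.REQUESTS || backpressure == Overload.QUEUE_TIME;
    }

    static void applyDelay(long delay, Overload backpressure)
    {
        if (delay > 0)
        {
            // The trailing ": backpressure" puts the offending value
            // into the AssertionError message if the assert ever fires.
            assert carriesDelay(backpressure) : backpressure;
        }
    }
}
```

The assert-with-message form costs nothing when assertions are disabled, and pinpoints the unexpected state when they are enabled.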
 public String toString()
 {
     return "RequestProcessor{" +
            "request=" + request +
nit: Message has a toString(), but Request does not, so we'll miss Request.createdAtNanos ...if that matters.
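If createdAtNanos does matter in logs, a toString() override on Request along these lines would surface it. This is a minimal stand-in, not the real org.apache.cassandra.transport.Message.Request class; the type field is invented for the example, and only createdAtNanos comes from the comment above.

```java
// Minimal stand-in for the Request discussed above; the real class has
// far more state. Only createdAtNanos mirrors the field named in the review.
public class Request
{
    private final String type;           // hypothetical, for illustration
    private final long createdAtNanos;   // the field the review says gets lost

    public Request(String type, long createdAtNanos)
    {
        this.type = type;
        this.createdAtNanos = createdAtNanos;
    }

    @Override
    public String toString()
    {
        // Without this override, "request=" + request falls back to
        // Object.toString() and createdAtNanos never appears in the output.
        return "Request{type=" + type + ", createdAtNanos=" + createdAtNanos + '}';
    }
}
```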
 // query that is stuck behind the EXECUTE query, we would rather time it out and catch up with a backlog, expecting
 // that the bursts are going to be short-lived.
 ClientMetrics.instance.queueTime(queueTime, TimeUnit.NANOSECONDS);
 if (queueTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS))
nit: This is a hot-ish (?) path. Would it make sense to memoize the native transport timeout so we don't have to call TimeUnit#convert() so much?
Did this:

private static long native_transport_timeout_nanos_cached = -1;

public static long getNativeTransportTimeout(TimeUnit timeUnit)
{
    if (timeUnit == TimeUnit.NANOSECONDS)
    {
        if (native_transport_timeout_nanos_cached == -1)
            native_transport_timeout_nanos_cached = conf.native_transport_timeout.to(TimeUnit.NANOSECONDS);
        return native_transport_timeout_nanos_cached;
    }
    return conf.native_transport_timeout.to(timeUnit);
}
But arguably we should have a more generic pattern for these things. Maybe even always precompute millis, nanos, and micros. Should be extremely cheap, and constant time, if we use a tiny array.
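A minimal sketch of that "tiny array" idea: precompute the conversion for every TimeUnit once, then serve lookups in constant time. The CachedDuration class and its names are hypothetical, not Cassandra's config API; it assumes the duration is fixed after construction.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the generic precompute pattern discussed above.
// One slot per TimeUnit constant, filled once; to() never calls convert().
public class CachedDuration
{
    private final long[] converted = new long[TimeUnit.values().length];

    public CachedDuration(long value, TimeUnit unit)
    {
        // TimeUnit.convert(sourceDuration, sourceUnit) truncates toward zero
        // for coarser targets, same as calling convert() on the hot path would.
        for (TimeUnit target : TimeUnit.values())
            converted[target.ordinal()] = target.convert(value, unit);
    }

    // Constant-time array lookup; safe for concurrent readers since the
    // array is final and fully populated before the constructor returns.
    public long to(TimeUnit unit)
    {
        return converted[unit.ordinal()];
    }
}
```

Compared to caching only nanos, this handles every unit uniformly and avoids the lazily-initialized sentinel field entirely.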
 // Continuing incident: apply backpressure but do not bump severity level yet
 else if (appliedTimes < 10)
 {
     return new Impl(minDelayNanos, maxDelayNanos, now, severityLevel == 0 ? 1 : severityLevel, appliedTimes + 1);
nit: Can we just start the severityLevel at 1?
nit: It seems like there's a lot of Impl creation going on here during a spike. Is there any way we could perhaps moderate that a tiny bit by making appliedTimes an AtomicInteger?
Unfortunately, because we have at least two variables that we need to update atomically, now and appliedTimes, we will either have to create some sort of object or do some binary math (but then we lose precision). I am afraid I could not find a quick and easy way to make this more lightweight.
I would also like to highlight that as soon as we have applied the timeout, the client will back off for the given amount of time, so this might be less of a problem: we do this only if there is no capacity in the queue.
Also, the incident does start at 1, could you elaborate?
I was just wondering why we had to do severityLevel == 0 ? 1 : severityLevel rather than just starting incidents at 1, but that means you have to check appliedAt or something in delay() to make sure you get zero before an incident has actually started.
…ated to a specific request
* Add an ability to base _replica_ side queries on the queue time
* Use queue time as a base for message timeouts
* Use native transport deadline for internode messages
* Make sure that local runnables respect transport timeouts and deadlines
* Make sure that remote mutation handler respects message expiration times
Force-pushed from a5ea126 to 649e114
 response.attach(request.connection);
 FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response);
 flush(toFlush);
 System.out.println(123123);
TODO: remove println
Whoops!
 import com.google.common.base.Predicate;

+import com.sun.jna.platform.win32.GL;
nit: unused import