
CASSANDRA-19534 (5.0 patch): Unbounded queues in native transport requests lead to node instability #3274

Open
wants to merge 4 commits into base: cassandra-5.0
Conversation

ifesdjeen (Contributor)
No description provided.

@ifesdjeen ifesdjeen changed the title CASSANDRA-19534 CASSANDRA-19534 (5.0 patch): Unbounded queues in native transport requests lead to node instability Apr 29, 2024
@@ -143,7 +144,7 @@ public ColumnFilter columnFilter()
* @param state client state
* @return the result of the query.
*/
public PartitionIterator execute(ConsistencyLevel consistency, ClientState state, long queryStartNanoTime) throws RequestExecutionException;
public PartitionIterator execute(ConsistencyLevel consistency, ClientState state, Dispatcher.RequestTime requestTime) throws RequestExecutionException;
Contributor:

nit: JavaDoc update while we're here?

@@ -34,6 +34,7 @@
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
import org.apache.cassandra.transport.Dispatcher;
Contributor:

nit: A bunch of imports out of place?

public volatile CQLStartTime cql_start_time = CQLStartTime.REQUEST;

public boolean native_transport_throw_on_overload = false;
public double native_transport_queue_max_item_age_threshold = Double.MAX_VALUE;
Contributor:

nit: I want to try to work the token "ratio" into the name of this one, but haven't been able to come up w/ something concrete :D

@@ -19,15 +19,19 @@

import io.airlift.airline.Command;

import io.airlift.airline.Option;
Contributor:

nit: Move up to be next to Command import

@@ -178,7 +182,8 @@ protected boolean processOneContainedMessage(ShareableBytes bytes, Limit endpoin

// max CQL message size defaults to 256mb, so should be safe to downcast
int messageSize = Ints.checkedCast(header.bodySizeInBytes);
Contributor:

Not related to your patch, but I think I forgot to throw an @Override annotation on processOneContainedMessage()


if (delay > 0)
{
assert backpressure != Overload.NONE;
Contributor:

nit: Should this be just asserting that backpressure isn't NONE or should it be asserting that it's REQUESTS or QUEUE_TIME, which are the only things that would have an associated delay?

Contributor Author:

Right, so backpressure != Overload.NONE is tautological at that spot. Changed to:

assert backpressure == Overload.REQUESTS || backpressure == Overload.QUEUE_TIME : backpressure;

public String toString()
{
return "RequestProcessor{" +
"request=" + request +
Contributor:

nit: Message has a toString(), but Request does not, so we'll miss Request.createdAtNanos...if that matters.

// query that is stuck behind the EXECUTE query, we would rather time it out and catch up with a backlog, expecting
// that the bursts are going to be short-lived.
ClientMetrics.instance.queueTime(queueTime, TimeUnit.NANOSECONDS);
if (queueTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS))
Contributor:

nit: This is a hot-ish (?) path. Would it make sense to memoize the native transport timeout so we don't have to call TimeUnit#convert() so much?

Contributor Author:

Did this:

    private static long native_transport_timeout_nanos_cached = -1;

    public static long getNativeTransportTimeout(TimeUnit timeUnit)
    {
        if (timeUnit == TimeUnit.NANOSECONDS)
        {
            if (native_transport_timeout_nanos_cached == -1)
                native_transport_timeout_nanos_cached = conf.native_transport_timeout.to(TimeUnit.NANOSECONDS);

            return native_transport_timeout_nanos_cached;
        }
        return conf.native_transport_timeout.to(timeUnit);
    }

But arguably we should have a more generic pattern for these things. Maybe even always precompute millis, nanos, and micros. That should be extremely cheap and constant-time if we use a tiny array.
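The "tiny array" idea above could be sketched roughly as follows; the CachedDuration name and API are hypothetical, not part of the patch:

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: precompute a duration in every TimeUnit once at
 * construction, so hot paths do a constant-time array lookup instead of
 * calling TimeUnit#convert() on every request.
 */
public final class CachedDuration
{
    // One slot per TimeUnit constant (NANOSECONDS .. DAYS)
    private final long[] cached = new long[TimeUnit.values().length];

    public CachedDuration(long value, TimeUnit unit)
    {
        for (TimeUnit target : TimeUnit.values())
            cached[target.ordinal()] = target.convert(value, unit);
    }

    /** Constant-time lookup; no conversion arithmetic on the hot path. */
    public long to(TimeUnit unit)
    {
        return cached[unit.ordinal()];
    }
}
```

This also avoids the `-1` sentinel and the nanos-only special case in the snippet above, at the cost of a few extra longs per configured duration.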

// Continuing incident: apply backpressure but do not bump severity level yet
else if (appliedTimes < 10)
{
return new Impl(minDelayNanos, maxDelayNanos, now, severityLevel == 0 ? 1 : severityLevel, appliedTimes + 1);
Contributor:

nit: Can we just start the severityLevel at 1?

Contributor:

nit: It seems like there's a lot of Impl creation going on here during a spike. Is there any way we could perhaps moderate that a tiny bit by perhaps making appliedTimes an AtomicInteger?

Contributor Author:

Unfortunately, because we have at least two variables that we need to update atomically (now and appliedTimes), we will either have to create some sort of object or do some binary math (but then we lose precision). I'm afraid I could not find a quick and easy way to make this more lightweight.

I would also like to highlight that as soon as we have applied the timeout, the client will back off for the given amount of time, so this might be less of a problem: we only do this when there is no capacity in the queue.

Also, the incident does start at 1; could you elaborate?
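The constraint being described (two fields that must change together, forcing a fresh immutable object per update) could be sketched like this; all names here are illustrative, not from the patch:

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: lastAppliedNanos and appliedTimes must be updated
 * atomically as a pair, so the state is an immutable snapshot swapped in
 * via compareAndSet. Each update necessarily allocates a new snapshot.
 */
public final class BackpressureState
{
    public static final class Snapshot
    {
        public final long lastAppliedNanos;
        public final int appliedTimes;

        Snapshot(long lastAppliedNanos, int appliedTimes)
        {
            this.lastAppliedNanos = lastAppliedNanos;
            this.appliedTimes = appliedTimes;
        }
    }

    private final AtomicReference<Snapshot> state =
        new AtomicReference<>(new Snapshot(0L, 0));

    /** Records one more application of backpressure; both fields move together. */
    public Snapshot apply(long nowNanos)
    {
        while (true)
        {
            Snapshot cur = state.get();
            Snapshot next = new Snapshot(nowNanos, cur.appliedTimes + 1);
            if (state.compareAndSet(cur, next))
                return next;
        }
    }
}
```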

Contributor:

I was just wondering why we had to do severityLevel == 0 ? 1 : severityLevel rather than just starting incidents at 1, but that means you have to check appliedAt or something in delay() to make sure you get zero before an incident has actually started.

…ated to a specific request

  * Add an ability to base _replica_ side queries on the queue time
  * Use queue time as a base for message timeouts
  * Use native transport deadline for internode messages
  * Make sure that local runnables respect transport timeouts and deadlines
  * Make sure that remote mutation handler respects message expiration times
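As a rough illustration of "use queue time as a base for message timeouts" (the helper name and signature here are hypothetical, not from the patch):

```java
/**
 * Hypothetical sketch: a request carries its enqueue timestamp; if it has
 * already waited in the queue longer than the transport timeout, drop it
 * and catch up with the backlog instead of executing stale work.
 */
public final class QueueTimeCheck
{
    public static boolean shouldDrop(long enqueuedAtNanos, long nowNanos, long timeoutNanos)
    {
        long queueTime = nowNanos - enqueuedAtNanos;
        return queueTime > timeoutNanos;
    }
}
```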
response.attach(request.connection);
FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response);
flush(toFlush);
System.out.println(123123);
Contributor:

TODO: remove println

Contributor Author:

Whoops!


import com.google.common.base.Predicate;

import com.sun.jna.platform.win32.GL;
Contributor:

nit: unused import
