Skip to content

Commit

Permalink
[export]
Browse files Browse the repository at this point in the history
Batch pending rpcs in client and server side before sending them down to tcp. If multiple rpcs get queued up into chaotic good, it should send them out over one tcp endpoint write.

----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/627143927](http://cl/627143927)

PiperOrigin-RevId: 627143927
  • Loading branch information
Vignesh2208 authored and Copybara-Service committed Apr 29, 2024
1 parent 60dc14e commit c5a300e
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 23 deletions.
24 changes: 24 additions & 0 deletions src/core/ext/transport/chaotic_good/chaotic_good_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
data_endpoint_.Write(std::move(buffers.data)));
}

void SerializeFrameIntoBuffers(const FrameInterface& frame,
SliceBuffer& control_buffer,
SliceBuffer& data_buffer) {
auto buffers = frame.Serialize(&encoder_);
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: WriteFrame to:%s %s",
ResolvedAddressToString(control_endpoint_.GetPeerAddress())
.value_or("<<unknown peer address>>")
.c_str(),
frame.ToString().c_str());
}
buffers.control.MoveFirstNBytesIntoSliceBuffer(buffers.control.Length(),
control_buffer);
buffers.data.MoveFirstNBytesIntoSliceBuffer(buffers.data.Length(),
data_buffer);
}

auto WriteSerializedFrames(SliceBuffer& control_buffer,
SliceBuffer& data_buffer) {
return TryJoin<absl::StatusOr>(
control_endpoint_.Write(std::move(control_buffer)),
data_endpoint_.Write(std::move(data_buffer)));
}

// Read frame header and payloads for control and data portions of one frame.
// Resolves to StatusOr<tuple<FrameHeader, BufferPair>>.
auto ReadFrameBytes() {
Expand Down
14 changes: 10 additions & 4 deletions src/core/ext/transport/chaotic_good/client_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,17 @@ auto ChaoticGoodClientTransport::TransportWriteLoop(
RefCountedPtr<ChaoticGoodTransport> transport) {
return Loop([this, transport = std::move(transport)] {
return TrySeq(
// Get next outgoing frame.
outgoing_frames_.Next(),
// Get all the next outgoing frames.
outgoing_frames_.AllNext(),
// Serialize and write it out.
[transport = transport.get()](ClientFrame client_frame) {
return transport->WriteFrame(GetFrameInterface(client_frame));
[transport = transport.get()](std::vector<ClientFrame> client_frame) {
SliceBuffer control_buffer;
SliceBuffer data_buffer;
for (auto& frame : client_frame) {
transport->SerializeFrameIntoBuffers(GetFrameInterface(frame),
control_buffer, data_buffer);
}
return transport->WriteSerializedFrames(control_buffer, data_buffer);
},
[]() -> LoopCtl<absl::Status> {
// The write failures will be caught in TrySeq and exit loop.
Expand Down
14 changes: 10 additions & 4 deletions src/core/ext/transport/chaotic_good/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@ auto ChaoticGoodServerTransport::TransportWriteLoop(
RefCountedPtr<ChaoticGoodTransport> transport) {
return Loop([this, transport = std::move(transport)] {
return TrySeq(
// Get next outgoing frame.
outgoing_frames_.Next(),
// Get all the next outgoing frames.
outgoing_frames_.AllNext(),
// Serialize and write it out.
[transport = transport.get()](ServerFrame client_frame) {
return transport->WriteFrame(GetFrameInterface(client_frame));
[transport = transport.get()](std::vector<ServerFrame> server_frame) {
SliceBuffer control_buffer;
SliceBuffer data_buffer;
for (auto& frame : server_frame) {
transport->SerializeFrameIntoBuffers(GetFrameInterface(frame),
control_buffer, data_buffer);
}
return transport->WriteSerializedFrames(control_buffer, data_buffer);
},
[]() -> LoopCtl<absl::Status> {
// The write failures will be caught in TrySeq and exit loop.
Expand Down
21 changes: 21 additions & 0 deletions src/core/lib/promise/mpsc.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,27 @@ class MpscReceiver {
};
}

// Return a promise that will resolve to a vector containing all available
// items.
auto AllNext() {
return [this]() -> Poll<std::vector<T>> {
if (buffer_it_ == buffer_.end()) {
if (center_->PollReceiveBatch(buffer_) && !buffer_.empty()) {
return std::move(buffer_);
}
}
if (buffer_it_ != buffer_.end()) {
std::vector<T> dest;
buffer_it_ = buffer_.begin();
while (buffer_it_ != buffer_.end()) {
dest.push_back(std::move(*buffer_it_++));
}
return Poll<std::vector<T>>(std::move(dest));
}
return Pending{};
};
}

private:
// Received items. We move out of here one by one, but don't resize the
// vector. Instead, when we run out of items, we poll the center for more -
Expand Down
24 changes: 24 additions & 0 deletions test/core/promise/mpsc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ TEST(MpscTest, ImmediateSendWorks) {
activity.Deactivate();
}

TEST(MpscTest, AllNextReceiveWorks) {
StrictMock<MockActivity> activity;
MpscReceiver<Payload> receiver(1);
MpscSender<Payload> sender = receiver.MakeSender();

EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(1)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(2)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(3)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(4)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(5)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(6)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(7)), true);

activity.Activate();
auto payloads = NowOrNever(receiver.AllNext());
EXPECT_TRUE(payloads.has_value());
EXPECT_EQ(payloads.value().size(), 7);
int i = 1;
for (const auto& payload : *payloads) {
EXPECT_EQ(payload, MakePayload(i++));
}
activity.Deactivate();
}

} // namespace
} // namespace grpc_core

Expand Down
21 changes: 10 additions & 11 deletions test/core/transport/chaotic_good/client_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ TEST_F(TransportTest, AddOneStream) {
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
// With batching enabled, the control endpoint is expected to write
// the headers and message length frame in one go.
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 1, 1,
sizeof(kPathDemoServiceStep), 0, 0, 0),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
sizeof(kPathDemoServiceStep))},
nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
sizeof(kPathDemoServiceStep)),
SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
nullptr);
data_endpoint.ExpectWrite(
{EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
Expand Down Expand Up @@ -213,20 +213,19 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
{SerializedFrameHeader(FrameType::kFragment, 1, 1,
sizeof(kPathDemoServiceStep), 0, 0, 0),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
sizeof(kPathDemoServiceStep))},
nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
sizeof(kPathDemoServiceStep)),
SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
nullptr);
data_endpoint.ExpectWrite(
{EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
// With batching enabled, the control endpoint is expected to write
// the message length frame of the second message and trailers in one go.
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0),
SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)},
nullptr);
data_endpoint.ExpectWrite(
{EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 2);
Expand Down
8 changes: 4 additions & 4 deletions test/core/transport/chaotic_good/server_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
EXPECT_CALL(*control_endpoint.endpoint, Read)
.InSequence(control_endpoint.read_sequence)
.WillOnce(Return(false));
// With batching enabled, the control endpoint is expected to write
// the headers and message length frame in one go.
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 1, 1,
sizeof(kPathDemoServiceStep), 0, 0, 0),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
sizeof(kPathDemoServiceStep))},
nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 8, 56, 0)},
sizeof(kPathDemoServiceStep)),
SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 8, 56, 0)},
nullptr);
data_endpoint.ExpectWrite(
{EventEngineSlice::FromCopiedString("87654321"), Zeros(56)}, nullptr);
Expand Down

0 comments on commit c5a300e

Please sign in to comment.