Skip to content

Commit

Permalink
[export]
Browse files Browse the repository at this point in the history
Batch pending rpcs in client and server side before sending them down to tcp. If multiple rpcs get queued up into chaotic good, it should send them out over one tcp endpoint write.

----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/627143927](http://cl/627143927)

PiperOrigin-RevId: 627143927
  • Loading branch information
Vignesh2208 authored and Copybara-Service committed Apr 22, 2024
1 parent 297e22c commit f85c9f8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 8 deletions.
24 changes: 24 additions & 0 deletions src/core/ext/transport/chaotic_good/chaotic_good_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
data_endpoint_.Write(std::move(buffers.data)));
}

void SerializeFrameIntoBuffers(const FrameInterface& frame,
SliceBuffer& control_buffer,
SliceBuffer& data_buffer) {
auto buffers = frame.Serialize(&encoder_);
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: WriteFrame to:%s %s",
ResolvedAddressToString(control_endpoint_.GetPeerAddress())
.value_or("<<unknown peer address>>")
.c_str(),
frame.ToString().c_str());
}
buffers.control.MoveFirstNBytesIntoSliceBuffer(buffers.control.Length(),
control_buffer);
buffers.data.MoveFirstNBytesIntoSliceBuffer(buffers.data.Length(),
data_buffer);
}

auto WriteSerializedFrames(SliceBuffer& control_buffer,
SliceBuffer& data_buffer) {
return TryJoin<absl::StatusOr>(
control_endpoint_.Write(std::move(control_buffer)),
data_endpoint_.Write(std::move(data_buffer)));
}

// Read frame header and payloads for control and data portions of one frame.
// Resolves to StatusOr<tuple<FrameHeader, BufferPair>>.
auto ReadFrameBytes() {
Expand Down
14 changes: 10 additions & 4 deletions src/core/ext/transport/chaotic_good/client_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,17 @@ auto ChaoticGoodClientTransport::TransportWriteLoop(
RefCountedPtr<ChaoticGoodTransport> transport) {
return Loop([this, transport = std::move(transport)] {
return TrySeq(
// Get next outgoing frame.
outgoing_frames_.Next(),
// Get all the next outgoing frames.
outgoing_frames_.AllNext(),
// Serialize and write it out.
[transport = transport.get()](ClientFrame client_frame) {
return transport->WriteFrame(GetFrameInterface(client_frame));
[transport = transport.get()](std::vector<ClientFrame> client_frame) {
SliceBuffer control_buffer;
SliceBuffer data_buffer;
for (auto& frame : client_frame) {
transport->SerializeFrameIntoBuffers(GetFrameInterface(frame),
control_buffer, data_buffer);
}
return transport->WriteSerializedFrames(control_buffer, data_buffer);
},
[]() -> LoopCtl<absl::Status> {
// The write failures will be caught in TrySeq and exit loop.
Expand Down
14 changes: 10 additions & 4 deletions src/core/ext/transport/chaotic_good/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@ auto ChaoticGoodServerTransport::TransportWriteLoop(
RefCountedPtr<ChaoticGoodTransport> transport) {
return Loop([this, transport = std::move(transport)] {
return TrySeq(
// Get next outgoing frame.
outgoing_frames_.Next(),
// Get all the next outgoing frames.
outgoing_frames_.AllNext(),
// Serialize and write it out.
[transport = transport.get()](ServerFrame client_frame) {
return transport->WriteFrame(GetFrameInterface(client_frame));
[transport = transport.get()](std::vector<ServerFrame> server_frame) {
SliceBuffer control_buffer;
SliceBuffer data_buffer;
for (auto& frame : server_frame) {
transport->SerializeFrameIntoBuffers(GetFrameInterface(frame),
control_buffer, data_buffer);
}
return transport->WriteSerializedFrames(control_buffer, data_buffer);
},
[]() -> LoopCtl<absl::Status> {
// The write failures will be caught in TrySeq and exit loop.
Expand Down
21 changes: 21 additions & 0 deletions src/core/lib/promise/mpsc.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,27 @@ class MpscReceiver {
};
}

// Return a promise that will resolve to a vector containing all available
// items.
auto AllNext() {
return [this]() -> Poll<std::vector<T>> {
if (buffer_it_ == buffer_.end()) {
if (center_->PollReceiveBatch(buffer_)) {
buffer_it_ = buffer_.begin();
}
}
if (buffer_it_ != buffer_.end()) {
std::vector<T> dest;
buffer_it_ = buffer_.begin();
while (buffer_it_ != buffer_.end()) {
dest.push_back(std::move(*buffer_it_++));
}
return Poll<std::vector<T>>(std::move(dest));
}
return Pending{};
};
}

private:
// Received items. We move out of here one by one, but don't resize the
// vector. Instead, when we run out of items, we poll the center for more -
Expand Down
24 changes: 24 additions & 0 deletions test/core/promise/mpsc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ TEST(MpscTest, ImmediateSendWorks) {
activity.Deactivate();
}

TEST(MpscTest, AllNextReceiveWorks) {
StrictMock<MockActivity> activity;
MpscReceiver<Payload> receiver(1);
MpscSender<Payload> sender = receiver.MakeSender();

EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(1)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(2)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(3)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(4)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(5)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(6)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(7)), true);

activity.Activate();
auto payloads = NowOrNever(receiver.AllNext());
EXPECT_TRUE(payloads.has_value());
EXPECT_EQ(payloads.value().size(), 7);
int i = 1;
for (const auto& payload : *payloads) {
EXPECT_EQ(payload, MakePayload(i++));
}
activity.Deactivate();
}

} // namespace
} // namespace grpc_core

Expand Down

0 comments on commit f85c9f8

Please sign in to comment.