Skip to content

Commit

Permalink
Revert "Remove the urgent argument from iomgr tcp read API (#25494)" (
Browse files Browse the repository at this point in the history
#25592)

This reverts commit a3398f9. Justification: see b/181367644. tl;dr: assuming urgent==false does not hold in all situations.
  • Loading branch information
drfloob committed Mar 2, 2021
1 parent 5655204 commit 0e3a02e
Show file tree
Hide file tree
Showing 25 changed files with 67 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) {
handshaker->args_->endpoint, handshaker->args_->read_buffer,
GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
&HttpConnectHandshaker::OnReadDoneScheduler,
handshaker, grpc_schedule_on_exec_ctx));
handshaker, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
}
}

Expand Down Expand Up @@ -235,7 +236,8 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) {
handshaker->args_->endpoint, handshaker->args_->read_buffer,
GRPC_CLOSURE_INIT(&handshaker->response_read_closure_,
&HttpConnectHandshaker::OnReadDoneScheduler,
handshaker, grpc_schedule_on_exec_ctx));
handshaker, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
return;
}
// Make sure we got a 2xx response.
Expand Down
3 changes: 2 additions & 1 deletion src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2552,9 +2552,10 @@ static void read_action_locked(void* tp, grpc_error* error) {
}

static void continue_read_action_locked(grpc_chttp2_transport* t) {
const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked);
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/http/httpcli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static void append_error(internal_request* req, grpc_error* error) {
}

static void do_read(internal_request* req) {
grpc_endpoint_read(req->ep, &req->incoming, &req->on_read);
grpc_endpoint_read(req->ep, &req->incoming, &req->on_read, /*urgent=*/true);
}

static void on_read(void* user_data, grpc_error* error) {
Expand Down
4 changes: 2 additions & 2 deletions src/core/lib/iomgr/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");

void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
ep->vtable->read(ep, slices, cb);
grpc_closure* cb, bool urgent) {
ep->vtable->read(ep, slices, cb, urgent);
}

void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
Expand Down
5 changes: 3 additions & 2 deletions src/core/lib/iomgr/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;

struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
bool urgent);
void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
void* arg);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
Expand All @@ -58,7 +59,7 @@ struct grpc_endpoint_vtable {
Valid slices may be placed into \a slices even when the callback is
invoked with error != GRPC_ERROR_NONE. */
void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb);
grpc_closure* cb, bool urgent);

absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep);

Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/iomgr/endpoint_cfstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
}

static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, bool urgent) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/iomgr/tcp_custom.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
}

static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
grpc_closure* cb) {
grpc_closure* cb, bool /*urgent*/) {
custom_tcp_endpoint* tcp = reinterpret_cast<custom_tcp_endpoint*>(ep);
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
GPR_ASSERT(tcp->read_cb == nullptr);
Expand Down
4 changes: 2 additions & 2 deletions src/core/lib/iomgr/tcp_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) {
}

static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_closure* cb) {
grpc_closure* cb, bool urgent) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
GPR_ASSERT(tcp->read_cb == nullptr);
tcp->read_cb = cb;
Expand All @@ -917,7 +917,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
* the polling engine */
tcp->is_first_read = false;
notify_on_read(tcp);
} else if (tcp->inq == 0) {
} else if (!urgent && tcp->inq == 0) {
/* Upper layer asked to read more but we know there is no pending data
* to read from previous reads. So, wait for POLLIN.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/iomgr/tcp_windows.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ static void on_read(void* tcpp, grpc_error* error) {
#define DEFAULT_TARGET_READ_SIZE 8192
#define MAX_WSABUF_COUNT 16
static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
grpc_closure* cb) {
grpc_closure* cb, bool urgent) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* handle = tcp->socket;
grpc_winsocket_callback_info* info = &handle->read_info;
Expand Down
4 changes: 2 additions & 2 deletions src/core/lib/security/transport/secure_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ static void on_read(void* user_data, grpc_error* error) {
}

static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, bool urgent) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
ep->read_cb = cb;
ep->read_buffer = slices;
Expand All @@ -269,7 +269,7 @@ static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return;
}

grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read);
grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent);
}

static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
Expand Down
9 changes: 6 additions & 3 deletions src/core/lib/security/transport/security_handshaker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked(
GRPC_CLOSURE_INIT(
&on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
this, grpc_schedule_on_exec_ctx));
this, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
return error;
}
if (result != TSI_OK) {
Expand Down Expand Up @@ -328,7 +329,8 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked(
GRPC_CLOSURE_INIT(
&on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
this, grpc_schedule_on_exec_ctx));
this, grpc_schedule_on_exec_ctx),
/*urgent=*/true);
} else {
// Handshake has finished, check peer and so on.
error = CheckPeerLocked();
Expand Down Expand Up @@ -434,7 +436,8 @@ void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg,
GRPC_CLOSURE_INIT(
&h->on_handshake_data_received_from_peer_,
&SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler,
h.get(), grpc_schedule_on_exec_ctx));
h.get(), grpc_schedule_on_exec_ctx),
/*urgent=*/true);
} else {
error = h->CheckPeerLocked();
if (error != GRPC_ERROR_NONE) {
Expand Down
3 changes: 2 additions & 1 deletion test/core/bad_client/bad_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
grpc_closure read_done_closure;
GRPC_CLOSURE_INIT(&read_done_closure, set_read_done, &read_done_event,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(sfd->client, &incoming, &read_done_closure);
grpc_endpoint_read(sfd->client, &incoming, &read_done_closure,
/*urgent=*/true);
grpc_core::ExecCtx::Get()->Flush();
do {
GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0);
Expand Down
9 changes: 6 additions & 3 deletions test/core/end2end/bad_server_response_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ static void done_write(void* /*arg*/, grpc_error* error) {

static void done_writing_settings_frame(void* /* arg */, grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
}

static void handle_write() {
Expand Down Expand Up @@ -139,7 +140,8 @@ static void handle_read(void* /*arg*/, grpc_error* error) {
!state.http2_response) {
handle_write();
} else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
}
}

Expand All @@ -166,7 +168,8 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
grpc_endpoint_write(state.tcp, &state.outgoing_buffer,
&on_writing_settings_frame, nullptr);
} else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
}
}

Expand Down
12 changes: 6 additions & 6 deletions test/core/end2end/fixtures/http_proxy_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ static void on_client_read_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done);
&conn->on_client_read_done, /*urgent=*/false);
}

static void on_client_read_done(void* arg, grpc_error* error) {
Expand Down Expand Up @@ -350,7 +350,7 @@ static void on_server_read_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done);
&conn->on_server_read_done, /*urgent=*/false);
}

static void on_server_read_done(void* arg, grpc_error* error) {
Expand Down Expand Up @@ -380,11 +380,11 @@ static void on_write_response_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done);
&conn->on_client_read_done, /*urgent=*/false);
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done);
&conn->on_server_read_done, /*urgent=*/false);
}

static void on_write_response_done(void* arg, grpc_error* error) {
Expand Down Expand Up @@ -484,7 +484,7 @@ static void on_read_request_done_locked(void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done);
&conn->on_read_request_done, /*urgent=*/false);
return;
}
// Make sure we got a CONNECT request.
Expand Down Expand Up @@ -579,7 +579,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done);
&conn->on_read_request_done, /*urgent=*/false);
}

//
Expand Down
3 changes: 2 additions & 1 deletion test/core/handshake/readahead_handshaker_server_ssl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class ReadAheadHandshaker : public Handshaker {
void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
grpc_closure* on_handshake_done,
HandshakerArgs* args) override {
grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done);
grpc_endpoint_read(args->endpoint, args->read_buffer, on_handshake_done,
/*urgent=*/false);
}
};

Expand Down
12 changes: 8 additions & 4 deletions test/core/iomgr/endpoint_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ struct read_and_write_test_state {
static void read_scheduler(void* data, grpc_error* /* error */) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read);
grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
/*urgent=*/false);
}

static void read_and_write_test_read_handler(void* data, grpc_error* error) {
Expand Down Expand Up @@ -242,7 +243,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
read_and_write_test_write_handler(&state, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush();

grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read);
grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
/*urgent=*/false);
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
grpc_endpoint_shutdown(
Expand Down Expand Up @@ -307,14 +309,16 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
grpc_schedule_on_exec_ctx),
/*urgent=*/false);
wait_for_fail_count(&fail_count, 0);
grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
wait_for_fail_count(&fail_count, 1);
grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
grpc_schedule_on_exec_ctx),
/*urgent=*/false);
wait_for_fail_count(&fail_count, 2);
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer,
Expand Down
8 changes: 4 additions & 4 deletions test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ - (void)testReadWrite {
grpc_slice_buffer_init(&read_one_slice);
while (read_slices.length < kBufferSize) {
init_event_closure(&read_done, &read);
grpc_endpoint_read(ep_, &read_one_slice, &read_done);
grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false);
XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
Expand Down Expand Up @@ -218,7 +218,7 @@ - (void)testShutdownBeforeRead {

grpc_slice_buffer_init(&read_slices);
init_event_closure(&read_done, &read);
grpc_endpoint_read(ep_, &read_slices, &read_done);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);

grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
Expand Down Expand Up @@ -267,7 +267,7 @@ - (void)testRemoteClosed {

init_event_closure(&read_done, &read);
grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);

grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
Expand Down Expand Up @@ -306,7 +306,7 @@ - (void)testRemoteReset {

init_event_closure(&read_done, &read);
grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);

struct linger so_linger;
so_linger.l_onoff = 1;
Expand Down
9 changes: 5 additions & 4 deletions test/core/iomgr/tcp_posix_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ static void read_cb(void* user_data, grpc_error* error) {
gpr_mu_unlock(g_mu);
} else {
gpr_mu_unlock(g_mu);
grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb);
grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb,
/*urgent=*/false);
}
}

Expand Down Expand Up @@ -230,7 +231,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);

grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);

gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
Expand Down Expand Up @@ -281,7 +282,7 @@ static void large_read_test(size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);

grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);

gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
Expand Down Expand Up @@ -520,7 +521,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);

grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);

gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
Expand Down
2 changes: 1 addition & 1 deletion test/core/security/secure_endpoint_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {

grpc_slice_buffer_init(&incoming);
GRPC_CLOSURE_INIT(&done_closure, inc_call_ctr, &n, grpc_schedule_on_exec_ctx);
grpc_endpoint_read(f.client_ep, &incoming, &done_closure);
grpc_endpoint_read(f.client_ep, &incoming, &done_closure, /*urgent=*/false);

grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(n == 1);
Expand Down
3 changes: 2 additions & 1 deletion test/core/transport/chttp2/settings_timeout_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class Client {
grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000;
while (true) {
EventState state;
grpc_endpoint_read(endpoint_, &read_buffer, state.closure());
grpc_endpoint_read(endpoint_, &read_buffer, state.closure(),
/*urgent=*/true);
if (!PollUntilDone(&state, deadline)) {
retval = false;
break;
Expand Down

0 comments on commit 0e3a02e

Please sign in to comment.