Add a way to close the http server while waiting (#437)
* Fix a few compiler warnings

* WIP

* cleanup

* Cleanup and docs

* Fix whitespace and grammar

* Update http/src/lib.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Now that wait() does not use the eventloop/signalling chans, just bundle them up in a tuple
dvdplm authored and tomusdrw committed Jun 3, 2019
1 parent 8c0ada4 commit 1bbd403
Showing 6 changed files with 84 additions and 38 deletions.
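
The headline change is a new `CloseHandle` that can shut the server down from another thread while it is blocked in `wait()`. Below is a minimal usage sketch of the API after this commit — it is not part of the diff; the handler setup and address are illustrative, while `close_handle()`, `CloseHandle::close()` and the new `wait()` behaviour are what this commit adds.

use std::thread;

use jsonrpc_core::IoHandler;
use jsonrpc_http_server::ServerBuilder;

fn main() {
    let io = IoHandler::default();
    let server = ServerBuilder::new(io)
        .start_http(&"127.0.0.1:3030".parse().unwrap())
        .expect("server starts");

    // The handle is `Clone` and may be moved to another thread.
    let close_handle = server.close_handle();
    thread::spawn(move || {
        // Stops the event loops and fires the `done` channels, unblocking `wait()` below.
        close_handle.close();
    });

    // Blocks until every worker reports completion on its `done` receiver.
    server.wait();
}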
2 changes: 1 addition & 1 deletion http/Cargo.toml
@@ -17,6 +17,6 @@ jsonrpc-server-utils = { version = "11.0", path = "../server-utils" }
log = "0.4"
net2 = "0.2"
unicase = "2.0"

parking_lot = "0.8.0"
[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
92 changes: 61 additions & 31 deletions http/src/lib.rs
@@ -40,6 +40,8 @@ use std::net::SocketAddr;
use std::sync::{mpsc, Arc};
use std::thread;

use parking_lot::Mutex;

use crate::jsonrpc::futures::sync::oneshot;
use crate::jsonrpc::futures::{self, future, Future, Stream};
use crate::jsonrpc::MetaIoHandler;
@@ -377,10 +379,12 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {

let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = self.executor.init_with_name("http.worker0")?;
let req_max_size = self.max_request_body_size;
// The first thread's `Executor` is initialised differently from the others
serve(
(shutdown_signal, local_addr_tx),
(shutdown_signal, local_addr_tx, done_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
@@ -399,9 +403,10 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
.map(|i| {
let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
serve(
(shutdown_signal, local_addr_tx),
(shutdown_signal, local_addr_tx, done_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
@@ -416,27 +421,34 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
reuse_port,
req_max_size,
);
Ok((eloop, close, local_addr_rx))
Ok((eloop, close, local_addr_rx, done_rx))
})
.collect::<io::Result<Vec<_>>>()?;

// Wait for server initialization
let local_addr = recv_address(local_addr_rx);
// Wait for other threads as well.
let mut handles = handles
let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
.into_iter()
.map(|(eloop, close, local_addr_rx)| {
.map(|(eloop, close, local_addr_rx, done_rx)| {
let _ = recv_address(local_addr_rx)?;
Ok((eloop, close))
Ok((eloop, close, done_rx))
})
.collect::<io::Result<(Vec<_>)>>()?;
handles.push((eloop, close));
let (executors, close) = handles.into_iter().unzip();
handles.push((eloop, close, done_rx));

let (executors, done_rxs) = handles
.into_iter()
.fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
acc.0.push((eloop, closer));
acc.1.push(done_rx);
acc
});

Ok(Server {
address: local_addr?,
executor: Some(executors),
close: Some(close),
executors: Arc::new(Mutex::new(Some(executors))),
done: Some(done_rxs),
})
}
}
@@ -448,7 +460,7 @@ fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Re
}

fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>, oneshot::Sender<()>),
executor: tokio::runtime::TaskExecutor,
addr: SocketAddr,
cors_domains: CorsDomains,
@@ -463,7 +475,7 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
reuse_port: bool,
max_request_body_size: usize,
) {
let (shutdown_signal, local_addr_tx) = signals;
let (shutdown_signal, local_addr_tx, done_tx) = signals;
executor.spawn(future::lazy(move || {
let handle = tokio::reactor::Handle::default();

@@ -537,12 +549,15 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
.map_err(|e| {
warn!("Incoming streams error, closing server: {:?}", e);
})
.select(shutdown_signal.map_err(|e| {
.select(shutdown_signal
.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map(|_| ())
.map_err(|_| ())
})
}).and_then(|_| {
done_tx.send(())
}));
}

@@ -562,45 +577,60 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
Ok(())
}

/// Handle used to close the server. Can be cloned and passed around to different threads and be used
/// to close a server that is `wait()`ing.

#[derive(Clone)]
pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);

impl CloseHandle {
/// Shutdown a running server
pub fn close(self) {
if let Some(executors) = self.0.lock().take() {
for (executor, closer) in executors {
executor.close();
let _ = closer.send(());
}
}
}
}

/// jsonrpc http server instance
pub struct Server {
address: SocketAddr,
executor: Option<Vec<Executor>>,
close: Option<Vec<oneshot::Sender<()>>>,
executors: Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>,
done: Option<Vec<oneshot::Receiver<()>>>,
}

const PROOF: &str = "Server is always Some until self is consumed.";
impl Server {
/// Returns address of this server
pub fn address(&self) -> &SocketAddr {
&self.address
}

/// Closes the server.
pub fn close(mut self) {
for close in self.close.take().expect(PROOF) {
let _ = close.send(());
}

for executor in self.executor.take().expect(PROOF) {
executor.close();
}
pub fn close(self) {
self.close_handle().close()
}

/// Will block, waiting for the server to finish.
pub fn wait(mut self) {
for executor in self.executor.take().expect(PROOF) {
executor.wait();
if let Some(receivers) = self.done.take() {
for receiver in receivers {
let _ = receiver.wait();
}
}
}

/// Get a handle that allows us to close the server from a different thread and/or while the
/// server is `wait()`ing.
pub fn close_handle(&self) -> CloseHandle {
CloseHandle(self.executors.clone())
}
}

impl Drop for Server {
fn drop(&mut self) {
if let Some(executors) = self.executor.take() {
for executor in executors {
executor.close();
}
};
self.close_handle().close();
}
}
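
A note on the shutdown design above: the per-worker `Executor`s and close senders sit behind an `Arc<Mutex<Option<Vec<_>>>>`, so the first `close()` takes them out of the `Option` and every later call, including the one made by `Drop`, is a harmless no-op. A standalone sketch of that take-once pattern follows — illustrative only, with a plain `String` standing in for the per-worker resources.

use std::sync::Arc;

use parking_lot::Mutex;

#[derive(Clone)]
struct Closer(Arc<Mutex<Option<Vec<String>>>>);

impl Closer {
    fn close(self) {
        // `take()` empties the Option, so repeated calls (e.g. from Drop) do nothing.
        if let Some(workers) = self.0.lock().take() {
            for worker in workers {
                println!("shutting down {}", worker);
            }
        }
    }
}

fn main() {
    let closer = Closer(Arc::new(Mutex::new(Some(vec!["worker0".into(), "worker1".into()]))));
    closer.clone().close(); // shuts everything down once
    closer.close(); // safe no-op afterwards
}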
23 changes: 20 additions & 3 deletions http/src/tests.rs
@@ -4,6 +4,7 @@ use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value};
use std::io::{Read, Write};
use std::net::TcpStream;
use std::str::Lines;
use std::time::Duration;

use self::jsonrpc_core::futures::{self, Future};
use super::*;
@@ -52,8 +53,6 @@ fn serve_allow_headers(cors_allow_headers: cors::AccessControlAllowHeaders) -> S
}

fn io() -> IoHandler {
use std::{thread, time};

let mut io = IoHandler::default();
io.add_method("hello", |params: Params| match params.parse::<(u64,)>() {
Ok((num,)) => Ok(Value::String(format!("world: {}", num))),
@@ -66,7 +65,7 @@ fn io() -> IoHandler {
io.add_method("hello_async2", |_params: Params| {
let (c, p) = futures::oneshot();
thread::spawn(move || {
thread::sleep(time::Duration::from_millis(10));
thread::sleep(Duration::from_millis(10));
c.send(Value::String("world".into())).unwrap();
});
p.map_err(|_| Error::invalid_request())
@@ -1406,6 +1405,24 @@ fn should_return_connection_header() {
assert_eq!(response.body, world_batch());
}

#[test]
fn close_handle_makes_wait_return() {
let server = serve(id);
let close_handle = server.close_handle();

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
tx.send(server.wait()).unwrap();
});

thread::sleep(Duration::from_secs(3));

close_handle.close();

rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close");
}

#[test]
fn should_close_connection_without_keep_alive() {
// given
2 changes: 1 addition & 1 deletion ipc/src/logger.rs
@@ -10,7 +10,7 @@ lazy_static! {
builder.filter(None, LevelFilter::Info);

if let Ok(log) = env::var("RUST_LOG") {
builder.parse(&log);
builder.parse_filters(&log);
}

if let Ok(_) = builder.try_init() {
2 changes: 1 addition & 1 deletion tcp/src/logger.rs
@@ -8,7 +8,7 @@ lazy_static! {
builder.filter(None, LevelFilter::Info);

if let Ok(log) = env::var("RUST_LOG") {
builder.parse(&log);
builder.parse_filters(&log);
}

if let Ok(_) = builder.try_init() {
1 change: 0 additions & 1 deletion ws/src/tests.rs
@@ -62,7 +62,6 @@ fn request(server: Server, request: &str) -> Response {

fn serve(port: u16) -> (Server, Arc<AtomicUsize>) {
use crate::core::futures::sync::oneshot;
use std::time::Duration;

let pending = Arc::new(AtomicUsize::new(0));

