Skip to content

Commit

Permalink
add signals support
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Dec 11, 2018
1 parent e6daca7 commit 42ec345
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 120 deletions.
245 changes: 128 additions & 117 deletions actix-server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ use net2::TcpBuilder;
use num_cpus;
use tokio_timer::sleep;

// use actix::{actors::signal};

use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig};
use super::server::{Server, ServerCommand};
use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory};
use super::services::{ServiceFactory, ServiceNewService};
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::Token;
use crate::accept::{AcceptLoop, AcceptNotify, Command};
use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand};
use crate::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory};
use crate::services::{ServiceFactory, ServiceNewService};
use crate::signals::{Signal, Signals};
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::Token;

/// Server builder
pub struct ServerBuilder {
Expand Down Expand Up @@ -244,11 +243,12 @@ impl ServerBuilder {
self.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers);

// handle signals
if !self.no_signals {
Signals::start(self.server.clone());
}

// start http server actor
// let signals = self.subscribe_to_signals();
// if let Some(signals) = signals {
// signals.do_send(signal::Subscribe(addr.clone().recipient()))
// }
let server = self.server.clone();
spawn(self);
server
Expand All @@ -271,124 +271,135 @@ impl ServerBuilder {

worker
}
}

// /// Signals support
// /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
// /// message to `System` actor.
// impl Handler<signal::Signal> for Server {
// type Result = ();

// fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
// match msg.0 {
// signal::SignalType::Int => {
// info!("SIGINT received, exiting");
// self.exit = true;
// Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
// }
// signal::SignalType::Term => {
// info!("SIGTERM received, stopping");
// self.exit = true;
// Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
// }
// signal::SignalType::Quit => {
// info!("SIGQUIT received, exiting");
// self.exit = true;
// Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
// }
// _ => (),
// }
// }
// }

impl Future for ServerBuilder {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.cmd.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(item))) => match item {
ServerCommand::Pause(tx) => {
self.accept.send(Command::Pause);
let _ = tx.send(());
fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.accept.send(Command::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
self.accept.send(Command::Resume);
let _ = tx.send(());
}
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
Signal::Int => {
info!("SIGINT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
ServerCommand::Resume(tx) => {
self.accept.send(Command::Resume);
let _ = tx.send(());
Signal::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;

// stop accept thread
self.accept.send(Command::Stop);

// stop workers
if !self.workers.is_empty() {
spawn(
futures_unordered(
self.workers
.iter()
.map(move |worker| worker.1.stop(graceful)),
)
.collect()
.then(move |_| {
let _ = completion.send(());
if exit {
spawn(sleep(Duration::from_millis(300)).then(|_| {
System::current().stop();
ok(())
}));
}
ok(())
}),
)
} else {
// we need to stop system if server was spawned
if self.exit {
Signal::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
_ => (),
}
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;

// stop accept thread
self.accept.send(Command::Stop);

// stop workers
if !self.workers.is_empty() {
spawn(
futures_unordered(
self.workers
.iter()
.map(move |worker| worker.1.stop(graceful)),
)
.collect()
.then(move |_| {
if let Some(tx) = completion {
let _ = tx.send(());
}
if exit {
spawn(sleep(Duration::from_millis(300)).then(|_| {
System::current().stop();
ok(())
}));
}
let _ = completion.send(());
}
ok(())
}),
)
} else {
// we need to stop system if server was spawned
if self.exit {
spawn(sleep(Duration::from_millis(300)).then(|_| {
System::current().stop();
ok(())
}));
}
ServerCommand::WorkerDied(idx) => {
let mut found = false;
if let Some(tx) = completion {
let _ = tx.send(());
}
}
}
ServerCommand::WorkerDied(idx) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}

if found {
error!("Worker has died {:?}, restarting", idx);

let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}

if found {
error!("Worker has died {:?}, restarting", idx);

let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let worker = self.start_worker(new_idx, self.accept.get_notify());
self.workers.push((new_idx, worker.clone()));
self.accept.send(Command::Worker(worker));
}
}
}
}
}

let worker = self.start_worker(new_idx, self.accept.get_notify());
self.workers.push((new_idx, worker.clone()));
self.accept.send(Command::Worker(worker));
}
}
},
impl Future for ServerBuilder {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.cmd.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(item))) => self.handle_cmd(item),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions actix-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod config;
mod counter;
mod server;
mod services;
mod signals;
pub mod ssl;
mod worker;

Expand Down
12 changes: 9 additions & 3 deletions actix-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ use futures::sync::mpsc::UnboundedSender;
use futures::sync::oneshot;
use futures::Future;

use super::builder::ServerBuilder;
use crate::builder::ServerBuilder;
use crate::signals::Signal;

pub(crate) enum ServerCommand {
WorkerDied(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
completion: oneshot::Sender<()>,
completion: Option<oneshot::Sender<()>>,
},
}

Expand All @@ -28,6 +30,10 @@ impl Server {
ServerBuilder::default()
}

pub(crate) fn signal(&self, sig: Signal) {
let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
}

pub(crate) fn worker_died(&self, idx: usize) {
let _ = self.0.unbounded_send(ServerCommand::WorkerDied(idx));
}
Expand Down Expand Up @@ -56,7 +62,7 @@ impl Server {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Stop {
graceful,
completion: tx,
completion: Some(tx),
});
rx.map_err(|_| ())
}
Expand Down

0 comments on commit 42ec345

Please sign in to comment.