Skip to content

Commit

Permalink
fix: backend bumps and hot fixes (#487)
Browse files Browse the repository at this point in the history
  • Loading branch information
brokad committed Nov 23, 2022
1 parent b748493 commit e3fb067
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 29 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shuttle-common"
version = "0.7.2"
version = "0.7.3"
edition = "2021"
license = "Apache-2.0"
description = "Common library for the shuttle platform (https://www.shuttle.rs/)"
Expand Down
2 changes: 1 addition & 1 deletion deployer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shuttle-deployer"
version = "0.7.2"
version = "0.7.3"
edition = "2021"
description = "Service with instances created per project for handling the compilation, loading, and execution of Shuttle services"

Expand Down
4 changes: 2 additions & 2 deletions deployer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn get_service(
async fn get_service_summary(
Extension(persistence): Extension<Persistence>,
Extension(proxy_fqdn): Extension<FQDN>,
Path((project_name, service_name)): Path<(String, String)>,
Path((_, service_name)): Path<(String, String)>,
) -> Result<Json<shuttle_common::models::service::Summary>> {
if let Some(service) = persistence.get_service_by_name(&service_name).await? {
let deployment = persistence
Expand All @@ -189,7 +189,7 @@ async fn get_service_summary(
.collect();

let response = shuttle_common::models::service::Summary {
uri: format!("https://{}.{proxy_fqdn}", project_name),
uri: format!("https://{proxy_fqdn}"),
name: service.name,
deployment,
resources,
Expand Down
2 changes: 1 addition & 1 deletion gateway/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shuttle-gateway"
version = "0.7.2"
version = "0.7.3"
edition = "2021"
publish = false

Expand Down
4 changes: 2 additions & 2 deletions gateway/src/acme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ impl AcmeClient {
let digest = order.key_authorization(challenge).dns_value();
warn!("dns-01 challenge: _acme-challenge.{domain} 300 IN TXT \"{digest}\"");

// Wait 120 secs to insert the record manually and for it to
// Wait 60 secs to insert the record manually and for it to
// propagate before moving on
sleep(Duration::from_secs(120)).await;
sleep(Duration::from_secs(60)).await;

order
.set_challenge_ready(&challenge.url)
Expand Down
29 changes: 24 additions & 5 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ use futures::prelude::*;
use instant_acme::{AccountCredentials, ChallengeType};
use opentelemetry::global;
use shuttle_gateway::acme::{AcmeClient, CustomDomain};
use shuttle_gateway::api::latest::ApiBuilder;
use shuttle_gateway::api::latest::{ApiBuilder, SVC_DEGRADED_THRESHOLD};
use shuttle_gateway::args::StartArgs;
use shuttle_gateway::args::{Args, Commands, InitArgs, UseTls};
use shuttle_gateway::auth::Key;
use shuttle_gateway::proxy::UserServiceBuilder;
use shuttle_gateway::service::{GatewayService, MIGRATIONS};
use shuttle_gateway::task;
use shuttle_gateway::tls::{make_tls_acceptor, ChainAndPrivateKey};
use shuttle_gateway::worker::Worker;
use shuttle_gateway::worker::{Worker, WORKER_QUEUE_SIZE};
use sqlx::migrate::MigrateDatabase;
use sqlx::{query, Sqlite, SqlitePool};
use std::io::{self, Cursor};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, info_span, trace, warn};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main(flavor = "multi_thread")]
Expand Down Expand Up @@ -108,14 +108,33 @@ async fn start(db: SqlitePool, fs: PathBuf, args: StartArgs) -> io::Result<()> {
async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
if sender.capacity() < WORKER_QUEUE_SIZE - SVC_DEGRADED_THRESHOLD {
// if degraded, don't stack more health checks
warn!(
sender.capacity = sender.capacity(),
"skipping health checks"
);
continue;
}

if let Ok(projects) = gateway.iter_projects().await {
let span = info_span!(
"running health checks",
healthcheck.num_projects = projects.len()
);
let _ = span.enter();
for (project_name, _) in projects {
let _ = gateway
if let Ok(handle) = gateway
.new_task()
.project(project_name)
.and_then(task::check_health())
.send(&sender)
.await;
.await
{
// we wait for the check to be done before
// queuing up the next one
handle.await
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl GatewayService {

pub async fn iter_projects(
&self,
) -> Result<impl Iterator<Item = (ProjectName, AccountName)>, Error> {
) -> Result<impl ExactSizeIterator<Item = (ProjectName, AccountName)>, Error> {
let iter = query("SELECT project_name, account_name FROM projects")
.fetch_all(&self.db)
.await?
Expand Down
109 changes: 100 additions & 9 deletions gateway/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use futures::Future;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio::time::{sleep, timeout};
use tracing::{info, warn};
use tracing::{error, info, info_span, warn};
use uuid::Uuid;

use crate::project::*;
Expand Down Expand Up @@ -66,6 +68,23 @@ impl<R, E> TaskResult<R, E> {
}
}

pub fn to_str(&self) -> &str {
match self {
Self::Pending(_) => "pending",
Self::Done(_) => "done",
Self::TryAgain => "try again",
Self::Cancelled => "cancelled",
Self::Err(_) => "error",
}
}

pub fn is_done(&self) -> bool {
match self {
Self::Done(_) | Self::Cancelled | Self::Err(_) => true,
Self::TryAgain | Self::Pending(_) => false,
}
}

pub fn as_ref(&self) -> TaskResult<&R, &E> {
match self {
Self::Pending(r) => TaskResult::Pending(r),
Expand Down Expand Up @@ -179,9 +198,10 @@ impl TaskBuilder {
))
}

pub async fn send(self, sender: &Sender<BoxedTask>) -> Result<(), Error> {
match timeout(TASK_SEND_TIMEOUT, sender.send(self.build())).await {
Ok(Ok(_)) => Ok(()),
pub async fn send(self, sender: &Sender<BoxedTask>) -> Result<TaskHandle, Error> {
let (task, handle) = AndThenNotify::after(self.build());
match timeout(TASK_SEND_TIMEOUT, sender.send(Box::new(task))).await {
Ok(Ok(_)) => Ok(handle),
_ => Err(Error::from_kind(ErrorKind::ServiceUnavailable)),
}
}
Expand Down Expand Up @@ -225,6 +245,60 @@ impl Task<ProjectContext> for RunUntilDone {
}
}

pub struct TaskHandle {
rx: oneshot::Receiver<()>,
}

impl Future for TaskHandle {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
Pin::new(&mut self.rx).poll(cx).map(|_| ())
}
}

pub struct AndThenNotify<T> {
inner: T,
notify: Option<oneshot::Sender<()>>,
}

impl<T> AndThenNotify<T> {
pub fn after(task: T) -> (Self, TaskHandle) {
let (tx, rx) = oneshot::channel();
(
Self {
inner: task,
notify: Some(tx),
},
TaskHandle { rx },
)
}
}

#[async_trait]
impl<T, Ctx> Task<Ctx> for AndThenNotify<T>
where
Ctx: Send + 'static,
T: Task<Ctx>,
{
type Output = T::Output;

type Error = T::Error;

async fn poll(&mut self, ctx: Ctx) -> TaskResult<Self::Output, Self::Error> {
let out = self.inner.poll(ctx).await;

if out.is_done() {
let _ = self.notify.take().unwrap().send(());
}

out
}
}

pub struct WithTimeout<T> {
inner: T,
start: Option<Instant>,
Expand Down Expand Up @@ -322,8 +396,6 @@ where

let ctx = self.service.context();

info!(%self.project_name, "starting work on project");

let project = match self.service.find_project(&self.project_name).await {
Ok(project) => project,
Err(err) => return TaskResult::Err(err),
Expand All @@ -345,6 +417,14 @@ where
state: project,
};

let span = info_span!(
"polling project",
ctx.project = ?project_ctx.project_name,
ctx.account = ?project_ctx.account_name,
ctx.state = project_ctx.state.state()
);
let _ = span.enter();

let task = self.tasks.front_mut().unwrap();

let timeout = sleep(PROJECT_TASK_MAX_IDLE_TIMEOUT);
Expand All @@ -364,16 +444,24 @@ where
};

if let Some(update) = res.as_ref().ok() {
info!(new_state = ?update.state(), "new state");
match self
.service
.update_project(&self.project_name, update)
.await
{
Ok(_) => {}
Err(err) => return TaskResult::Err(err),
Ok(_) => {
info!(new_state = ?update.state(), "successfully updated project state");
}
Err(err) => {
error!(err = %err, "could not update project state");
return TaskResult::Err(err);
}
}
}

info!(result = res.to_str(), "poll result");

match res {
TaskResult::Pending(_) => TaskResult::Pending(()),
TaskResult::TryAgain => TaskResult::TryAgain,
Expand All @@ -386,7 +474,10 @@ where
}
}
TaskResult::Cancelled => TaskResult::Cancelled,
TaskResult::Err(err) => TaskResult::Err(err),
TaskResult::Err(err) => {
error!(err = %err, "project task failure");
TaskResult::Err(err)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shuttle-proto"
version = "0.7.2"
version = "0.7.3"
edition = "2021"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
2 changes: 1 addition & 1 deletion provisioner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shuttle-provisioner"
version = "0.7.2"
version = "0.7.3"
edition = "2021"
description = "Service responsible for provisioning and managing resources for services"
publish = false
Expand Down

0 comments on commit e3fb067

Please sign in to comment.