Skip to content

Commit

Permalink
add support for upload speed / remaining in the cache upload step
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon committed May 10, 2024
1 parent f0f9ea3 commit c9e1b29
Show file tree
Hide file tree
Showing 14 changed files with 401 additions and 156 deletions.
197 changes: 68 additions & 129 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion crates/turborepo-api-client/Cargo.toml
Expand Up @@ -21,15 +21,18 @@ workspace = true

[dependencies]
anyhow = { workspace = true }
bytes.workspace = true
chrono = { workspace = true, features = ["serde"] }
lazy_static = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
reqwest = { workspace = true, features = ["json", "stream"] }
rustc_version_runtime = "0.2.1"
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["codec"] }
tracing = { workspace = true }
turbopath = { workspace = true }
turborepo-ci = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/turborepo-api-client/src/error.rs
Expand Up @@ -7,6 +7,8 @@ use crate::CachingStatus;

#[derive(Debug, Error)]
pub enum Error {
#[error("Error reading from disk: {0}")]
ReadError(#[from] std::io::Error),
#[error("Error making HTTP request: {0}")]
ReqwestError(#[from] reqwest::Error),
#[error("skipping HTTP Request, too many failures have occurred.\nLast error: {0}")]
Expand Down
10 changes: 6 additions & 4 deletions crates/turborepo-api-client/src/lib.rs
Expand Up @@ -8,7 +8,7 @@ use std::{backtrace::Backtrace, env, future::Future, time::Duration};
use lazy_static::lazy_static;
use regex::Regex;
pub use reqwest::Response;
use reqwest::{Method, RequestBuilder, StatusCode};
use reqwest::{Body, Method, RequestBuilder, StatusCode};
use serde::Deserialize;
use turborepo_ci::{is_ci, Vendor};
use turborepo_vercel_api::{
Expand Down Expand Up @@ -74,7 +74,7 @@ pub trait CacheClient {
fn put_artifact(
&self,
hash: &str,
artifact_body: &[u8],
artifact_body: impl tokio_stream::Stream<Item = Result<bytes::Bytes>> + Send + Sync + 'static,
duration: u64,
tag: Option<&str>,
token: &str,
Expand Down Expand Up @@ -358,7 +358,7 @@ impl CacheClient for APIClient {
async fn put_artifact(
&self,
hash: &str,
artifact_body: &[u8],
artifact_body: impl tokio_stream::Stream<Item = Result<bytes::Bytes>> + Send + Sync + 'static,
duration: u64,
tag: Option<&str>,
token: &str,
Expand All @@ -382,13 +382,15 @@ impl CacheClient for APIClient {
request_url = preflight_response.location.clone();
}

let stream = Body::wrap_stream(artifact_body);

let mut request_builder = self
.cache_client
.put(request_url)
.header("Content-Type", "application/octet-stream")
.header("x-artifact-duration", duration.to_string())
.header("User-Agent", self.user_agent.clone())
.body(artifact_body.to_vec());
.body(stream);

if allow_auth {
request_builder = request_builder.header("Authorization", format!("Bearer {}", token));
Expand Down
4 changes: 4 additions & 0 deletions crates/turborepo-cache/Cargo.toml
Expand Up @@ -24,19 +24,23 @@ workspace = true

[dependencies]
base64 = "0.21.0"
bytes.workspace = true
camino = { workspace = true }
futures = { workspace = true }
hmac = "0.12.1"
os_str_bytes = "6.5.0"
path-clean = { workspace = true }
petgraph = "0.6.3"
pin-project = "1.1.5"
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
tar = "0.4.38"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["codec"] }
tracing = { workspace = true }
turbopath = { workspace = true }
turborepo-analytics = { workspace = true }
Expand Down
53 changes: 41 additions & 12 deletions crates/turborepo-cache/src/async_cache.rs
@@ -1,13 +1,15 @@
use std::sync::{atomic::AtomicU8, Arc};
use std::sync::{atomic::AtomicU8, Arc, Mutex};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::{mpsc, oneshot, Semaphore};
use tracing::{warn, Instrument, Level};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf};
use turborepo_analytics::AnalyticsSender;
use turborepo_api_client::{APIAuth, APIClient};

use crate::{multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts};
use crate::{
http::UploadMap, multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts,
};

const WARNING_CUTOFF: u8 = 4;

Expand All @@ -24,8 +26,11 @@ enum WorkerRequest {
duration: u64,
files: Vec<AnchoredSystemPathBuf>,
},
Flush(tokio::sync::oneshot::Sender<()>),
Shutdown(tokio::sync::oneshot::Sender<()>),
Flush(oneshot::Sender<()>),
/// Shutdown the cache. The first oneshot notifies when shutdown starts and
/// allows the user to inspect the status of the uploads. The second
/// oneshot notifies when the shutdown is complete.
Shutdown(oneshot::Sender<Arc<Mutex<UploadMap>>>, oneshot::Sender<()>),
}

impl AsyncCache {
Expand Down Expand Up @@ -95,19 +100,27 @@ impl AsyncCache {
}
drop(callback);
}
WorkerRequest::Shutdown(callback) => {
shutdown_callback = Some(callback);
WorkerRequest::Shutdown(closing, done) => {
shutdown_callback = Some((closing, done));
break;
}
};
}
// Drop write consumer to immediately notify callers that cache is shutting down
drop(write_consumer);

let shutdown_callback = if let Some((closing, done)) = shutdown_callback {
closing.send(real_cache.requests().unwrap_or_default()).ok();
Some(done)
} else {
None
};

// wait for all writers to finish
while let Some(worker) = workers.next().await {
let _ = worker;
}

if let Some(callback) = shutdown_callback {
callback.send(()).ok();
}
Expand Down Expand Up @@ -162,7 +175,7 @@ impl AsyncCache {
// before checking the cache.
#[tracing::instrument(skip_all)]
pub async fn wait(&self) -> Result<(), CacheError> {
let (tx, rx) = tokio::sync::oneshot::channel();
let (tx, rx) = oneshot::channel();
self.writer_sender
.send(WorkerRequest::Flush(tx))
.await
Expand All @@ -172,14 +185,30 @@ impl AsyncCache {
Ok(())
}

/// Shut down the cache, waiting for all workers to finish writing.
/// This function returns as soon as the shut down has started,
/// returning a channel through which workers can report on their
/// progress.
#[tracing::instrument(skip_all)]
pub async fn shutdown(&self) -> Result<(), CacheError> {
let (tx, rx) = tokio::sync::oneshot::channel();
pub async fn start_shutdown(
&self,
) -> Result<(Arc<Mutex<UploadMap>>, oneshot::Receiver<()>), CacheError> {
let (closing_tx, closing_rx) = oneshot::channel::<Arc<Mutex<UploadMap>>>();
let (closed_tx, closed_rx) = oneshot::channel::<()>();
self.writer_sender
.send(WorkerRequest::Shutdown(tx))
.send(WorkerRequest::Shutdown(closing_tx, closed_tx))
.await
.map_err(|_| CacheError::CacheShuttingDown)?;
rx.await.ok();
Ok((closing_rx.await.unwrap(), closed_rx)) // todo
}

/// Shut down the cache, waiting for all workers to finish writing.
/// This function returns only when the last worker is complete.
/// It is a convenience wrapper around `start_shutdown`.
#[tracing::instrument(skip_all)]
pub async fn shutdown(&self) -> Result<(), CacheError> {
let (_, closed_rx) = self.start_shutdown().await?;
closed_rx.await.ok();
Ok(())
}
}
Expand Down
36 changes: 34 additions & 2 deletions crates/turborepo-cache/src/http.rs
@@ -1,5 +1,11 @@
use std::{backtrace::Backtrace, io::Write};
use std::{
backtrace::Backtrace,
collections::HashMap,
io::{Cursor, Write},
sync::{Arc, Mutex},
};

use tokio_stream::StreamExt;
use tracing::debug;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf};
use turborepo_analytics::AnalyticsSender;
Expand All @@ -11,15 +17,19 @@ use turborepo_api_client::{
use crate::{
cache_archive::{CacheReader, CacheWriter},
signature_authentication::ArtifactSignatureAuthenticator,
upload_progress::{UploadProgress, UploadProgressQuery},
CacheError, CacheHitMetadata, CacheOpts, CacheSource,
};

pub type UploadMap = HashMap<String, UploadProgressQuery<10, 100>>;

pub struct HTTPCache {
client: APIClient,
signer_verifier: Option<ArtifactSignatureAuthenticator>,
repo_root: AbsoluteSystemPathBuf,
api_auth: APIAuth,
analytics_recorder: Option<AnalyticsSender>,
uploads: Arc<Mutex<UploadMap>>,
}

impl HTTPCache {
Expand Down Expand Up @@ -53,6 +63,7 @@ impl HTTPCache {
client,
signer_verifier,
repo_root,
uploads: Arc::new(Mutex::new(HashMap::new())),
api_auth,
analytics_recorder,
}
Expand All @@ -68,20 +79,37 @@ impl HTTPCache {
) -> Result<(), CacheError> {
let mut artifact_body = Vec::new();
self.write(&mut artifact_body, anchor, files).await?;
let bytes = artifact_body.len();

let tag = self
.signer_verifier
.as_ref()
.map(|signer| signer.generate_tag(hash.as_bytes(), &artifact_body))
.transpose()?;

let stream = tokio_util::codec::FramedRead::new(
Cursor::new(artifact_body),
tokio_util::codec::BytesCodec::new(),
)
.map(|res| {
res.map(|bytes| bytes.freeze())
.map_err(|e| turborepo_api_client::Error::from(e))

Check failure on line 96 in crates/turborepo-cache/src/http.rs

View workflow job for this annotation

GitHub Actions / Turborepo rust clippy

redundant closure
});

let (progress, query) = UploadProgress::<10, 100, _>::new(stream, Some(bytes));

{
let mut uploads = self.uploads.lock().unwrap();
uploads.insert(hash.to_string(), query);
}

tracing::debug!("uploading {}", hash);

match self
.client
.put_artifact(
hash,
&artifact_body,
progress,
duration,
tag.as_deref(),
&self.api_auth.token,
Expand Down Expand Up @@ -237,6 +265,10 @@ impl HTTPCache {
)))
}

pub fn requests(&self) -> Arc<Mutex<UploadMap>> {
self.uploads.clone()
}

#[tracing::instrument(skip_all)]
pub(crate) fn restore_tar(
root: &AbsoluteSystemPath,
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-cache/src/lib.rs
Expand Up @@ -19,6 +19,7 @@ mod multiplexer;
pub mod signature_authentication;
#[cfg(test)]
mod test_cases;
mod upload_progress;

use std::{backtrace, backtrace::Backtrace};

Expand Down
15 changes: 13 additions & 2 deletions crates/turborepo-cache/src/multiplexer.rs
@@ -1,11 +1,18 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};

use tracing::{debug, warn};
use turbopath::{AbsoluteSystemPath, AnchoredSystemPathBuf};
use turborepo_analytics::AnalyticsSender;
use turborepo_api_client::{APIAuth, APIClient};

use crate::{fs::FSCache, http::HTTPCache, CacheError, CacheHitMetadata, CacheOpts};
use crate::{
fs::FSCache,
http::{HTTPCache, UploadMap},
CacheError, CacheHitMetadata, CacheOpts,
};

pub struct CacheMultiplexer {
// We use an `AtomicBool` instead of removing the cache because that would require
Expand Down Expand Up @@ -82,6 +89,10 @@ impl CacheMultiplexer {
}
}

pub fn requests(&self) -> Option<Arc<Mutex<UploadMap>>> {
self.http.as_ref().map(|http| http.requests())
}

#[tracing::instrument(skip_all)]
pub async fn put(
&self,
Expand Down

0 comments on commit c9e1b29

Please sign in to comment.