Skip to content

Commit

Permalink
hubris bits, stamping, job limits
Browse files Browse the repository at this point in the history
  • Loading branch information
iliana committed May 12, 2024
1 parent e5f78ff commit a54cb20
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 8 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -338,6 +338,7 @@ nexus-test-utils = { path = "nexus/test-utils" }
nexus-types = { path = "nexus/types" }
num-integer = "0.1.46"
num = { version = "0.4.2", default-features = false, features = [ "libm" ] }
num_cpus = "1.16.0"
omicron-common = { path = "common" }
omicron-gateway = { path = "gateway" }
omicron-nexus = { path = "nexus" }
Expand Down
6 changes: 6 additions & 0 deletions dev-tools/releng/Cargo.toml
Expand Up @@ -12,16 +12,22 @@ chrono.workspace = true
clap.workspace = true
fs-err = { workspace = true, features = ["tokio"] }
futures.workspace = true
num_cpus.workspace = true
omicron-common.workspace = true
omicron-workspace-hack.workspace = true
omicron-zone-package.workspace = true
once_cell.workspace = true
reqwest.workspace = true
semver.workspace = true
serde.workspace = true
shell-words.workspace = true
slog.workspace = true
slog-async.workspace = true
slog-term.workspace = true
tar.workspace = true
tokio = { workspace = true, features = ["full"] }
toml.workspace = true
tufaceous-lib.workspace = true

[lints]
workspace = true
180 changes: 180 additions & 0 deletions dev-tools/releng/src/hubris.rs
@@ -0,0 +1,180 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::collections::HashMap;

use anyhow::Context;
use anyhow::Result;
use camino::Utf8PathBuf;
use fs_err::tokio as fs;
use fs_err::tokio::File;
use futures::future::TryFutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::TryStreamExt;
use omicron_common::api::external::SemverVersion;
use omicron_common::api::internal::nexus::KnownArtifactKind;
use semver::Version;
use serde::Deserialize;
use serde::Serialize;
use tokio::io::AsyncWriteExt;
use tufaceous_lib::assemble::DeserializedArtifactData;
use tufaceous_lib::assemble::DeserializedArtifactSource;
use tufaceous_lib::assemble::DeserializedFileArtifactSource;

/// Fetches a single hubris artifact, addressed by `hash`, from the artifact
/// server at `base_url` (`GET {base_url}/artifact/{hash}`).
///
/// Any failure — network error, HTTP error status, or body decode — is
/// wrapped with context naming the hash and the server.
async fn fetch_one(
    base_url: &'static str,
    client: reqwest::Client,
    hash: &str,
) -> Result<Vec<u8>> {
    client
        .get(format!("{}/artifact/{}", base_url, hash))
        .send()
        // Fail fast on 4xx/5xx responses instead of attempting to decode an
        // error page as artifact data, which would surface as a confusing
        // deserialization error.
        .and_then(|response| std::future::ready(response.error_for_status()))
        // NOTE(review): `.json()` means the server must return a
        // JSON-encoded byte array; if the endpoint serves raw bytes,
        // `.bytes()` would be needed instead — confirm against the
        // artifact server's API.
        .and_then(|response| response.json())
        .await
        .with_context(|| {
            format!(
                "failed to fetch hubris artifact {} from {}",
                hash, base_url
            )
        })
}

/// Fetches the hubris artifacts named in `manifest_list` into `output_dir`.
///
/// `manifest_list` is a text file whose lines each begin with the hash of a
/// hubris build manifest; anything after the first whitespace on a line is
/// ignored. For each listed hash this function:
///
/// 1. downloads the TOML manifest from `{base_url}/artifact/{hash}`,
/// 2. records the hashes of every artifact the manifest references, and
/// 3. rewrites the manifest so each artifact points at a `<hash>.zip` path
///    (via the `From<Manifest<Artifact>>` conversion below).
///
/// The rewritten manifests are concatenated into `output_dir/manifest.toml`,
/// then all referenced artifacts are downloaded concurrently as `<hash>.zip`
/// files alongside it.
pub(crate) async fn fetch_hubris_artifacts(
    base_url: &'static str,
    client: reqwest::Client,
    manifest_list: Utf8PathBuf,
    output_dir: Utf8PathBuf,
) -> Result<()> {
    fs::create_dir_all(&output_dir).await?;

    // Stage 1: fetch every listed manifest concurrently. Each future yields
    // (rewritten manifest, referenced hashes); `try_collect` into a tuple of
    // Vecs unzips those pairs. FuturesUnordered yields results in completion
    // order, so the order of `manifests` — and therefore the order of
    // entries in manifest.toml — is nondeterministic.
    let (manifests, hashes) = fs::read_to_string(manifest_list)
        .await?
        .lines()
        // Keep only the first whitespace-separated token (the hash); blank
        // lines produce no token and are skipped.
        .filter_map(|line| line.split_whitespace().next())
        .map(|hash| {
            let hash = hash.to_owned();
            let client = client.clone();
            async move {
                let data = fetch_one(base_url, client, &hash).await?;
                let str = String::from_utf8(data)
                    .context("hubris artifact manifest was not UTF-8")?;
                let hash_manifest: Manifest<Artifact> = toml::from_str(&str)
                    .context(
                        "failed to deserialize hubris artifact manifest",
                    )?;

                // Gather the hash of every artifact this manifest refers to
                // so stage 2 can download the actual data.
                let mut hashes = Vec::new();
                for artifact in hash_manifest.artifacts.values().flatten() {
                    match &artifact.source {
                        Source::File(file) => hashes.push(file.hash.clone()),
                        Source::CompositeRot { archive_a, archive_b } => hashes
                            .extend([
                                archive_a.hash.clone(),
                                archive_b.hash.clone(),
                            ]),
                    }
                }

                // Rewrite hash references into `<hash>.zip` paths.
                let path_manifest: Manifest<DeserializedArtifactData> =
                    hash_manifest.into();
                anyhow::Ok((path_manifest, hashes))
            }
        })
        .collect::<FuturesUnordered<_>>()
        .try_collect::<(Vec<_>, Vec<_>)>()
        .await?;

    // Concatenate every rewritten manifest into a single manifest.toml.
    // NOTE(review): this assumes the consumer accepts `[[artifact.<kind>]]`
    // array-of-tables entries appended across document fragments — legal
    // TOML, but worth confirming the downstream parser agrees.
    let mut output_manifest =
        File::create(output_dir.join("manifest.toml")).await?;
    for manifest in manifests {
        output_manifest
            .write_all(toml::to_string_pretty(&manifest)?.as_bytes())
            .await?;
    }

    // Stage 2: download every referenced artifact concurrently, writing each
    // as `<hash>.zip` next to manifest.toml.
    hashes
        .into_iter()
        .flatten()
        .map(|hash| {
            let client = client.clone();
            let output_dir = output_dir.clone();
            async move {
                let data = fetch_one(base_url, client, &hash).await?;
                fs::write(output_dir.join(hash).with_extension("zip"), data)
                    .await?;
                anyhow::Ok(())
            }
        })
        .collect::<FuturesUnordered<_>>()
        .try_collect::<()>()
        .await?;

    // Persist manifest.toml contents to disk. NOTE(review): this relies on
    // tokio's `sync_data` completing any in-flight buffered writes before
    // syncing — confirm no explicit `flush()` is needed first.
    output_manifest.sync_data().await?;
    Ok(())
}

/// A hubris build manifest: artifacts grouped by kind.
///
/// Generic over the per-artifact payload so the same shape serves both the
/// hash-based entries fetched from the artifact server (`Artifact`) and the
/// path-based entries handed to tufaceous (`DeserializedArtifactData`).
#[derive(Serialize, Deserialize)]
struct Manifest<T> {
    // Appears as `[[artifact.<kind>]]` tables in the TOML representation.
    #[serde(rename = "artifact")]
    artifacts: HashMap<KnownArtifactKind, Vec<T>>,
}

/// One artifact entry as it appears in a fetched hubris manifest.
#[derive(Deserialize)]
struct Artifact {
    // Artifact name, carried through unchanged into the output manifest.
    name: String,
    // Parsed semver; wrapped into `SemverVersion` during conversion.
    version: Version,
    // Where the artifact's data lives (see `Source`).
    source: Source,
}

/// Where an artifact's data comes from, discriminated by the TOML `kind`
/// field (`"file"` or `"composite-rot"` after kebab-case renaming).
#[derive(Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
enum Source {
    /// A single file identified by its content hash.
    File(FileSource),
    /// Two archives combined into one artifact (A and B halves).
    CompositeRot { archive_a: FileSource, archive_b: FileSource },
}

/// Reference to artifact content by hash alone; the data itself is fetched
/// separately from the artifact server.
#[derive(Deserialize)]
struct FileSource {
    hash: String,
}

impl From<Manifest<Artifact>> for Manifest<DeserializedArtifactData> {
    /// Converts a hash-keyed manifest into a path-keyed one by mapping every
    /// content hash to its on-disk `<hash>.zip` file name.
    fn from(
        manifest: Manifest<Artifact>,
    ) -> Manifest<DeserializedArtifactData> {
        // File name used by the download step for a given content hash.
        fn zip(hash: String) -> Utf8PathBuf {
            Utf8PathBuf::from(hash).with_extension("zip")
        }

        let artifacts = manifest
            .artifacts
            .into_iter()
            .map(|(kind, entries)| {
                let converted = entries
                    .into_iter()
                    .map(|artifact| {
                        let source = match artifact.source {
                            Source::File(file) => {
                                DeserializedArtifactSource::File {
                                    path: zip(file.hash),
                                }
                            }
                            Source::CompositeRot { archive_a, archive_b } => {
                                DeserializedArtifactSource::CompositeRot {
                                    archive_a:
                                        DeserializedFileArtifactSource::File {
                                            path: zip(archive_a.hash),
                                        },
                                    archive_b:
                                        DeserializedFileArtifactSource::File {
                                            path: zip(archive_b.hash),
                                        },
                                }
                            }
                        };
                        DeserializedArtifactData {
                            name: artifact.name,
                            version: SemverVersion(artifact.version),
                            source,
                        }
                    })
                    .collect();
                (kind, converted)
            })
            .collect();

        Manifest { artifacts }
    }
}
24 changes: 22 additions & 2 deletions dev-tools/releng/src/job.rs
Expand Up @@ -6,6 +6,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
Expand All @@ -26,11 +27,13 @@ use tokio::io::BufReader;
use tokio::process::Command;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;

use crate::cmd::CommandExt;

pub(crate) struct Jobs {
logger: Logger,
permits: Arc<Semaphore>,
log_dir: Utf8PathBuf,
map: HashMap<String, Job>,
}
Expand All @@ -47,9 +50,14 @@ pub(crate) struct Selector<'a> {
}

impl Jobs {
pub(crate) fn new(logger: &Logger, log_dir: &Utf8Path) -> Jobs {
pub(crate) fn new(
logger: &Logger,
permits: Arc<Semaphore>,
log_dir: &Utf8Path,
) -> Jobs {
Jobs {
logger: logger.clone(),
permits,
log_dir: log_dir.to_owned(),
map: HashMap::new(),
}
Expand All @@ -70,6 +78,7 @@ impl Jobs {
Job {
future: Box::pin(run_job(
self.logger.clone(),
self.permits.clone(),
name.clone(),
future,
)),
Expand All @@ -95,6 +104,7 @@ impl Jobs {
// returning &mut
std::mem::replace(command, Command::new("false")),
self.logger.clone(),
self.permits.clone(),
name.clone(),
self.log_dir.join(&name).with_extension("log"),
)),
Expand Down Expand Up @@ -167,10 +177,17 @@ macro_rules! info_or_error {
};
}

async fn run_job<F>(logger: Logger, name: String, future: F) -> Result<()>
async fn run_job<F>(
logger: Logger,
permits: Arc<Semaphore>,
name: String,
future: F,
) -> Result<()>
where
F: Future<Output = Result<()>> + 'static,
{
let _ = permits.acquire_owned().await?;

info!(logger, "[{}] running task", name);
let start = Instant::now();
let result = future.await;
Expand All @@ -189,9 +206,12 @@ where
async fn spawn_with_output(
mut command: Command,
logger: Logger,
permits: Arc<Semaphore>,
name: String,
log_path: Utf8PathBuf,
) -> Result<()> {
let _ = permits.acquire_owned().await?;

let log_file_1 = File::create(log_path).await?;
let log_file_2 = log_file_1.try_clone().await?;

Expand Down

0 comments on commit a54cb20

Please sign in to comment.