Skip to content

Commit

Permalink
feat(me): implement timing events for spans
Browse files Browse the repository at this point in the history
This is a minimal tracing layer that times how long each span takes and
logs an event with the result after the span is closed, for our own
needs in performance observability.

closes prisma/prisma#13693
  • Loading branch information
tomhoule committed Jun 21, 2022
1 parent 7a18b3b commit 2f40772
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 109 deletions.
54 changes: 12 additions & 42 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion introspection-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jsonrpc-derive = "17.0"
json-rpc-stdio = { path = "../../libs/json-rpc-stdio" }

tracing = "0.1"
tracing-subscriber = "0.2"
tracing-subscriber = "0.3"
tracing-futures = "0.2"

[dependencies.tokio]
Expand Down
1 change: 0 additions & 1 deletion libs/sql-schema-describer/src/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ impl<'a> SqlSchemaDescriber<'a> {
Ok(size as usize)
}

#[tracing::instrument(skip(self, columns, indexes, foreign_keys))]
fn get_table(
&self,
name: &str,
Expand Down
5 changes: 3 additions & 2 deletions libs/test-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,11 @@ fn init_logger() {

let subscriber = FmtSubscriber::builder()
.with_env_filter(EnvFilter::from_default_env())
.with_ansi(false)
.with_ansi(true)
.with_writer(std::io::stderr)
.finish()
.with(ErrorLayer::default());
.with(ErrorLayer::default())
.with(migration_core::TimingsLayer::default());

tracing::subscriber::set_global_default(subscriber)
.map_err(|err| eprintln!("Error initializing the global logger: {}", err))
Expand Down
4 changes: 3 additions & 1 deletion migration-engine/cli/src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use migration_connector::ConnectorError;
use migration_core::TimingsLayer;
use tracing_error::ErrorLayer;

pub(crate) fn init_logger() {
Expand All @@ -10,7 +11,8 @@ pub(crate) fn init_logger() {
.with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
.with_writer(std::io::stderr)
.finish()
.with(ErrorLayer::default());
.with(ErrorLayer::default())
.with(TimingsLayer::default());

tracing::subscriber::set_global_default(subscriber)
.map_err(|err| eprintln!("Error initializing the global logger: {}", err))
Expand Down
1 change: 1 addition & 0 deletions migration-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
tokio = { version = "1.0", default_features = false }
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-futures = "0.2"
url = "2.1.1"

Expand Down
104 changes: 46 additions & 58 deletions migration-engine/core/src/commands/apply_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use migration_connector::{
ConnectorError, MigrationConnector, MigrationRecord, PersistenceNotInitializedError,
};
use std::{path::Path, time::Instant};
use tracing::Instrument;
use user_facing_errors::migration_engine::FoundFailedMigrations;

pub async fn apply_migrations(
Expand Down Expand Up @@ -43,70 +44,57 @@ pub async fn apply_migrations(
tracing::info!(analysis_duration_ms, "Analysis run in {}ms", analysis_duration_ms,);

let mut applied_migration_names: Vec<String> = Vec::with_capacity(unapplied_migrations.len());
let apply_migrations_start = Instant::now();

for unapplied_migration in unapplied_migrations {
let span = tracing::info_span!(
let fut = async {
let script = unapplied_migration
.read_migration_script()
.map_err(ConnectorError::from)?;

tracing::info!(
script = script.as_str(),
"Applying `{}`",
unapplied_migration.migration_name()
);

let migration_id = connector
.migration_persistence()
.record_migration_started(unapplied_migration.migration_name(), &script)
.await?;

match connector
.apply_script(unapplied_migration.migration_name(), &script)
.await
{
Ok(()) => {
tracing::debug!("Successfully applied the script.");
let p = connector.migration_persistence();
p.record_successful_step(&migration_id).await?;
p.record_migration_finished(&migration_id).await?;
applied_migration_names.push(unapplied_migration.migration_name().to_owned());
Ok(())
}
Err(err) => {
tracing::debug!("Failed to apply the script.");

let logs = err.to_string();

connector
.migration_persistence()
.record_failed_step(&migration_id, &logs)
.await?;

Err(err)
}
}
};
fut.instrument(tracing::info_span!(
"Applying migration",
migration_name = unapplied_migration.migration_name(),
);
let _span = span.enter();
let migration_apply_start = Instant::now();

let script = unapplied_migration
.read_migration_script()
.map_err(ConnectorError::from)?;

tracing::info!(
script = script.as_str(),
"Applying `{}`",
unapplied_migration.migration_name()
);

let migration_id = connector
.migration_persistence()
.record_migration_started(unapplied_migration.migration_name(), &script)
.await?;

match connector
.apply_script(unapplied_migration.migration_name(), &script)
.await
{
Ok(()) => {
tracing::debug!("Successfully applied the script.");
let p = connector.migration_persistence();
p.record_successful_step(&migration_id).await?;
p.record_migration_finished(&migration_id).await?;
applied_migration_names.push(unapplied_migration.migration_name().to_owned());
let migration_duration_ms = Instant::now().duration_since(migration_apply_start).as_millis() as u64;
tracing::info!(
migration_duration_ms = migration_duration_ms,
"Migration executed in {}ms",
migration_duration_ms
);
}
Err(err) => {
tracing::debug!("Failed to apply the script.");

let logs = err.to_string();

connector
.migration_persistence()
.record_failed_step(&migration_id, &logs)
.await?;

return Err(err);
}
}
))
.await?
}

let apply_migrations_ms = Instant::now().duration_since(apply_migrations_start).as_millis() as u64;
tracing::info!(
apply_migrations_duration_ms = apply_migrations_ms,
"All the migrations executed in {}ms",
apply_migrations_ms
);

Ok(ApplyMigrationsOutput {
applied_migration_names,
})
Expand Down
3 changes: 2 additions & 1 deletion migration-engine/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ mod api;
mod core_error;
mod rpc;
mod state;
mod timings;

pub use self::{api::GenericApi, core_error::*, rpc::rpc_api};
pub use self::{api::GenericApi, core_error::*, rpc::rpc_api, timings::TimingsLayer};
pub use migration_connector;

use datamodel::ValidatedSchema;
Expand Down
36 changes: 36 additions & 0 deletions migration-engine/core/src/timings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::time;
use tracing::Id as SpanId;

/// Gather and display timings of tracing spans.
#[derive(Default)]
pub struct TimingsLayer;

struct TimerTime(pub time::Instant, String);

impl<S> tracing_subscriber::Layer<S> for TimingsLayer
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
fn on_new_span(
&self,
attrs: &tracing::span::Attributes<'_>,
id: &SpanId,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let span_ctx = ctx.span(id).unwrap();
let mut extensions = span_ctx.extensions_mut();
extensions.insert(TimerTime(time::Instant::now(), attrs.values().to_string()));
}

fn on_close(&self, id: SpanId, ctx: tracing_subscriber::layer::Context<'_, S>) {
let span_ctx = ctx.span(&id).unwrap();
let span_name = span_ctx.name();
let mut extensions = span_ctx.extensions_mut();
let TimerTime(start, values) = extensions.remove::<TimerTime>().unwrap();
let elapsed = time::Instant::now().duration_since(start);
tracing::debug!(
span_timing_μs = elapsed.as_micros() as u32,
"{span_name}{values}: Span closed. Elapsed: {elapsed:?}",
);
}
}

0 comments on commit 2f40772

Please sign in to comment.