Skip to content

Commit

Permalink
Upgrade to hyper 1.x (#450)
Browse files Browse the repository at this point in the history
  • Loading branch information
evaneaston committed Mar 13, 2024
1 parent e2c37f4 commit b195391
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 230 deletions.
16 changes: 10 additions & 6 deletions metrics-exporter-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ keywords = ["metrics", "telemetry", "prometheus"]

[features]
default = ["http-listener", "push-gateway"]
async-runtime = ["tokio", "hyper"]
http-listener = ["async-runtime", "hyper/server", "ipnet"]
push-gateway = ["async-runtime", "hyper/client", "hyper-tls", "tracing"]
async-runtime = ["tokio", "hyper-util/tokio"]
http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"]
push-gateway = ["async-runtime", "tracing", "_hyper-client"]
_hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"]
_hyper-client = ["http-body-util", "hyper/client", "hyper-util/client", "hyper-util/http1", "hyper-util/client-legacy", "hyper-tls"]

[dependencies]
metrics = { version = "^0.22", path = "../metrics" }
Expand All @@ -31,11 +33,13 @@ indexmap = { version = "2.1", default-features = false, features = ["std"] }
base64 = { version = "0.21.0", default-features = false, features = ["std"] }

# Optional
hyper = { version = "0.14", default-features = false, features = ["tcp", "http1"], optional = true }
hyper = { version = "1.1", features = [ "server", "client" ], optional = true }
hyper-util = { version="0.1.3", features = [ "tokio", "service", "client", "client-legacy", "http1" ], optional = true }
http-body-util = { version = "0.1.0", optional = true }
ipnet = { version = "2", optional = true }
tokio = { version = "1", features = ["rt", "net", "time"], optional = true }
tokio = { version = "1", features = ["rt", "net", "time", "rt-multi-thread"], optional = true }
tracing = { version = "0.1.26", optional = true }
hyper-tls = { version = "0.5.0", optional = true }
hyper-tls = { version = "0.6.0", optional = true }

[dev-dependencies]
tracing = "0.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,20 @@
use std::collections::HashMap;
#[cfg(feature = "push-gateway")]
use std::convert::TryFrom;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::future::Future;
#[cfg(feature = "http-listener")]
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::num::NonZeroU32;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::pin::Pin;
use std::sync::RwLock;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::thread;
use std::time::Duration;

#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use hyper::Body;

#[cfg(feature = "http-listener")]
use hyper::{
server::{conn::AddrStream, Server},
service::{make_service_fn, service_fn},
Response, StatusCode,
};

#[cfg(feature = "push-gateway")]
use hyper::{
body::{aggregate, Buf},
client::Client,
http::HeaderValue,
Method, Request, Uri,
};
#[cfg(feature = "push-gateway")]
use hyper_tls::HttpsConnector;

use hyper::Uri;
use indexmap::IndexMap;
#[cfg(feature = "http-listener")]
use ipnet::IpNet;
use quanta::Clock;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use tokio::runtime;
#[cfg(feature = "push-gateway")]
use tracing::error;

use metrics_util::{
parse_quantiles,
Expand All @@ -54,41 +28,9 @@ use crate::recorder::{Inner, PrometheusRecorder};
use crate::registry::AtomicStorage;
use crate::{common::BuildError, PrometheusHandle};

use super::ExporterConfig;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
type ExporterFuture = Pin<Box<dyn Future<Output = Result<(), hyper::Error>> + Send + 'static>>;

#[derive(Clone)]
enum ExporterConfig {
// Run an HTTP listener on the given `listen_address`.
#[cfg(feature = "http-listener")]
HttpListener { listen_address: SocketAddr },

// Run a push gateway task sending to the given `endpoint` after `interval` time has elapsed,
// infinitely.
#[cfg(feature = "push-gateway")]
PushGateway {
endpoint: Uri,
interval: Duration,
username: Option<String>,
password: Option<String>,
},

#[allow(dead_code)]
Unconfigured,
}

impl ExporterConfig {
#[cfg_attr(not(any(feature = "http-listener", feature = "push-gateway")), allow(dead_code))]
fn as_type_str(&self) -> &'static str {
match self {
#[cfg(feature = "http-listener")]
Self::HttpListener { .. } => "http-listener",
#[cfg(feature = "push-gateway")]
Self::PushGateway { .. } => "push-gateway",
Self::Unconfigured => "unconfigured,",
}
}
}
use super::ExporterFuture;

/// Builder for creating and installing a Prometheus recorder/exporter.
pub struct PrometheusBuilder {
Expand Down Expand Up @@ -390,6 +332,8 @@ impl PrometheusBuilder {
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http-listener", feature = "push-gateway"))))]
pub fn install(self) -> Result<(), BuildError> {
use tokio::runtime;

let recorder = if let Ok(handle) = runtime::Handle::try_current() {
let (recorder, exporter) = {
let _g = handle.enter();
Expand Down Expand Up @@ -470,112 +414,28 @@ impl PrometheusBuilder {
let recorder = self.build_recorder();
let handle = recorder.handle();

match exporter_config {
ExporterConfig::Unconfigured => Err(BuildError::MissingExporterConfiguration),
#[cfg(feature = "http-listener")]
ExporterConfig::HttpListener { listen_address } => {
let server = Server::try_bind(&listen_address)
.map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?;
let exporter = async move {
let make_svc = make_service_fn(move |socket: &AddrStream| {
let remote_addr = socket.remote_addr().ip();

// If the allowlist is empty, the request is allowed. Otherwise, it must
// match one of the entries in the allowlist or it will be denied.
let is_allowed = allowed_addresses.as_ref().map_or(true, |addresses| {
addresses.iter().any(|address| address.contains(&remote_addr))
});

let handle = handle.clone();

async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
let handle = handle.clone();

async move {
if is_allowed {
Ok::<_, hyper::Error>(match req.uri().path() {
"/health" => Response::new(Body::from("OK")),
_ => Response::new(Body::from(handle.render())),
})
} else {
Ok::<_, hyper::Error>(
Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::empty())
.expect("static response is valid"),
)
}
}
}))
}
});
server.serve(make_svc).await
};

Ok((recorder, Box::pin(exporter)))
}

#[cfg(feature = "push-gateway")]
ExporterConfig::PushGateway { endpoint, interval, username, password } => {
let exporter = async move {
let https = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https);
let auth = username.as_ref().map(|name| basic_auth(name, password.as_deref()));

loop {
// Sleep for `interval` amount of time, and then do a push.
tokio::time::sleep(interval).await;

let mut builder = Request::builder();
if let Some(auth) = &auth {
builder = builder.header("authorization", auth.clone());
}

let output = handle.render();
let result = builder
.method(Method::PUT)
.uri(endpoint.clone())
.body(Body::from(output));
let req = match result {
Ok(req) => req,
Err(e) => {
error!("failed to build push gateway request: {}", e);
continue;
}
};

match client.request(req).await {
Ok(response) => {
if !response.status().is_success() {
let status = response.status();
let status = status
.canonical_reason()
.unwrap_or_else(|| status.as_str());
let body = aggregate(response.into_body()).await;
let body = body
.map_err(|_| ())
.map(|mut b| b.copy_to_bytes(b.remaining()))
.map(|b| b[..].to_vec())
.and_then(|s| String::from_utf8(s).map_err(|_| ()))
.unwrap_or_else(|()| {
String::from("<failed to read response body>")
});
error!(
message = "unexpected status after pushing metrics to push gateway",
status,
%body,
);
}
}
Err(e) => error!("error sending request to push gateway: {:?}", e),
}
}
};

Ok((recorder, Box::pin(exporter)))
}
}
Ok((
recorder,
match exporter_config {
ExporterConfig::Unconfigured => Err(BuildError::MissingExporterConfiguration)?,

#[cfg(feature = "http-listener")]
ExporterConfig::HttpListener { listen_address } => {
super::http_listener::new_http_listener(
handle,
listen_address,
allowed_addresses,
)?
}

#[cfg(feature = "push-gateway")]
ExporterConfig::PushGateway { endpoint, interval, username, password } => {
super::push_gateway::new_push_gateway(
endpoint, interval, username, password, handle,
)
}
},
))
}

/// Builds the recorder and returns it.
Expand Down Expand Up @@ -609,26 +469,8 @@ impl Default for PrometheusBuilder {
}
}

#[cfg(feature = "push-gateway")]
fn basic_auth(username: &str, password: Option<&str>) -> HeaderValue {
use base64::prelude::BASE64_STANDARD;
use base64::write::EncoderWriter;
use std::io::Write;

let mut buf = b"Basic ".to_vec();
{
let mut encoder = EncoderWriter::new(&mut buf, &BASE64_STANDARD);
write!(encoder, "{username}:").expect("should not fail to encode username");
if let Some(password) = password {
write!(encoder, "{password}").expect("should not fail to encode password");
}
}
let mut header = HeaderValue::from_bytes(&buf).expect("base64 is always valid HeaderValue");
header.set_sensitive(true);
header
}

#[cfg(test)]
#[allow(clippy::approx_constant)]
mod tests {
use std::time::Duration;

Expand Down Expand Up @@ -1093,39 +935,3 @@ mod tests {
assert_eq!(rendered, expected_counter);
}
}

#[cfg(all(test, feature = "push-gateway"))]
mod push_gateway_tests {
use crate::builder::basic_auth;

#[test]
pub fn test_basic_auth() {
use base64::prelude::BASE64_STANDARD;
use base64::read::DecoderReader;
use std::io::Read;

const BASIC: &str = "Basic ";

// username only
let username = "metrics";
let header = basic_auth(username, None);

let reader = &header.as_ref()[BASIC.len()..];
let mut decoder = DecoderReader::new(reader, &BASE64_STANDARD);
let mut result = Vec::new();
decoder.read_to_end(&mut result).unwrap();
assert_eq!(b"metrics:", &result[..]);
assert!(header.is_sensitive());

// username/password
let password = "123!_@ABC";
let header = basic_auth(username, Some(password));

let reader = &header.as_ref()[BASIC.len()..];
let mut decoder = DecoderReader::new(reader, &BASE64_STANDARD);
let mut result = Vec::new();
decoder.read_to_end(&mut result).unwrap();
assert_eq!(b"metrics:123!_@ABC", &result[..]);
assert!(header.is_sensitive());
}
}

0 comments on commit b195391

Please sign in to comment.