Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement timestamp encoder #169

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
143 changes: 134 additions & 9 deletions src/encoding/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::registry::{Prefix, Registry, Unit};
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;
use std::time::{SystemTime, SystemTimeError, UNIX_EPOCH};

/// Encode the metrics registered with the provided [`Registry`] into the
/// provided [`Write`]r using the OpenMetrics text format.
Expand All @@ -44,10 +45,43 @@ where
Ok(())
}

/// Encode the metrics registered with the provided [`Registry`] into the
/// provided [`Write`]r using the OpenMetrics text format (with timestamps included).
pub fn encode_with_timestamps<W>(writer: &mut W, registry: &Registry) -> Result<(), std::fmt::Error>
where
W: Write,
{
registry.encode(
&mut DescriptorEncoder::new(writer)
.with_timestamp()
.map_err(|_| std::fmt::Error)?
.into(),
)?;
writer.write_str("# EOF\n")?;
Ok(())
}

#[derive(Clone, Copy)]
struct UnixTimestamp(f64);

impl UnixTimestamp {
fn now() -> Result<Self, SystemTimeError> {
let sys_time = SystemTime::now();
sys_time
.duration_since(UNIX_EPOCH)
.map(|d| Self(d.as_secs_f64()))
}

fn as_f64(&self) -> f64 {
self.0
}
}

pub(crate) struct DescriptorEncoder<'a> {
writer: &'a mut dyn Write,
prefix: Option<&'a Prefix>,
labels: &'a [(Cow<'static, str>, Cow<'static, str>)],
timestamp: Option<UnixTimestamp>,
}

impl<'a> std::fmt::Debug for DescriptorEncoder<'a> {
Expand All @@ -62,6 +96,7 @@ impl DescriptorEncoder<'_> {
writer,
prefix: Default::default(),
labels: Default::default(),
timestamp: Default::default(),
}
}

Expand All @@ -74,6 +109,7 @@ impl DescriptorEncoder<'_> {
prefix,
labels,
writer: self.writer,
timestamp: self.timestamp,
}
}

Expand Down Expand Up @@ -133,8 +169,14 @@ impl DescriptorEncoder<'_> {
unit,
const_labels: self.labels,
family_labels: None,
timestamp: self.timestamp,
})
}

fn with_timestamp(mut self) -> Result<Self, SystemTimeError> {
self.timestamp = Some(UnixTimestamp::now()?);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused. For backfilling, you would need to be able to control the timestamp, no? I.e. set it to some time in the past. Here you just set it to the current time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for having a look. Yes, my original intent for this interface was that you'd always capture the current values, as if producing a frozen exposition. I think it would make sense to accept the timestamp as an argument though.

Ok(self)
}
}

/// Helper type for [`EncodeMetric`](super::EncodeMetric), see
Expand All @@ -153,6 +195,7 @@ pub(crate) struct MetricEncoder<'a> {
unit: Option<&'a Unit>,
const_labels: &'a [(Cow<'static, str>, Cow<'static, str>)],
family_labels: Option<&'a dyn super::EncodeLabelSet>,
timestamp: Option<UnixTimestamp>,
}

impl<'a> std::fmt::Debug for MetricEncoder<'a> {
Expand All @@ -162,13 +205,20 @@ impl<'a> std::fmt::Debug for MetricEncoder<'a> {
l.encode(LabelSetEncoder::new(&mut labels).into())?;
}

f.debug_struct("Encoder")
let mut debug_struct = f.debug_struct("Encoder");

debug_struct
.field("name", &self.name)
.field("prefix", &self.prefix)
.field("unit", &self.unit)
.field("const_labels", &self.const_labels)
.field("labels", &labels.as_str())
.finish()
.field("labels", &labels.as_str());

if let Some(timestamp) = &self.timestamp {
debug_struct.field("timestamp", &timestamp.as_f64());
}

debug_struct.finish()
}
}

Expand All @@ -195,6 +245,8 @@ impl<'a> MetricEncoder<'a> {
.into(),
)?;

self.encode_timestamp()?;

if let Some(exemplar) = exemplar {
self.encode_exemplar(exemplar)?;
}
Expand All @@ -219,6 +271,8 @@ impl<'a> MetricEncoder<'a> {
.into(),
)?;

self.encode_timestamp()?;

self.newline()?;

Ok(())
Expand Down Expand Up @@ -254,6 +308,7 @@ impl<'a> MetricEncoder<'a> {
unit: self.unit,
const_labels: self.const_labels,
family_labels: Some(label_set),
timestamp: self.timestamp,
})
}

Expand All @@ -269,13 +324,15 @@ impl<'a> MetricEncoder<'a> {
self.encode_labels::<()>(None)?;
self.writer.write_str(" ")?;
self.writer.write_str(dtoa::Buffer::new().format(sum))?;
self.encode_timestamp()?;
self.newline()?;

self.write_prefix_name_unit()?;
self.write_suffix("count")?;
self.encode_labels::<()>(None)?;
self.writer.write_str(" ")?;
self.writer.write_str(itoa::Buffer::new().format(count))?;
self.encode_timestamp()?;
self.newline()?;

let mut cummulative = 0;
Expand All @@ -295,6 +352,8 @@ impl<'a> MetricEncoder<'a> {
self.writer
.write_str(itoa::Buffer::new().format(cummulative))?;

self.encode_timestamp()?;

if let Some(exemplar) = exemplars.and_then(|e| e.get(&i)) {
self.encode_exemplar(exemplar)?
}
Expand All @@ -321,12 +380,14 @@ impl<'a> MetricEncoder<'a> {
}
.into(),
)?;
self.encode_timestamp()?;
Ok(())
}

fn newline(&mut self) -> Result<(), std::fmt::Error> {
self.writer.write_str("\n")
}

fn write_prefix_name_unit(&mut self) -> Result<(), std::fmt::Error> {
if let Some(prefix) = self.prefix {
self.writer.write_str(prefix.as_str())?;
Expand Down Expand Up @@ -386,6 +447,16 @@ impl<'a> MetricEncoder<'a> {

Ok(())
}

fn encode_timestamp(&mut self) -> Result<(), std::fmt::Error> {
if let Some(timestamp) = &self.timestamp {
self.writer.write_str(" ")?;
self.writer
.write_str(dtoa::Buffer::new().format(timestamp.as_f64()))
} else {
Ok(())
}
}
}

pub(crate) struct CounterValueEncoder<'a> {
Expand Down Expand Up @@ -573,10 +644,14 @@ mod tests {
registry.register("my_counter", "My counter", counter);

let mut encoded = String::new();

encode(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand All @@ -596,6 +671,11 @@ mod tests {
assert_eq!(expected, encoded);

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand Down Expand Up @@ -625,6 +705,11 @@ mod tests {
assert_eq!(expected, encoded);

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand All @@ -634,10 +719,14 @@ mod tests {
registry.register("my_gauge", "My gauge", gauge);

let mut encoded = String::new();

encode(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand All @@ -654,10 +743,14 @@ mod tests {
.inc();

let mut encoded = String::new();

encode(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand All @@ -677,7 +770,6 @@ mod tests {
.inc();

let mut encoded = String::new();

encode(&mut encoded, &registry).unwrap();

let expected = "# HELP my_prefix_my_counter_family My counter family.\n"
Expand All @@ -688,6 +780,11 @@ mod tests {
assert_eq!(expected, encoded);

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand All @@ -706,6 +803,11 @@ mod tests {
assert_eq!(expected, encoded);

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand All @@ -716,10 +818,14 @@ mod tests {
histogram.observe(1.0);

let mut encoded = String::new();

encode(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand All @@ -736,10 +842,14 @@ mod tests {
.observe(1.0);

let mut encoded = String::new();

encode(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand Down Expand Up @@ -771,6 +881,11 @@ mod tests {
assert_eq!(expected, encoded);

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand Down Expand Up @@ -845,6 +960,11 @@ mod tests {
assert_eq!(expected, encoded);

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

#[test]
Expand Down Expand Up @@ -904,6 +1024,11 @@ mod tests {
assert_eq!(expected, encoded);

parse_with_python_client(encoded);

let mut encoded = String::new();
encode_with_timestamps(&mut encoded, &registry).unwrap();

parse_with_python_client(encoded);
}

fn parse_with_python_client(input: String) {
Expand Down