add metric to track query execution timestamp (fixes #178) (#182)
albertodonato committed Dec 28, 2023
1 parent cb110cb commit 7f1e7aa
Showing 6 changed files with 63 additions and 6 deletions.
7 changes: 7 additions & 0 deletions README.rst
@@ -433,6 +433,11 @@ For the configuration above, the endpoint would return something like this::
query_latency_created{app="app1",database="db1",query="query1",region="us1"} 1.594544244446163e+09
query_latency_created{app="app1",database="db2",query="query2",region="us2"} 1.5945442444470239e+09
query_latency_created{app="app1",database="db1",query="query2",region="us1"} 1.594544244447551e+09
# HELP query_timestamp Query last execution timestamp
# TYPE query_timestamp gauge
query_timestamp{app="app1",database="db2",query="query2",region="us2"} 1.594544244446199e+09
query_timestamp{app="app1",database="db1",query="query1",region="us1"} 1.594544244452181e+09
query_timestamp{app="app1",database="db1",query="query2",region="us1"} 1.594544244481839e+09
# HELP metric1 A sample gauge
# TYPE metric1 gauge
metric1{app="app1",database="db1",region="us1"} -3561.0
@@ -489,6 +494,8 @@ The exporter provides a few builtin metrics which can be useful to track query executions
``query_latency{database="db",query="q"}``:
a histogram with query latencies, per database and query.

``query_timestamp{database="db",query="q"}``:
a gauge with query last execution timestamps, per database and query.

In addition, metrics for resources usage for the exporter process can be
included by passing ``--process-stats`` in the command line.
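
Because ``query_timestamp`` is exported as Unix epoch seconds, it can be compared with the current time to spot queries that have not run recently. Below is a minimal consumer-side sketch, assuming the exporter is scraped at ``http://localhost:9560/metrics`` (the address and the helper names are assumptions, not part of this change)::

    # staleness_check.py -- hypothetical helper, not part of query-exporter.
    # Reads the exporter's /metrics page and reports how long ago each query
    # last ran, based on the query_timestamp gauge added by this commit.
    import time
    from urllib.request import urlopen

    from prometheus_client.parser import text_string_to_metric_families

    METRICS_URL = "http://localhost:9560/metrics"  # assumed address

    def stale_queries(max_age: float = 300.0) -> list[tuple[dict, float]]:
        """Return (labels, age in seconds) for queries not run within max_age."""
        text = urlopen(METRICS_URL).read().decode("utf-8")
        stale = []
        for family in text_string_to_metric_families(text):
            if family.name != "query_timestamp":
                continue
            for sample in family.samples:
                age = time.time() - sample.value
                if age > max_age:
                    stale.append((sample.labels, age))
        return stale

    if __name__ == "__main__":
        for labels, age in stale_queries():
            print(f"{labels}: last run {age:.0f}s ago")

Inside Prometheus itself the same check would typically be an expression along the lines of ``time() - query_timestamp > 300``.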
16 changes: 15 additions & 1 deletion query_exporter/config.py
@@ -52,6 +52,14 @@
type="counter",
labels=("query", "status"),
)
# metric for tracking last query execution timestamp
QUERY_TIMESTAMP_METRIC_NAME = "query_timestamp"
_QUERY_TIMESTAMP_METRIC_CONFIG = MetricConfig(
name=QUERY_TIMESTAMP_METRIC_NAME,
description="Query last execution timestamp",
type="gauge",
labels=("query",),
)
# metric for counting queries execution latency
QUERY_LATENCY_METRIC_NAME = "query_latency"
_QUERY_LATENCY_METRIC_CONFIG = MetricConfig(
@@ -61,7 +69,12 @@
labels=("query",),
)
GLOBAL_METRICS = frozenset(
[DB_ERRORS_METRIC_NAME, QUERIES_METRIC_NAME, QUERY_LATENCY_METRIC_NAME]
(
DB_ERRORS_METRIC_NAME,
QUERIES_METRIC_NAME,
QUERY_LATENCY_METRIC_NAME,
QUERY_TIMESTAMP_METRIC_NAME,
)
)

# regexp for validating environment variables names
@@ -165,6 +178,7 @@ def _get_metrics(
_DB_ERRORS_METRIC_CONFIG,
_QUERIES_METRIC_CONFIG,
_QUERY_LATENCY_METRIC_CONFIG,
_QUERY_TIMESTAMP_METRIC_CONFIG,
):
configs[metric_config.name] = MetricConfig(
metric_config.name,
16 changes: 13 additions & 3 deletions query_exporter/db.py
@@ -5,7 +5,7 @@
from itertools import chain
import logging
import sys
from time import perf_counter
from time import perf_counter, time
from traceback import format_tb
from typing import (
Any,
@@ -137,15 +137,20 @@ class QueryResults(NamedTuple):

keys: list[str]
rows: list[tuple]
timestamp: float | None = None
latency: float | None = None

@classmethod
async def from_results(cls, results: AsyncResultProxy):
"""Return a QueryResults from results for a query."""
timestamp = time()
conn_info = results._result_proxy.connection.info
latency = conn_info.get("query_latency", None)
return cls(
await results.keys(), await results.fetchall(), latency=latency
await results.keys(),
await results.fetchall(),
timestamp=timestamp,
latency=latency,
)
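
Note that the new field uses ``time()`` (wall-clock Unix epoch seconds) while the existing latency measurement is based on ``perf_counter()`` (a monotonic clock whose absolute value is arbitrary), so only the former makes sense as an exported gauge value. A small standalone illustration of the difference, with hypothetical names, not taken from the project's code::

    # clock_demo.py -- illustration only, not part of query-exporter.
    from time import perf_counter, sleep, time

    start = perf_counter()             # monotonic: only differences are meaningful
    sleep(0.1)                         # stand-in for the actual query execution
    latency = perf_counter() - start   # duration in seconds, feeds query_latency
    timestamp = time()                 # epoch seconds, feeds query_timestamp
    print(f"latency={latency:.3f}s timestamp={timestamp:.0f}")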


@@ -161,6 +166,7 @@ class MetricResults(NamedTuple):
"""Collection of metric results for a query."""

results: list[MetricResult]
timestamp: float | None = None
latency: float | None = None


@@ -224,7 +230,11 @@ def results(self, query_results: QueryResults) -> MetricResults:
)
results.append(metric_result)

return MetricResults(results, latency=query_results.latency)
return MetricResults(
results,
timestamp=query_results.timestamp,
latency=query_results.latency,
)

def _check_schedule(self):
if self.interval and self.schedule:
16 changes: 16 additions & 0 deletions query_exporter/loop.py
@@ -26,6 +26,7 @@
DB_ERRORS_METRIC_NAME,
QUERIES_METRIC_NAME,
QUERY_LATENCY_METRIC_NAME,
QUERY_TIMESTAMP_METRIC_NAME,
Config,
)
from .db import (
@@ -206,6 +207,10 @@ async def _execute_query(self, query: Query, dbname: str):
self._update_query_latency_metric(
db, query, metric_results.latency
)
if metric_results.timestamp:
self._update_query_timestamp_metric(
db, query, metric_results.timestamp
)
self._increment_queries_count(db, query, "success")

async def _remove_if_dooomed(self, query: Query, dbname: str) -> bool:
@@ -306,6 +311,17 @@ def _update_query_latency_metric(
labels={"query": query.config_name},
)

def _update_query_timestamp_metric(
self, database: DataBase, query: Query, timestamp: float
):
"""Update timestamp metric for a query on a database."""
self._update_metric(
database,
QUERY_TIMESTAMP_METRIC_NAME,
timestamp,
labels={"query": query.config_name},
)

def _now(self) -> datetime:
"""Return the current time with local timezone."""
return datetime.now().replace(tzinfo=gettz())
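
Conceptually the new helper performs a plain gauge "set", passing only the query's config name; the ``database`` label visible in the test log lines below is filled in by ``_update_metric``. A minimal equivalent using ``prometheus_client`` directly, rather than the project's own metrics registry (only a sketch, not the registry API)::

    # gauge_demo.py -- standalone illustration, not the project's registry API.
    from time import time

    from prometheus_client import Gauge

    QUERY_TIMESTAMP = Gauge(
        "query_timestamp",
        "Query last execution timestamp",
        ["database", "query"],
    )

    # rough equivalent of _update_query_timestamp_metric for one query run
    QUERY_TIMESTAMP.labels(database="db", query="q").set(time())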
3 changes: 3 additions & 0 deletions tests/db_test.py
@@ -1,5 +1,6 @@
import asyncio
import logging
import time

import pytest
from sqlalchemy import create_engine
@@ -306,6 +307,7 @@ async def test_from_results(self):
assert query_results.keys == ["a", "b"]
assert query_results.rows == [(1, 2)]
assert query_results.latency is None
assert query_results.timestamp < time.time()

@pytest.mark.asyncio
async def test_from_results_with_latency(self):
@@ -319,6 +321,7 @@ async def test_from_results_with_latency(self):
assert query_results.keys == ["a", "b"]
assert query_results.rows == [(1, 2)]
assert query_results.latency == 1.2
assert query_results.timestamp < time.time()


@pytest.fixture
11 changes: 9 additions & 2 deletions tests/loop_test.py
@@ -59,10 +59,11 @@ async def make_query_loop(tmp_path, config_data, registry):
def make_loop():
config_file = tmp_path / "config.yaml"
config_file.write_text(yaml.dump(config_data), "utf-8")
logger = logging.getLogger()
with config_file.open() as fh:
config = load_config(fh, logging.getLogger())
config = load_config(fh, logger)
registry.create_metrics(config.metrics.values())
query_loop = loop.QueryLoop(config, registry, logging)
query_loop = loop.QueryLoop(config, registry, logger)
query_loops.append(query_loop)
return query_loop

@@ -300,6 +301,9 @@ async def test_run_query_log(self, caplog, query_tracker, query_loop):
re_match(
r'updating metric "query_latency" observe .* \{database="db",query="q"\}'
),
re_match(
r'updating metric "query_timestamp" set .* \{database="db",query="q"\}'
),
'updating metric "queries" inc 1 {database="db",query="q",status="success"}',
]

@@ -320,6 +324,9 @@ async def test_run_query_log_labels(
re_match(
r'updating metric "query_latency" observe .* \{database="db",query="q"\}'
),
re_match(
r'updating metric "query_timestamp" set .* \{database="db",query="q"\}'
),
'updating metric "queries" inc 1 {database="db",query="q",status="success"}',
]

