Skip to content

Commit

Permalink
Compression cpu thread pool (#4970)
Browse files Browse the repository at this point in the history
* Added a thread pool for ingest decompression

* Running small but cpu intensive tasks in a thread pool.

This PR removes usage of spawn_blocking for cpu intensive tasks.

The problem with spawn_blocking is that these tasks get scheduled on a
ever growing thread pool.

For instance, when the server is under load, the GZIP decompression of
payloads could considerably increase the load factor of quickwit,
possibly making it unresponsive to healthcheck.

This PR isolates the thread pool used for the searcher, and instantiates
a second generic thread pool dedicated to those short cpu-intensive tasks.

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <adrien@quickwit.io>

---------

Co-authored-by: Adrien Guillo <adrien@quickwit.io>
  • Loading branch information
fulmicoton and guilload committed May 11, 2024
1 parent c5bfe5c commit 770d8a7
Show file tree
Hide file tree
Showing 23 changed files with 343 additions and 216 deletions.
33 changes: 21 additions & 12 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 4 additions & 8 deletions quickwit/Cargo.toml
Expand Up @@ -187,7 +187,7 @@ pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04
quote = "1.0.23"
rand = "0.8"
rand_distr = "0.4"
rayon = "1"
rayon = "1.10"
rdkafka = { version = "0.33", default-features = false, features = [
"cmake-build",
"libz",
Expand Down Expand Up @@ -269,16 +269,12 @@ wiremock = "0.5"
zstd = "0.13.0"

aws-config = "1.2"
aws-credential-types = { version = "1.2", features = [
"hardcoded-credentials",
] }
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
aws-sdk-kinesis = "1.21"
aws-sdk-s3 = "1.24"
aws-smithy-async = "1.2"
aws-smithy-runtime = "1.3"
aws-smithy-types = { version = "1.1", features = [
"byte-stream-poll-next"
] }
aws-smithy-types = { version = "1.1", features = ["byte-stream-poll-next"] }
aws-types = "1.2"

azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] }
Expand Down Expand Up @@ -321,7 +317,7 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }

tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1ee5f907", default-features = false, features = [
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "6181c1e", default-features = false, features = [
"lz4-compression",
"mmap",
"quickwit",
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-common/Cargo.toml
Expand Up @@ -30,6 +30,7 @@ pin-project = { workspace = true }
pnet = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
siphasher = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/lib.rs
Expand Up @@ -43,6 +43,7 @@ pub mod stream_utils;
pub mod temp_dir;
#[cfg(any(test, feature = "testsuite"))]
pub mod test_utils;
pub mod thread_pool;
pub mod tower;
pub mod type_map;
pub mod uri;
Expand Down
49 changes: 43 additions & 6 deletions quickwit/quickwit-common/src/metrics.rs
Expand Up @@ -179,19 +179,19 @@ pub fn new_histogram_vec<const N: usize>(
HistogramVec { underlying }
}

pub struct GaugeGuard {
gauge: &'static IntGauge,
pub struct GaugeGuard<'a> {
gauge: &'a IntGauge,
delta: i64,
}

impl std::fmt::Debug for GaugeGuard {
impl<'a> std::fmt::Debug for GaugeGuard<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
self.delta.fmt(f)
}
}

impl GaugeGuard {
pub fn from_gauge(gauge: &'static IntGauge) -> Self {
impl<'a> GaugeGuard<'a> {
pub fn from_gauge(gauge: &'a IntGauge) -> Self {
Self { gauge, delta: 0i64 }
}

Expand All @@ -210,7 +210,44 @@ impl GaugeGuard {
}
}

impl Drop for GaugeGuard {
impl<'a> Drop for GaugeGuard<'a> {
fn drop(&mut self) {
self.gauge.sub(self.delta)
}
}

pub struct OwnedGaugeGuard {
gauge: IntGauge,
delta: i64,
}

impl std::fmt::Debug for OwnedGaugeGuard {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
self.delta.fmt(f)
}
}

impl OwnedGaugeGuard {
pub fn from_gauge(gauge: IntGauge) -> Self {
Self { gauge, delta: 0i64 }
}

pub fn get(&self) -> i64 {
self.delta
}

pub fn add(&mut self, delta: i64) {
self.gauge.add(delta);
self.delta += delta;
}

pub fn sub(&mut self, delta: i64) {
self.gauge.sub(delta);
self.delta -= delta;
}
}

impl Drop for OwnedGaugeGuard {
fn drop(&mut self) {
self.gauge.sub(self.delta)
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/stream_utils.rs
Expand Up @@ -233,7 +233,7 @@ where T: RpcName
}
}

pub struct InFlightValue<T>(T, #[allow(dead_code)] GaugeGuard);
pub struct InFlightValue<T>(T, #[allow(dead_code)] GaugeGuard<'static>);

impl<T> fmt::Debug for InFlightValue<T>
where T: fmt::Debug
Expand Down

0 comments on commit 770d8a7

Please sign in to comment.