From 54f27e3b9857b76694f3b9d64ff32ad015ec1b87 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Wed, 11 Mar 2020 18:09:36 +0100 Subject: [PATCH] Batch big IN/NOT IN selections --- Cargo.lock | 20 ++---- .../sql-introspection-connector/Cargo.toml | 3 +- libs/prisma-models/Cargo.toml | 2 +- libs/user-facing-error-macros/Cargo.toml | 2 +- .../connectors/query-connector/Cargo.toml | 2 +- .../query-connector/src/filter/mod.rs | 69 +++++++++++++++++++ .../query-connector/src/filter/scalar.rs | 49 +++++++++++++ .../query-connector/src/query_arguments.rs | 33 +++++++++ .../connectors/sql-query-connector/Cargo.toml | 1 + .../src/database/operations/read.rs | 32 ++++++--- 10 files changed, 186 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cde5f6ceed79..bdb828e68822 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,9 +123,9 @@ dependencies = [ [[package]] name = "backtrace-sys" -version = "0.1.33" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e17b52e737c40a7d75abca20b29a19a0eb7ba9fc72c5a72dd282a0a3c2c0dc35" +checksum = "ca797db0057bae1a7aa2eef3283a874695455cecf08a43bfb8507ee0ebc1ed69" dependencies = [ "cc", "libc", @@ -229,15 +229,6 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" -[[package]] -name = "c2-chacha" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "214238caa1bf3a496ec3392968969cab8549f96ff30652c9e56885329315f6bb" -dependencies = [ - "ppv-lite86", -] - [[package]] name = "cc" version = "1.0.50" @@ -2224,11 +2215,11 @@ dependencies = [ [[package]] name = "rand_chacha" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03a2a90da8c7523f554344f921aa97283eadf6ac484a6d2a7d0212fa7f8d6853" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" dependencies = [ - "c2-chacha", + "ppv-lite86", "rand_core", ] @@ -2627,6 +2618,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", + "tokio", "url 2.1.1", "user-facing-errors", "uuid", diff --git a/introspection-engine/connectors/sql-introspection-connector/Cargo.toml b/introspection-engine/connectors/sql-introspection-connector/Cargo.toml index 3737a45ee8d5..dbdd28bc4da3 100644 --- a/introspection-engine/connectors/sql-introspection-connector/Cargo.toml +++ b/introspection-engine/connectors/sql-introspection-connector/Cargo.toml @@ -22,7 +22,7 @@ user-facing-errors = { path = "../../../libs/user-facing-errors", features = ["s tracing = "0.1.10" tracing-futures = "0.2.0" tokio = { version = "0.2", features = ["rt-threaded", "time"] } -once_cell = "1.3.1" +once_cell = "1.3" [dependencies.quaint] git = "https://github.com/prisma/quaint" @@ -32,5 +32,4 @@ features = ["single"] barrel = { version = "0.6.5-alpha.0", features = ["sqlite3", "mysql", "pg"] } test-macros = { path = "../../../libs/test-macros" } test-setup = { path = "../../../libs/test-setup" } -once_cell = "1.2.0" pretty_assertions = "0.6.1" diff --git a/libs/prisma-models/Cargo.toml b/libs/prisma-models/Cargo.toml index d4ec36ac5915..1b4679eea5ec 100644 --- a/libs/prisma-models/Cargo.toml +++ b/libs/prisma-models/Cargo.toml @@ -9,7 +9,7 @@ default = [] sql-ext = ["quaint"] [dependencies] -once_cell = "1.2" +once_cell = "1.3" serde_derive = "1.0" serde_json = "1.0" serde = "1.0" diff --git a/libs/user-facing-error-macros/Cargo.toml b/libs/user-facing-error-macros/Cargo.toml index d42821361cf1..61afb80b2f65 100644 --- a/libs/user-facing-error-macros/Cargo.toml +++ b/libs/user-facing-error-macros/Cargo.toml @@ -14,5 +14,5 @@ darling = "0.10.1" syn = "1.0.5" quote = "1.0.2" proc-macro2 = "1.0.6" -once_cell = "1.2.0" +once_cell = "1.3" regex = "1.3.1" diff --git a/query-engine/connectors/query-connector/Cargo.toml b/query-engine/connectors/query-connector/Cargo.toml index 842ee6f781bd..01337a31a58b 100644 --- a/query-engine/connectors/query-connector/Cargo.toml +++ b/query-engine/connectors/query-connector/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -once_cell = "1.2" +once_cell = "1.3" prisma-models = { path = "../../../libs/prisma-models" } prisma-value = { path = "../../../libs/prisma-value" } failure = { version = "0.1", features = ["derive"] } diff --git a/query-engine/connectors/query-connector/src/filter/mod.rs b/query-engine/connectors/query-connector/src/filter/mod.rs index 907c9f68974b..8499499e8a47 100644 --- a/query-engine/connectors/query-connector/src/filter/mod.rs +++ b/query-engine/connectors/query-connector/src/filter/mod.rs @@ -58,6 +58,75 @@ impl Filter { _ => 1, } } + + pub fn can_batch(&self) -> bool { + match self { + Self::Scalar(sf) => sf.can_batch(), + Self::And(filters) => { + let batchable_count = filters + .iter() + .map(|f| if f.can_batch() { 1 } else { 0 }) + .sum::(); + + batchable_count == 1 + } + Self::Or(filters) => { + let batchable_count = filters + .iter() + .map(|f| if f.can_batch() { 1 } else { 0 }) + .sum::(); + + batchable_count == 1 + } + _ => false, + } + } + + pub fn batched(self) -> Vec { + fn inner(mut filters: Vec, f: F) -> Vec + where + F: Fn(Vec) -> Filter + { + let mut other = Vec::new(); + let mut batched = Vec::new(); + + while let Some(filter) = filters.pop() { + if filter.can_batch() { + for filter in filter.batched() { + batched.push(filter); + } + } else { + other.push(filter); + } + } + + if batched.len() > 0 { + batched.into_iter().map(|batch| { + let mut filters = other.clone(); + filters.push(batch); + + f(filters) + }).collect() + } else { + vec![f(other)] + } + } + + match self { + Self::Scalar(sf) => sf + .batched() + .into_iter() + .map(|sf| Self::Scalar(sf)) + .collect(), + Self::And(filters) => { + inner(filters, |filters| Filter::And(filters)) + } + Self::Or(filters) => { + inner(filters, |filters| Filter::Or(filters)) + } + _ => vec![self], + } + } } impl From for Filter { diff --git a/query-engine/connectors/query-connector/src/filter/scalar.rs b/query-engine/connectors/query-connector/src/filter/scalar.rs index 6e52034c8054..0a0b34a8ae85 100644 --- a/query-engine/connectors/query-connector/src/filter/scalar.rs +++ b/query-engine/connectors/query-connector/src/filter/scalar.rs @@ -15,6 +15,55 @@ pub struct ScalarFilter { pub condition: ScalarCondition, } +const BATCH_SIZE: usize = 5000; + +impl ScalarFilter { + pub fn can_batch(&self) -> bool { + match self.condition { + ScalarCondition::In(ref l) if l.len() > BATCH_SIZE => true, + ScalarCondition::NotIn(ref l) if l.len() > BATCH_SIZE => true, + _ => false, + } + } + + pub fn batched(self) -> Vec { + fn inner(list: PrismaListValue) -> Vec { + let mut batches = Vec::with_capacity(list.len() % BATCH_SIZE + 1); + batches.push(Vec::with_capacity(BATCH_SIZE)); + + for (idx, item) in list.into_iter().enumerate() { + if idx != 0 && idx % BATCH_SIZE == 0 { + batches.push(Vec::with_capacity(BATCH_SIZE)); + } + + batches.last_mut().unwrap().push(item); + } + + batches + } + + match self.condition { + ScalarCondition::In(list) => { + let projection = self.projection; + + inner(list).into_iter().map(|batch| ScalarFilter { + projection: projection.clone(), + condition: ScalarCondition::In(batch) + }).collect() + } + ScalarCondition::NotIn(list) => { + let projection = self.projection; + + inner(list).into_iter().map(|batch| ScalarFilter { + projection: projection.clone(), + condition: ScalarCondition::NotIn(batch) + }).collect() + } + _ => vec![self] + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum ScalarCondition { Equals(PrismaValue), diff --git a/query-engine/connectors/query-connector/src/query_arguments.rs b/query-engine/connectors/query-connector/src/query_arguments.rs index 1bc7ac07c930..7d470bffb84a 100644 --- a/query-engine/connectors/query-connector/src/query_arguments.rs +++ b/query-engine/connectors/query-connector/src/query_arguments.rs @@ -65,6 +65,39 @@ impl QueryArguments { }, } } + + pub fn can_batch(&self) -> bool { + self.filter + .as_ref() + .map(|filter| filter.can_batch()) + .unwrap_or(false) + } + + pub fn batched(self) -> Vec { + match self.filter { + Some(filter) => { + let after = self.after; + let before = self.before; + let skip = self.skip; + let first = self.first; + let last = self.last; + let order_by = self.order_by; + + filter.batched().into_iter().map(|filter| { + QueryArguments { + after: after.clone(), + before: before.clone(), + skip: skip.clone(), + first: first.clone(), + last: last.clone(), + filter: Some(filter), + order_by: order_by.clone(), + } + }).collect() + }, + _ => vec![self] + } + } } impl From for QueryArguments diff --git a/query-engine/connectors/sql-query-connector/Cargo.toml b/query-engine/connectors/sql-query-connector/Cargo.toml index f61b5e81cd0e..d1316ed7908b 100644 --- a/query-engine/connectors/sql-query-connector/Cargo.toml +++ b/query-engine/connectors/sql-query-connector/Cargo.toml @@ -15,6 +15,7 @@ log = "0.4" async-trait = "0.1" futures = "0.3" rust_decimal = "=1.1.0" +tokio = "0.2" [dependencies.quaint] git = "https://github.com/prisma/quaint" diff --git a/query-engine/connectors/sql-query-connector/src/database/operations/read.rs b/query-engine/connectors/sql-query-connector/src/database/operations/read.rs index 4e955df7b5c1..56b3a639256e 100644 --- a/query-engine/connectors/sql-query-connector/src/database/operations/read.rs +++ b/query-engine/connectors/sql-query-connector/src/database/operations/read.rs @@ -5,6 +5,7 @@ use crate::{ use connector_interface::*; use prisma_models::*; use quaint::ast::*; +use futures::future; pub async fn get_single_record( conn: &dyn QueryExt, @@ -36,14 +37,29 @@ pub async fn get_many_records( ) -> crate::Result { let field_names = selected_fields.db_names().map(String::from).collect(); let idents: Vec<_> = selected_fields.types().collect(); - let query = read::get_records(model, selected_fields.columns(), query_arguments); - - let records = conn - .filter(query.into(), idents.as_slice()) - .await? - .into_iter() - .map(Record::from) - .collect(); + let mut records = Vec::new(); + + if query_arguments.can_batch() { + let batches = query_arguments.batched(); + let mut futures = Vec::with_capacity(batches.len()); + + for args in batches.into_iter() { + let query = read::get_records(model, selected_fields.columns(), args); + futures.push(conn.filter(query.into(), idents.as_slice())); + } + + for result in future::join_all(futures).await.into_iter() { + for item in result?.into_iter() { + records.push(Record::from(item)) + } + } + } else { + let query = read::get_records(model, selected_fields.columns(), query_arguments); + + for item in conn.filter(query.into(), idents.as_slice()).await?.into_iter() { + records.push(Record::from(item)) + } + } Ok(ManyRecords { records, field_names }) }