Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Julius de Bruijn committed Mar 13, 2020
1 parent 15668ed commit 7406ee9
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 53 deletions.
77 changes: 38 additions & 39 deletions query-engine/connectors/query-connector/src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,51 +62,54 @@ impl Filter {
pub fn can_batch(&self) -> bool {
match self {
Self::Scalar(sf) => sf.can_batch(),
Self::And(filters) => {
let batchable_count = filters
.iter()
.map(|f| if f.can_batch() { 1 } else { 0 })
.sum::<usize>();

batchable_count == 1
}
Self::Or(filters) => {
let batchable_count = filters
.iter()
.map(|f| if f.can_batch() { 1 } else { 0 })
.sum::<usize>();

batchable_count == 1
}
Self::And(filters) => filters.iter().any(|f| f.can_batch()),
Self::Or(filters) => filters.iter().any(|f| f.can_batch()),
_ => false,
}
}

pub fn batched(self) -> Vec<Filter> {
fn inner<F>(mut filters: Vec<Filter>, f: F) -> Vec<Filter>
where
F: Fn(Vec<Filter>) -> Filter
{
let mut other = Vec::new();
let mut batched = Vec::new();
fn split_longest(mut filters: Vec<Filter>) -> (Option<ScalarFilter>, Vec<Filter>) {
let mut longest: Option<ScalarFilter> = None;
let mut other = Vec::with_capacity(filters.len());

while let Some(filter) = filters.pop() {
if filter.can_batch() {
for filter in filter.batched() {
batched.push(filter);
match (filter, longest.as_mut()) {
(Filter::Scalar(sf), Some(ref mut prev)) if sf.len() > prev.len() => {
let previous = longest.replace(sf);
other.push(Filter::Scalar(previous.unwrap()));
}
} else {
other.push(filter);
(Filter::Scalar(sf), None) if sf.can_batch() => {
longest = Some(sf);
}
(filter, _) => other.push(filter),
}
}

if batched.len() > 0 {
batched.into_iter().map(|batch| {
let mut filters = other.clone();
filters.push(batch);
(longest, other)
}

fn batch<F>(filters: Vec<Filter>, f: F) -> Vec<Filter>
where
F: Fn(Vec<Filter>) -> Filter,
{
let (longest, other) = split_longest(filters);
let mut batched = Vec::new();

f(filters)
}).collect()
if let Some(filter) = longest {
for filter in filter.batched() {
batched.push(Filter::Scalar(filter))
}

batched
.into_iter()
.map(|batch| {
let mut filters = other.clone();
filters.push(batch);

f(filters)
})
.collect()
} else {
vec![f(other)]
}
Expand All @@ -118,12 +121,8 @@ impl Filter {
.into_iter()
.map(|sf| Self::Scalar(sf))
.collect(),
Self::And(filters) => {
inner(filters, |filters| Filter::And(filters))
}
Self::Or(filters) => {
inner(filters, |filters| Filter::Or(filters))
}
Self::And(filters) => batch(filters, |filters| Filter::And(filters)),
Self::Or(filters) => batch(filters, |filters| Filter::Or(filters)),
_ => vec![self],
}
}
Expand Down
38 changes: 24 additions & 14 deletions query-engine/connectors/query-connector/src/filter/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ pub struct ScalarFilter {
pub condition: ScalarCondition,
}

const BATCH_SIZE: usize = 5000;
const BATCH_SIZE: usize = 5;

impl ScalarFilter {
pub fn can_batch(&self) -> bool {
pub fn len(&self) -> usize {
match self.condition {
ScalarCondition::In(ref l) if l.len() > BATCH_SIZE => true,
ScalarCondition::NotIn(ref l) if l.len() > BATCH_SIZE => true,
_ => false,
ScalarCondition::In(ref l) => l.len(),
ScalarCondition::NotIn(ref l) => l.len(),
_ => 1,
}
}

pub fn can_batch(&self) -> bool {
self.len() > BATCH_SIZE
}

pub fn batched(self) -> Vec<ScalarFilter> {
fn inner(list: PrismaListValue) -> Vec<PrismaListValue> {
let mut batches = Vec::with_capacity(list.len() % BATCH_SIZE + 1);
Expand All @@ -46,20 +50,26 @@ impl ScalarFilter {
ScalarCondition::In(list) => {
let projection = self.projection;

inner(list).into_iter().map(|batch| ScalarFilter {
projection: projection.clone(),
condition: ScalarCondition::In(batch)
}).collect()
inner(list)
.into_iter()
.map(|batch| ScalarFilter {
projection: projection.clone(),
condition: ScalarCondition::In(batch),
})
.collect()
}
ScalarCondition::NotIn(list) => {
let projection = self.projection;

inner(list).into_iter().map(|batch| ScalarFilter {
projection: projection.clone(),
condition: ScalarCondition::NotIn(batch)
}).collect()
inner(list)
.into_iter()
.map(|batch| ScalarFilter {
projection: projection.clone(),
condition: ScalarCondition::NotIn(batch),
})
.collect()
}
_ => vec![self]
_ => vec![self],
}
}
}
Expand Down

0 comments on commit 7406ee9

Please sign in to comment.