Skip to content

Pushdown of complex operations

Martin Traverso edited this page Feb 22, 2019 · 12 revisions

The intent of this document is to capture high-level thoughts and ideas about how to add support for pushing down complex operations (filters, projections, aggregations, joins, etc) into connectors and allowing connectors to surface additional filters (for row-level security) or projections (for masking) during query planning.

Goals and non-goals:

  • This is explicitly a short-term solution. In the long term, we want connectors to be able to provide transformation rules that get evaluated during the optimization loop.
  • It's not a goal to build a generic mechanism to support every operation.
  • We need to be able to start simple and improve iteratively (near-term improvements should, ideally, be additive).
  • It should be implemented using the existing Rule/Iterative optimizer framework instead of visitor-based PlanOptimizers.

Pushdown

The general idea is to introduce a new set of transformation rules, each of which fires on patterns such as filter(tableScan(...)), project(tableScan(...)), etc. Each rule would be responsible for pushing down the corresponding type of operation into the table scan. Examples (names TBD) would be: PushFilterIntoConnector, PushProjectionIntoConnector, PushAggregationIntoConnector, PushJoinIntoConnector.

These rules interact with connectors via a set of specialized metadata calls. Below is a straw-man for what they might look like (names and concrete signatures TBD). If a connector doesn't support or understand the action, result of the call would indicate so. The language to express filters, projections, aggregates, join criteria, etc. is also TBD.

Filter pushdown

Metadata.pushFilter(TableHandle, Filter) => TableHandle + remaining filter + new projections

Returns a new table handle and any part of the filter that the connector doesn't understand or support. The result TableHandle represents a table with the same schema as the input TableHandle.

The optimization rule uses this method to transform filter(f1, tablescan(th1)) into filter(f2, tablescan(th2)).

Example 1

Let's say a connector knows how to handle LIKE expressions. Given the following plan fragment,

- Filter (a like 'xy%z%' AND f(b)) :: (a varchar, b bigint)
  - TableScan (TH(0)) :: (a varchar, b bigint)
    a = CH(0)
    b = CH(1)

PushFilterIntoTableScan would call:

Metadata.pushFilter(
  TH(0), 
  call("and", 
      call("like", CH(0), 'xy%z%'), 
      call("f", CH(1))))

which would return

new table handle: TH(0')
remaining filter: call("f", CH(1))

The rule would then replace the original fragment with:

- Filter (f(b)) :: (a varchar, b bigint)
  - TableScan (TH(0')) :: (a varchar, b bigint)
    a = CH(0)
    b = CH(1)

Example 2

Connector knows how to handle sub-expression in filter.

- Filter (a.x > 10) :: (a row(x bigint, y bigint))
  - TableScan (TH(0)) :: (a row(x bigint, y bigint))
    a = CH(0)

PushFilterIntoTableScan would call:

Metadata.pushFilter(
  TH(0), 
  call(">", 
      dereference(CH(0), "x"), 
      10)

which would return

new table handle: TH(0')
remaining filter: call(">", CH(1), 10)
new projections: [ CH(1) :: bigint ]

The rule would then replace the original fragment with:

- Project [a] :: (a row(x bigint, y bigint))
  - Filter (s > 10) :: (a row(x bigint, y bigint), s bigint)
    - TableScan (TH(0')) :: (a row(x bigint, y bigint), s bigint)
      a = CH(0)
      s = CH(1)

Projection pushdown

Metadata.pushProjection(TableHandle, Projections) -> TableHandle + new projections

Returns a new table handle and a new set of projections that include any projection that aren't supported by the connector.

Let's say a connector knows how to handle dereference expressions. Given the following plan fragment,

- Project [a.x, b + 1] :: (r varchar, s bigint)
  - TableScan (TH(0)) :: (a row(x varchar, y bigint), b bigint)
    a = CH(0)
    b = CH(1)

PushProjectIntoTableScan would call:

Metadata.pushProjection(
  TH(0), 
  [ 
     dereference(CH(0), "x"),
     add(CH(1), 1)
  ])

which would return

new table handle: TH(0')
new projections: [
    CH(0') :: varchar,
    add(CH(1), 1) :: bigint
]

The rule would then replace the original fragment with:

- Project [c, b + 1] :: (r varchar, s bigint)
  - TableScan (TH(0')) :: (c varchar, b bigint)
    c = CH(0')
    b = CH(1)

Aggregation pushdown

Metadata.pushAggregation(TableHandle, partial?, <group-by columns>, <aggregates>) -> TableHandle + new column handles for aggregates

Example

Given this initial plan fragment:

- Aggregation[SingleStep] :: (k varchar, x bigint, y double)
    group by: [k]
    aggregates: [x = sum(a), y = avg(b)]
  - TableScan(TH(0)) :: (k varchar, a bigint, b bigint)
     k = CH(0)
     a = CH(1)
     b = CH(2)

The rule calls:

Metadata.pushAggregation(
  TH(0),
  false
  [CH(0)],
  [ 
    aggregate("sum", CH(1)), 
    aggregate("avg", CH(2)) 
  ])

which returns:

new table handle: TH(0')
new column handles for aggregates: [CH(3), CH(4)]

The fragment is rewritten to:

- TableScan(TH(0')) :: (k varchar, x bigint, y double)
   k = CH(0)
   x = CH(3)
   z = CH(4)

Join pushdown

TODO

Metadata.pushJoin(TableHandle, TableHandle, JoinType, Criteria) -> TableHandle + ???

Exposing filter/projections for security-based filtering and masking

During analysis and initial query planning, connectors can provide additional filters or projections to evaluate immediately after the data is produced during query execution. This can be used to implement row-level filtering based on security rules or masking for sensitive data.

Metadata.getAdditionalFilter(TableHandle) -> Filter

TODO: projections for masking

Miscellaneous

  • We should deprecate TableLayouts. They complicate the optimizer and can be replaced by a combination of "pushFilter" above plus a an API to obtain (physical) properties of a table during query optimization. E.g.,
Metadata.getProperties(TableHandle)
  • TupleDomain is an abstraction we added to deal with the limitation of not being able to communicate more complex expressions to connectors. With these new APIs, TupleDomain is no longer necessary and we can remove it from the APIs. We'll need to add utility libraries or a toolkit for connector writers to manipulate and extract features out of the expressions.

Open questions

  • How to handle a plan like this one when the filter cannot be pushed into the connector? The generalized predicate pushdown rules moves filters below projections, so adding a rule that moves a projection below a filter would result in non-converging optimization under the IterativeOptimizer. In the long term, it won't be an issue since the optimizer will be able to consider multiple plans simultaneously. One option in the short term is to have specialized rules that match "project(filter(tablescan)))" and handle that specific scenario.
- Project (a.x)
  - Filter (f(b))
    - TableScan (TH(0))