Projection pushdown optimization #4180

Open
wants to merge 48 commits into
base: main

Contributor

@balavinaithirthan balavinaithirthan commented May 2, 2024

This PR implements projection pushdown: the select operator can be pushed up the pipeline so that the Feather and Parquet parsers read only the selected columns. To that end, we modified every overridden optimize function to accept a selection, taught the Feather and Parquet parsers to read only the selected columns, and considered the cases where select can be pushed up. This is a good starting point for the summarize and put operators because the framework for moving selection information is now in place.
Semantically, an empty columnar selection means that no selection should be performed. Thus, special care must be taken to distinguish it from a null selection, which acts as a blocker: when an operator returns std::nullopt for the selection in its optimize result, the selection is blocked. Thus, order_invariant and do_not_optimize are blockers.

Currently, only slice, batch, and pass allow the selection to move upstream. More should be added carefully.

@balavinaithirthan balavinaithirthan force-pushed the topic/projectionPushdownOptimization branch from 95f199a to 9c32232 Compare May 2, 2024 15:07
@balavinaithirthan balavinaithirthan added performance Improvements or regressions of performance improvement An incremental enhancement of an existing feature labels May 22, 2024
libtenzir/src/pipeline.cpp (outdated, resolved)
libtenzir/src/pipeline.cpp (outdated, resolved)
libtenzir/include/tenzir/columnar_selection.hpp (outdated, resolved)
libtenzir/include/tenzir/selection.hpp (outdated, resolved)
libtenzir/include/tenzir/selection.hpp (outdated, resolved)
libtenzir/include/tenzir/selection.hpp (outdated, resolved)
libtenzir/include/tenzir/selection.hpp (outdated, resolved)
libtenzir/include/tenzir/plugin.hpp (resolved)
libtenzir/include/tenzir/pipeline.hpp (resolved)
libtenzir/builtins/formats/feather.cpp (outdated, resolved)
libtenzir/builtins/operators/decapsulate.cpp (resolved)
Contributor Author

I can discard type.cpp changes

libtenzir/include/tenzir/plugin.hpp (outdated, resolved)
@balavinaithirthan balavinaithirthan marked this pull request as ready for review May 29, 2024 15:01
Comment on lines +332 to +333
/// this | select columns | sink
/// <=> select columns | this | sink
Contributor

@jachris jachris May 29, 2024

optimize accepts a select_optimization and returns a std::optional<select_optimization>. Can you phrase the semantic requirements while considering (A) the optionality of the output selection, and (B) that the input columns may be different from the output columns?

Contributor Author

Does this sound better (italics is new part)?

# Projection Pushdown

The selection is a projection pushdown represented as a vector of
columns to select. This vector can be moved around a pipeline similar to an
expression. The implementation must ensure that the selected columns are
preserved in the correct order throughout the pipeline processing stages. For an
otherwise unknown pipeline sink, the following equivalence must hold:

this | select columns | sink <=> select columns | this | sink

The input columns will differ from the output columns (e.g., if columns 0 and 4
are selected, then columns 1, 2, and 3 are not part of the output), and the
individual parse operator must handle this. Optimize allows a modified parser
that reads only the specified columns to be returned.

If the operator modifies the schema of the events, the
selection must be adjusted accordingly to ensure that the specified columns are
correctly mapped and preserved in the resulting schema. An operator
can block the upward movement of the select operator by
returning std::nullopt for the selection in its optimize result. Semantically, an
empty selection vector means no selection was provided, whereas a std::nullopt
selection means the prior selection is being blocked.

# Example

For an operator that filters and then selects columns,
the optimized output must ensure that the filter is applied before the
selection to maintain the correct semantics:

where expr | OPT | select columns | sink <=> where expr | select columns | OPT | sink

This ensures that filtering and selection are treated as distinct operations,
and the correct behavior is preserved in the optimized pipeline.
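
To make the difference between an empty selection and a std::nullopt result concrete, here is a minimal toy sketch. It is not the code from this PR: `selection`, `toy_operator`, and `push_selection_upstream` are hypothetical names, and operators are reduced to a name plus a flag that says whether they let a selection pass.

```cpp
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the selection type: an empty vector means
// "no selection was requested".
using selection = std::vector<std::string>;

// Hypothetical toy operator. `passes_selection` models operators such as
// `pass` that let a selection move upstream unchanged.
struct toy_operator {
  std::string name;
  bool passes_selection;

  // Returns the selection to keep pushing upstream, or std::nullopt to block.
  auto optimize(const selection& current) const -> std::optional<selection> {
    if (passes_selection) {
      return current;
    }
    return std::nullopt;
  }
};

// Walks the pipeline from sink to source. When an operator blocks a non-empty
// selection, an explicit `select` is placed directly downstream of it.
auto push_selection_upstream(const std::vector<toy_operator>& ops,
                             selection current) -> std::vector<std::string> {
  auto reversed = std::vector<std::string>{};
  for (auto it = ops.rbegin(); it != ops.rend(); ++it) {
    auto next = it->optimize(current);
    if (next) {
      current = std::move(*next);
    } else {
      if (!current.empty()) {
        reversed.push_back("select");
      }
      current.clear();
    }
    reversed.push_back(it->name);
  }
  // In this toy model, a selection that reaches the source stays in front.
  if (!current.empty()) {
    reversed.push_back("select");
  }
  return std::vector<std::string>(reversed.rbegin(), reversed.rend());
}
```

With the operators `where` (blocking) and `pass` (non-blocking) and a trailing selection on `a`, the sketch yields `where | select | pass`: the selection moves past `pass` and is materialized right after `where`.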

Comment on lines +87 to +99
auto config_fields = config_.fields;
auto selection_fields = selection.fields_of_interest;
std::sort(config_fields.begin(), config_fields.end());
std::sort(selection_fields.begin(), selection_fields.end());
std::vector<std::string> intersection;
std::set_intersection(config_fields.begin(), config_fields.end(),
                      selection_fields.begin(), selection_fields.end(),
                      std::back_inserter(intersection));
if (intersection.empty()) {
  return optimize_result{filter, order, copy(), std::nullopt};
}
return optimize_result{filter, order, nullptr,
                       select_optimization(intersection)};
Contributor

It seems like this assumes that we are free to reorder fields as part of the optimization, right? I would need to be convinced that this is a good choice, as field order is very visible to the user when looking at the output. The more conservative choice would be to require that the optimization does not change the order of fields.
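
As an illustration of the more conservative choice, the intersection can be computed without sorting either input. This is only a sketch: `intersect_preserving_order` is a hypothetical helper, and keeping the order of `config_fields` (rather than that of `selection_fields`) is an assumption about which order the user expects to see.

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical helper: keeps the entries of `config_fields` that also occur
// in `selection_fields`, in the order in which `config_fields` lists them.
// Neither input is sorted or otherwise modified.
auto intersect_preserving_order(
  const std::vector<std::string>& config_fields,
  const std::vector<std::string>& selection_fields)
  -> std::vector<std::string> {
  const auto wanted = std::unordered_set<std::string>(
    selection_fields.begin(), selection_fields.end());
  auto result = std::vector<std::string>{};
  std::copy_if(config_fields.begin(), config_fields.end(),
               std::back_inserter(result), [&](const std::string& field) {
                 return wanted.count(field) > 0;
               });
  return result;
}
```

For `config_fields = {z, a, m}` and `selection_fields = {m, z, q}`, this returns `{z, m}`, whereas the sort-based version in the snippet returns `{m, z}`.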

Contributor Author

I think this just says that the event_order passed from the previous operator will continue to get passed through the select operator to the next operator. Select currently doesn't do anything with order. Do you think there are cases where select should change the order (changing to ordered for example)?

Contributor Author

Goals

  1. Modify every inherited optimize function to include a selection type
  2. Start with optimizing selection on feather read
  • select.cpp: should push up projection information
  • read: should push projection into feather read, return the replacement
  • What additional logic is needed when we have where | select and select | where? What metadata needs to be stored in the selection type? Is it possible to optimize where and select for feather? How is row extraction possible?
  • How do we deal with nested selection? Perhaps some sort of recursion, or perhaps linking multiple select operators together.

Comments

  • There is no simple way to force a nested read of Feather data. A top-level approach may be the only option.
  • For export and other operators where nesting is possible, the current logic is for the selection object to contain all nesting information; the operator using the selection information (i.e., read/export) should create an array of replacements instead of a single one.
  • An alternative is to keep the selection information as just a string and let the individual read/export plugin deal with the nesting structure. The issue with this approach is that if multiple prunings are required, the parsing logic ends up encapsulated in an ugly loop.
  • Parquet reading allows for a similar "row_group_indices and column_group_indices". The where operator can also be applied here! The difficult part is getting the values of a specific row. Look into how the where filter can be pushed up for this.

The parquet and feather optimizations are implemented, but the semantics of moving the select operator through the pipeline are proving difficult.

  1. no_columnar_selection: fields_of_interest is empty, do_not_optimize_select is true
  2. blocking_selection: fields_of_interest is non-empty, do_not_optimize_select is true
  3. Selection moving up the pipeline: fields_of_interest is non-empty, do_not_optimize_select is false
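
The three states listed here can be written down as a small classification function. This is a sketch under assumptions: the struct only mirrors the two members named in these notes, the enum and `classify` are hypothetical, and the fourth combination (empty fields with the flag off) is not covered by the notes, so it is mapped to "no selection" here.

```cpp
#include <string>
#include <vector>

// Hypothetical mirror of the two members mentioned in the notes.
struct columnar_selection {
  std::vector<std::string> fields_of_interest;
  bool do_not_optimize_select = true;
};

// Hypothetical names for the three states.
enum class selection_state {
  none,            // nothing to select
  blocking,        // fields exist, but the selection must not move upstream
  moving_upstream, // fields exist and may be pushed further up
};

auto classify(const columnar_selection& sel) -> selection_state {
  if (sel.fields_of_interest.empty()) {
    // Assumption: an empty field list always means "no selection",
    // regardless of the flag.
    return selection_state::none;
  }
  return sel.do_not_optimize_select ? selection_state::blocking
                                    : selection_state::moving_upstream;
}
```

A default-constructed `columnar_selection` classifies as `none`, which matches the default state these notes describe for startup.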

On startup, the default selection is no_columnar_selection.

  1. do_not_optimize returns std::nullopt for the selection, so the current columnar selection holds and is allowed to pass through.
  2. If a blocking_selection is introduced, a select operator is added there and the current selection becomes a blocking one. Perhaps it is better to turn it into no selection so that a later operator can use it.

Operators that need to return a blocking selection (turn do_not_optimize flag on)

  1. Where
  2. ...

Questions and todo

  1. What to do with located in parquet/feather
  2. Change order_invariant to return std::nullopt for the selection
  3. Create an equality operator for the columnar_selection struct
  4. Make selection a const ref
  5. Use std::move everywhere
  6. Can do_not_optimize use a blocking optimize?
  7. Should all order invariants be blockers?

libtenzir/builtins/operators/decapsulate.cpp (resolved)
std::set_intersection(config_fields.begin(), config_fields.end(),
                      selection_fields.begin(), selection_fields.end(),
                      std::back_inserter(intersection));
if (intersection.empty()) {
Contributor Author

Take another look at how to deal with empty intersection of selection.

Member

What's your plan for it?

Member

@dominiklohmann dominiklohmann left a comment

Thanks for this massive undertaking. This is a great piece of work! 🚀

I went through everything in code once now and have left a bunch of comments that need addressing. I think it's only minor things from here on out.

@@ -203,6 +203,7 @@ auto exec_pipeline(std::string content,
return std::move(implicit_pipe.error());
}
pipe = std::move(*implicit_pipe);

Member

Suggested change

Comment on lines +368 to +370
optimize_parser_result(std::unique_ptr<plugin_parser> replacement,
                       bool selection_optimized, bool filter_optimized,
                       bool order_optimized)
Member

As the reader of this code, how am I supposed to know which boolean is which?

See also this section of the C++ Core Guidelines: http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Ri-typed
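
One way to follow that guideline is to give each flag its own type, so that swapping two arguments no longer compiles. The sketch below is only an illustration and not the signature from this PR: the three enum types, the member names, and the stub `plugin_parser` are assumptions made for the example.

```cpp
#include <memory>

// Hypothetical placeholder for the real parser interface.
struct plugin_parser {
  virtual ~plugin_parser() = default;
};

// Hypothetical strong types: one per flag, so call sites name what they set.
enum class selection_optimized : bool { no, yes };
enum class filter_optimized : bool { no, yes };
enum class order_optimized : bool { no, yes };

struct optimize_parser_result {
  std::unique_ptr<plugin_parser> replacement;
  selection_optimized selection;
  filter_optimized filter;
  order_optimized order;
};

// Example call site: only the order was optimized. Passing the flags in a
// different order is a compile error because the enum types do not convert.
inline auto order_only_result() -> optimize_parser_result {
  return {nullptr, selection_optimized::no, filter_optimized::no,
          order_optimized::yes};
}
```

A plain struct with three named bool members and designated initializers would also make call sites readable; the enum variant additionally rejects accidental reordering.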

Comment on lines +92 to +94
std::set_intersection(config_fields.begin(), config_fields.end(),
                      selection_fields.begin(), selection_fields.end(),
                      std::back_inserter(intersection));
Member

The intersection works incorrectly when using type extractors. Consider this example:

…
| select :ip
| select src_ip

You cannot know the type of the field src_ip ahead of time. The intersection is only empty when src_ip does not exist in the input event, or there are no IP type fields, or when src_ip is not of type IP.

I think we need to go back to the drawing board to figure out exactly whether we can optimize this at all for TQL1.

@@ -6,6 +6,8 @@
// SPDX-FileCopyrightText: (c) 2021 The Tenzir Contributors
// SPDX-License-Identifier: BSD-3-Clause

#include "tenzir/select_optimization.hpp"
Member

Please include with angle brackets for external library includes (which libtenzir is here, as where is a plugin).

  -> optimize_result override {
  (void)filter;
  if (order == event_order::ordered) {
    auto parser_opt = parser_->optimize(filter, order, selection);
    if (!parser_opt.replacement) {
Member

What happens when parser_opt.selection_optimized is true and parser_opt.replacement is false?

@@ -6,6 +6,8 @@
// SPDX-FileCopyrightText: (c) 2023 The Tenzir Contributors
// SPDX-License-Identifier: BSD-3-Clause

#include "tenzir/pipeline.hpp"
Member

Suggested change
- #include "tenzir/pipeline.hpp"
+ #include <tenzir/pipeline.hpp>

Comment on lines +1203 to +1204
return {std::make_unique<json_parser>(std::move(args)), false, false,
        false};
Member

As far as I understand, one of the false, false, false indicates that the order was not optimized. But it was optimized right above this line.

Comment on lines +319 to +391
while (true) {
  auto required_size
    = detail::narrow_cast<size_t>(schema_stream_decoder.next_required_size());
  auto payload = byte_reader(required_size);
  if (!payload) {
    co_yield {};
    continue;
  }
  truncated_bytes += payload->size();
  if (payload->size() < required_size) {
    if (truncated_bytes != 0 and payload->size() != 0) {
      // Ideally this always would be just a warning, but the stream decoder
      // happily continues to consume invalid bytes. E.g., trying to read a
      // JSON file with this parser will just swallow all bytes, emitting this
      // one error at the very end. Not a single time does consuming a buffer
      // actually fail. We should probably look into limiting the memory usage
      // here, as the stream decoder will keep consumed-but-not-yet-converted
      // buffers in memory.
      diagnostic::warning("truncated {} trailing bytes", truncated_bytes)
        .severity(decoded_once ? severity::warning : severity::error)
        .emit(ctrl.diagnostics());
    }
    co_return;
  }
  auto arrow_buffer = as_arrow_buffer(std::move(payload));
  buffered_schema_data.push_back(arrow_buffer);
  auto decode_result = schema_stream_decoder.Consume(std::move(arrow_buffer));
  if (!decode_result.ok()) {
    diagnostic::error("{}", decode_result.ToStringWithoutContextLines())
      .note("failed to decode the byte stream into a record batch")
      .emit(ctrl.diagnostics());
    co_return;
  }
  if (schema_listener->decoded_schema) {
    decoded_once = false;
    schema = type::from_arrow(*schema_listener->decoded_schema);
    break;
  }
}
auto indices = std::vector<tenzir::offset>{};
if (!selection.inner.fields_of_interest.empty()) {
  for (const auto& field : selection.inner.fields_of_interest) {
    for (auto index : schema.resolve(field)) {
      if (index.size() > 1) {
        further_selection_needed = true;
      }
      read_options.included_fields.push_back(static_cast<int>(index[0]));
      indices.push_back(std::move(index));
    }
  }
  std::sort(indices.begin(), indices.end());
  indices.erase(std::unique(indices.begin(), indices.end()), indices.end());
  std::sort(read_options.included_fields.begin(),
            read_options.included_fields.end());
  read_options.included_fields.erase(
    std::unique(read_options.included_fields.begin(),
                read_options.included_fields.end()),
    read_options.included_fields.end());
  for (size_t i = 0; i < indices.size(); i++) {
    indices[i][0] = i;
  }
}
auto listener = std::make_shared<callback_listener>();
auto stream_decoder = arrow::ipc::StreamDecoder(listener, read_options);
for (auto&& buffer : buffered_schema_data) {
  auto decode_result = stream_decoder.Consume(buffer);
  if (!decode_result.ok()) {
    diagnostic::error("{}", decode_result.ToStringWithoutContextLines())
      .note("failed to decode the byte stream into a record batch")
      .emit(ctrl.diagnostics());
    co_return;
  }
}
Member

This code is almost exactly duplicated from right below. Can we make this easier to maintain?

Comment on lines +200 to +211
auto opt = op.optimize(current_filter, current_order, current_selection);
if (opt.selection) {
  current_selection = *opt.selection;
} else if (!current_selection.fields_of_interest.empty()) {
  auto pipe = tql::parse_internal(fmt::format(
    "select {}", fmt::join(current_selection.fields_of_interest, ", ")));
  TENZIR_ASSERT(pipe);
  auto ops = std::move(*pipe).unwrap();
  TENZIR_ASSERT(ops.size() == 1);
  result.push_back(std::move(ops[0]));
  current_selection = select_optimization::no_select_optimization();
}
Member

Did you consider an alternative implementation where there is no replacement operator, i.e., select never gets elided and also never added back in, because generally select is a cheap operator? We could still push up the information so that it could become a no-op in optimized cases.

Not sure if this is worth it, I think you can judge the impact of that best.

Labels
improvement An incremental enhancement of an existing feature performance Improvements or regressions of performance
Projects
None yet
3 participants