-
-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Projection pushdown optimization #4180
base: main
Are you sure you want to change the base?
Conversation
This reverts commit f660086.
95f199a
to
9c32232
Compare
libtenzir/src/type.cpp
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can discard type.cpp changes
/// this | select columns | sink | ||
/// <=> select columns | this | sink |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optimize
accepts an select_optimization
and returns a std::optional<select_optimization>
. Can you phrase the semantic requirements while considering (A) the optionality of the output selection, and (B) that the input columns may be different from the output columns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this sound better (italics is new part)?
#Projection Pushdown
The selection is a projection pushdown represented as a vector of
columns to select. This vector can be moved around a pipeline similar to an
expression. The implementation must ensure that the selected columns are
preserved in the correct order throughout the pipeline processing stages. For an
otherwise unknown pipeline sink,
the following equivalences must hold :
this | select columns | sink <=> select columns | this | sink
The input columns will differ from the output columns(i.e., if columns 0 and 4 are
selected, then 1, 2, and 3 remain), but the
individual parse operator must handle this functionality. Optimize allows a modified parser with specified
columns to be returned.
If the operator modifies the schema of the events, the
selection must be adjusted accordingly to ensure that the specified columns are
correctly mapped and preserved in the resulting schema. The pipeline
implementation can block the upward movement of the select operator by
passing a std::nullopt into the optimized result. Semantically, an empty vector
of selection means no selection was provided, whereas a nullopt selection means
the prior selection is being blocked.
#Example
For an operator that filters and then selects columns,
the optimized output must ensure that the filter is applied before the
selection to maintain the correct semantics :
where expr | OPT | select columns | sink <==> where expr | select columns |
OPT | sink
This ensures that filtering and selection are treated as distinct operations,
and the correct behavior is preserved in the optimized pipeline.
auto config_fields = config_.fields; | ||
auto selection_fields = selection.fields_of_interest; | ||
std::sort(config_fields.begin(), config_fields.end()); | ||
std::sort(selection_fields.begin(), selection_fields.end()); | ||
std::vector<std::string> intersection; | ||
std::set_intersection(config_fields.begin(), config_fields.end(), | ||
selection_fields.begin(), selection_fields.end(), | ||
std::back_inserter(intersection)); | ||
if (intersection.empty()) { | ||
return optimize_result{filter, order, copy(), std::nullopt}; | ||
} | ||
return optimize_result{filter, order, nullptr, | ||
select_optimization(intersection)}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this assumes that we are free to reorder fields as part of the optimization, right? I would need to be convinced that this is a good choice, as field order is very visible to the user when looking at the output. The more conservative choice would be to require that the optimization does not change the order of fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this just says that the event_order passed from the previous operator will continue to get passed through the select operator to the next operator. Select currently doesn't do anything with order. Do you think there are cases where select should change the order (changing to ordered for example)?
contrib/tenzir-plugins
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Goals
- Modify every inherited optimize function to include a selection type
- Start with optimizing selection on feather read
- select.cpp: should push up projection information
- read: should push projection into feather read, return the replacement
- what additional logic is needed when we have where | select and select | where? What metadata is needed to be stored in the selection type. Is it possible to optimize where and select for feather? How is row extraction possible?
- How do we deal with nested selection? Perhaps some sort of recursion? Or perhaps linking multiple select together.
Comments
- There is no simple way to force a nested read of Feather data. A top-level approach may be the only option
- For export and other operators where nesting is possible, the current logic is for the selection object to contain all nesting information and the operator using the selection information (ie. read/export) should create an array of replacements instead of a single one.
- An alternative is having the selection information as just a string and let the individual read/export plugin deal with the nesting structure. The issue with this approach is if multiple pruning are required, the parsing logic will be encapsulated in an ugly loop.
- Parquet reading allows for a similar "row_group_indices and column_group_indices". The where operator can also be applied here! The difficult part here is getting the values of a specific row. Look into how the where filter can be pushed up for this
The parquet and feather optimizations are implemented, but now the semantics of moving the select operator through the pipeline is proving difficult.
- Concept of no_columnar_selection: empty fields_of_interest, do_not_optimize_select is true
- Blocking_selection: fields of interests exists, do_not_optimize_select is true
- Selection moving up pipe: field of interests exists, and do_not_optimize is false
On start up: the default selection is no_columnar_selection
- do_not_optimize returns a null_opt for the the selection, so the previous current columnar selection holds and is allowed to pass through
- If a blocking_selection is introduced, a select operator is added there and current selection becomes a blocking one. Perhaps it is better to turn it to no selection so a later operator can use
Operators that need to return a blocking selection (turn do_not_optimize flag on)
- Where
- ...
Questions and todo
- What to do with located in parquet/feather
- Change order_invarient to return a null opt for selection
- Create equality operator for the columnar_selection struct
- make selection const ref
- include std::move everywhere
- can do_not_optimize, use blocking optimize?
- Should all order invariants be a blocker
std::set_intersection(config_fields.begin(), config_fields.end(), | ||
selection_fields.begin(), selection_fields.end(), | ||
std::back_inserter(intersection)); | ||
if (intersection.empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Take another look at how to deal with empty intersection of selection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's your plan for it?
auto config_fields = config_.fields; | ||
auto selection_fields = selection.fields_of_interest; | ||
std::sort(config_fields.begin(), config_fields.end()); | ||
std::sort(selection_fields.begin(), selection_fields.end()); | ||
std::vector<std::string> intersection; | ||
std::set_intersection(config_fields.begin(), config_fields.end(), | ||
selection_fields.begin(), selection_fields.end(), | ||
std::back_inserter(intersection)); | ||
if (intersection.empty()) { | ||
return optimize_result{filter, order, copy(), std::nullopt}; | ||
} | ||
return optimize_result{filter, order, nullptr, | ||
select_optimization(intersection)}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this just says that the event_order passed from the previous operator will continue to get passed through the select operator to the next operator. Select currently doesn't do anything with order. Do you think there are cases where select should change the order (changing to ordered for example)?
/// this | select columns | sink | ||
/// <=> select columns | this | sink |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this sound better (italics is new part)?
#Projection Pushdown
The selection is a projection pushdown represented as a vector of
columns to select. This vector can be moved around a pipeline similar to an
expression. The implementation must ensure that the selected columns are
preserved in the correct order throughout the pipeline processing stages. For an
otherwise unknown pipeline sink,
the following equivalences must hold :
this | select columns | sink <=> select columns | this | sink
The input columns will differ from the output columns(i.e., if columns 0 and 4 are
selected, then 1, 2, and 3 remain), but the
individual parse operator must handle this functionality. Optimize allows a modified parser with specified
columns to be returned.
If the operator modifies the schema of the events, the
selection must be adjusted accordingly to ensure that the specified columns are
correctly mapped and preserved in the resulting schema. The pipeline
implementation can block the upward movement of the select operator by
passing a std::nullopt into the optimized result. Semantically, an empty vector
of selection means no selection was provided, whereas a nullopt selection means
the prior selection is being blocked.
#Example
For an operator that filters and then selects columns,
the optimized output must ensure that the filter is applied before the
selection to maintain the correct semantics :
where expr | OPT | select columns | sink <==> where expr | select columns |
OPT | sink
This ensures that filtering and selection are treated as distinct operations,
and the correct behavior is preserved in the optimized pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this massive undertaking. This is a great piece of work! 🚀
I went through everything in code once now and have left a bunch of comments that need addressing. I think it's only minor things from here on out.
@@ -203,6 +203,7 @@ auto exec_pipeline(std::string content, | |||
return std::move(implicit_pipe.error()); | |||
} | |||
pipe = std::move(*implicit_pipe); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optimize_parser_result(std::unique_ptr<plugin_parser> replacement, | ||
bool selection_optimized, bool filter_optimized, | ||
bool order_optimized) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the reader of this code, how am I supposed to know which boolean is which?
See also this section of the C++ Core Guidelines: http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Ri-typed
std::set_intersection(config_fields.begin(), config_fields.end(), | ||
selection_fields.begin(), selection_fields.end(), | ||
std::back_inserter(intersection)); | ||
if (intersection.empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's your plan for it?
std::set_intersection(config_fields.begin(), config_fields.end(), | ||
selection_fields.begin(), selection_fields.end(), | ||
std::back_inserter(intersection)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intersection works incorrectly when using type extractors. Consider this example:
…
| select :ip
| select src_ip
You cannot know the type of the field src_ip
ahead of time. The intersection is only empty when src_ip
does not exist in the input event, or there are no IP type fields, or when src_ip
is not of type IP.
I think we need to back to the drawing board to exactly figure out whether we can optimize this at all for TQL1.
@@ -6,6 +6,8 @@ | |||
// SPDX-FileCopyrightText: (c) 2021 The Tenzir Contributors | |||
// SPDX-License-Identifier: BSD-3-Clause | |||
|
|||
#include "tenzir/select_optimization.hpp" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please include with angle brackets for external library includes (which libtenzir is here, as where
is a plugin).
-> optimize_result override { | ||
(void)filter; | ||
if (order == event_order::ordered) { | ||
auto parser_opt = parser_->optimize(filter, order, selection); | ||
if (!parser_opt.replacement) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when parser_opt.selection_optimized
is true and parser_opt.replacement
is false?
@@ -6,6 +6,8 @@ | |||
// SPDX-FileCopyrightText: (c) 2023 The Tenzir Contributors | |||
// SPDX-License-Identifier: BSD-3-Clause | |||
|
|||
#include "tenzir/pipeline.hpp" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#include "tenzir/pipeline.hpp" | |
#include <tenzir/pipeline.hpp> |
return {std::make_unique<json_parser>(std::move(args)), false, false, | ||
false}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I understand, one of the false, false, false
indicates that the order was not optimized. But it was optimized right above this line.
while (true) { | ||
auto required_size | ||
= detail::narrow_cast<size_t>(schema_stream_decoder.next_required_size()); | ||
auto payload = byte_reader(required_size); | ||
if (!payload) { | ||
co_yield {}; | ||
continue; | ||
} | ||
truncated_bytes += payload->size(); | ||
if (payload->size() < required_size) { | ||
if (truncated_bytes != 0 and payload->size() != 0) { | ||
// Ideally this always would be just a warning, but the stream decoder | ||
// happily continues to consume invalid bytes. E.g., trying to read a | ||
// JSON file with this parser will just swallow all bytes, emitting this | ||
// one error at the very end. Not a single time does consuming a buffer | ||
// actually fail. We should probably look into limiting the memory usage | ||
// here, as the stream decoder will keep consumed-but-not-yet-converted | ||
// buffers in memory. | ||
diagnostic::warning("truncated {} trailing bytes", truncated_bytes) | ||
.severity(decoded_once ? severity::warning : severity::error) | ||
.emit(ctrl.diagnostics()); | ||
} | ||
co_return; | ||
} | ||
auto arrow_buffer = as_arrow_buffer(std::move(payload)); | ||
buffered_schema_data.push_back(arrow_buffer); | ||
auto decode_result = schema_stream_decoder.Consume(std::move(arrow_buffer)); | ||
if (!decode_result.ok()) { | ||
diagnostic::error("{}", decode_result.ToStringWithoutContextLines()) | ||
.note("failed to decode the byte stream into a record batch") | ||
.emit(ctrl.diagnostics()); | ||
co_return; | ||
} | ||
if (schema_listener->decoded_schema) { | ||
decoded_once = false; | ||
schema = type::from_arrow(*schema_listener->decoded_schema); | ||
break; | ||
} | ||
} | ||
auto indices = std::vector<tenzir::offset>{}; | ||
if (!selection.inner.fields_of_interest.empty()) { | ||
for (const auto& field : selection.inner.fields_of_interest) { | ||
for (auto index : schema.resolve(field)) { | ||
if (index.size() > 1) { | ||
further_selection_needed = true; | ||
} | ||
read_options.included_fields.push_back(static_cast<int>(index[0])); | ||
indices.push_back(std::move(index)); | ||
} | ||
} | ||
std::sort(indices.begin(), indices.end()); | ||
indices.erase(std::unique(indices.begin(), indices.end()), indices.end()); | ||
std::sort(read_options.included_fields.begin(), | ||
read_options.included_fields.end()); | ||
read_options.included_fields.erase( | ||
std::unique(read_options.included_fields.begin(), | ||
read_options.included_fields.end()), | ||
read_options.included_fields.end()); | ||
for (size_t i = 0; i < indices.size(); i++) { | ||
indices[i][0] = i; | ||
} | ||
} | ||
auto listener = std::make_shared<callback_listener>(); | ||
auto stream_decoder = arrow::ipc::StreamDecoder(listener, read_options); | ||
for (auto&& buffer : buffered_schema_data) { | ||
auto decode_result = stream_decoder.Consume(buffer); | ||
if (!decode_result.ok()) { | ||
diagnostic::error("{}", decode_result.ToStringWithoutContextLines()) | ||
.note("failed to decode the byte stream into a record batch") | ||
.emit(ctrl.diagnostics()); | ||
co_return; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is almost exactly duplicated from right below. Can we make this easier to maintain?
auto opt = op.optimize(current_filter, current_order, current_selection); | ||
if (opt.selection) { | ||
current_selection = *opt.selection; | ||
} else if (!current_selection.fields_of_interest.empty()) { | ||
auto pipe = tql::parse_internal(fmt::format( | ||
"select {}", fmt::join(current_selection.fields_of_interest, ", "))); | ||
TENZIR_ASSERT(pipe); | ||
auto ops = std::move(*pipe).unwrap(); | ||
TENZIR_ASSERT(ops.size() == 1); | ||
result.push_back(std::move(ops[0])); | ||
current_selection = select_optimization::no_select_optimization(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you consider an alternative implementation where there is no replacement operator, i.e., select
never gets elided and also never added back in, because generally select
is a cheap operator? We could still push up the information so that it could become a no-op in optimized cases.
Not sure if this is worth it, I think you can judge the impact of that best.
Here, we implement projection pushdown, allowing the select operator to get pushed up the pipeline and to allow for optimization in feather and parquet parse. In this PR, we modified every inherited optimize function to include a selection type, we allowed for feather and parquet to read only selected columns, and considered cases where select can be pushed up. This is a good starting point for the summarize and put operators because the framework for moving selection information has been implemented.
Semantically, an empty columnar selection means that no selection should be performed. Thus, special care must be taken when dealing with a null selection. Here, a null selection acts as a blocker. Further, when returning an optimize result, std::null opt, the selection is blocked. Thus, order_invariant and do_not_optimize are blockers
Currently, only slice, batch, and pass allow the selection to move upstream. More should be added carefully.