Jcipar/parquet pr #18313
base: dev
Conversation
Before uploading to archival storage, write out locally. This new branch is mostly a copy of the demo work, but with improved organization.
Group rows into batches of 1000 for writing. Currently this number is hard coded.
Creating an interface for the schema registry reader breaks the circular dependency between schema registry, cluster, and datalake.
This commit adds a proto_to_arrow_consumer class that converts protobuf messages to columns in an Arrow table. If no schema is available, it falls back to the old code and the Arrow table simply encodes the key and value as binary columns.
Not a complete review, but leaving some quick thoughts (particularly if you're benchmarking, uploading the file stream is probably what you want).
Structurally I think the proto and arrow converters look fine and could probably be pulled into their own PR if you want to begin checking things in, given they're pretty self contained.
Also, just a thought about longer-term placement: I wonder if the parquet writers should be located similarly to WASM rather than being colocated with the data. I don't know the full details, but IIUC WASM uses its own internal topic to checkpoint itself. Especially as a CPU-intensive process, it's something to think about.
src/v/datalake/parquet_uploader.cc
Outdated
10ll * 1024ll * 1024ll
  * 1024ll, // FIXME(jcipar): 10 GiB is probably too much.
nit: just fyi base/units.h has some nifty macros, like 10_GiB
src/v/datalake/parquet_uploader.cc
Outdated
vlog(
  logger.debug,
  "Uploaded datalake topic {} successfully.",
  model::topic_view(_log->config().ntp().tp.topic));
nit: in either case we should probably be deleting the old file
});
co_await iobuf_ostream.close();

auto ret = co_await remote.upload_object(
You probably want to use upload_stream() here instead, to avoid buffering it all into an iobuf.
@@ -1171,6 +1171,11 @@ ss::future<cloud_storage::upload_result> ntp_archiver::do_upload_segment(
    ctxlog.warn,
    "Failed to upload datalake segment {}",
    candidate);
} else {
    vlog(
      ctxlog.warn,
nit: info?
local_partition_manager.set_panadaproxy_schema_registry(
  _schema_registry.get());
Had to do a double take at this -- though _schema_registry is only on shard 0, under the hood it's actually a wrapper around a sharded service, right? And calls transparently go through local() via the wasm:: impl.
try {
    _table_builder = std::make_unique<proto_to_arrow_converter>(
      protobuf_schema);
} catch (const std::exception& e) {
    // Couldn't build a table builder, fall back to schemaless
    // TODO: Log this
}
nit: longer term, may want to consider relying on some kind of result type (e.g. see base/outcome.h) and moving this work into some start() method, in case the exceptions get too unwieldy.
// TODO: Is this a good path? should it be configurable?
std::filesystem::path path = std::filesystem::path("/tmp/parquet_files")
Probably worth a config variable, or even some subdirectory of the cloud cache?
/ci-repeat 1
/ci-repeat 1 skip-rebase
v::cloud_storage
v::raft
Seastar::seastar
Arrow::arrow_shared
Why not static linking for this and parquet? Curious.
proto_to_arrow_scalar()
  : _builder(std::make_shared<BuilderType>()) {}

arrow::Status
This is interesting; we tend to use std::error_code (see the RPC layer), but wondering if the ok() is for Arrow internals.
public:
  proto_to_arrow_scalar()
    : _builder(std::make_shared<BuilderType>()) {}
This uses a global lock; lw_shared<> instead?
namespace pb = google::protobuf;

// Set up child arrays
for (int field_idx = 0; field_idx < message_descriptor->field_count();
Should the for loop be a function with tests, and use if()-return style for the conversions?
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_MESSAGE) {
    auto field_message_descriptor = desc->message_type();
    if (field_message_descriptor == nullptr) {
        throw std::runtime_error(
Wondering if a std::error_code (integer) like the RPC layer uses makes more sense here, since this is likely to be called in hot paths, and exceptions/runtime_errors are expensive, with global lock acquisitions.
for (int field_idx = 0; field_idx < message_descriptor->field_count();
     field_idx++) {
    auto desc = message_descriptor->field(field_idx);
    if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {
Could be its own series of tests. The code has both sum types (status<>) and exceptions; wondering if sum types alone should be enough to represent the paths.
}
}
void datalake::proto_to_arrow_converter::initialize_protobuf_schema(
  const std::string& schema) {
We use ss::sstring in general.
field_idx++) {
    auto desc = message_desc->field(field_idx);

    if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {
Seems like the same loop as above with the same code; one just pushes back and the other move-assigns, but the conversion could be a free function with tests.
void finish_batch();

std::shared_ptr<arrow::Table> build_table();
shared_ptr has a global lock on dtor. lw_shared?
Had some time and did a quick pass. Easy-to-read code.
&offset_builder](model::record&& record) {
    std::string key;
    std::string value;
    key = iobuf_to_string(record.key());
nit: initialize key on the same line as the assignment: const std::string key = iobuf_to_string(record.key());
// encode as utf8? The Parquet library will happily output binary in
// these columns, and a reader will get an exception trying to read the
// file.
_field_key = arrow::field("Key", arrow::utf8());
nit: Can these assignments be in the initializer list?
}

std::string
datalake::arrow_writing_consumer::iobuf_to_string(const iobuf& buf) {
I also see an implementation of iobuf_to_string in cloud_storage/tests/anomalies_detector_test.cc. Which is better, and should we pull this out into its own utility function elsewhere?
explicit arrow_writing_consumer();
ss::future<ss::stop_iteration> operator()(model::record_batch batch);
ss::future<std::shared_ptr<arrow::Table>> end_of_stream();
std::shared_ptr<arrow::Table> get_table();
Can you add some comments to these declarations, so users have a rough idea of the workflow for the public API?
Looks like some build errors @jcipar
Backports Required
Release Notes