
Jcipar/parquet pr #18313 (Draft)

wants to merge 7 commits into dev

Conversation

@jcipar (Contributor) commented May 8, 2024

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.1.x
  • v23.3.x
  • v23.2.x

Release Notes

Before uploading to archival storage, write out locally.

  • This new branch is mostly a copy of the demo work, but with improved organization.
  • Group rows into batches of 1000 for writing. Currently this number is hard-coded.
  • Creating an interface for the schema registry reader breaks the circular dependency between schema registry, cluster, and datalake.
  • Adds a proto_to_arrow_consumer class that converts protobuf messages to columns in an Arrow table. If no schema is available, it falls back to the old code and the Arrow table simply encodes the key and value as binary columns.
@andrwng (Contributor) left a comment

Not a complete review, but leaving some quick thoughts (in particular, if you're benchmarking, uploading the file stream is probably what you want).

Structurally I think the proto and arrow converters look fine and could probably be pulled into their own PR if you want to begin checking things in, given they're pretty self-contained.

Also, just a thought about longer-term placement: I wonder if the parquet writers should be located similarly to WASM rather than being colocated with the data. I don't know the full details, but IIUC WASM uses its own internal topic to checkpoint itself. Especially since this is a CPU-intensive process, it's something to think about.

Comment on lines 20 to 21
10ll * 1024ll * 1024ll
* 1024ll, // FIXME(jcipar): 10 GiB is probably too much.

nit: just fyi base/units.h has some nifty macros, like 10_GiB
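For reference, a minimal sketch of how such a literal is usually defined (an assumption; the actual base/units.h may differ in detail):

```cpp
#include <cstddef>

// Sketch of a binary-units user-defined literal (assumed, not the
// verbatim contents of base/units.h):
constexpr size_t operator""_GiB(unsigned long long v) { return v << 30; }

// The constant from this diff then reads:
static_assert(10_GiB == 10ll * 1024ll * 1024ll * 1024ll);
```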

Comment on lines 143 to 146
vlog(
logger.debug,
"Uploaded datalake topic {} successfully.",
model::topic_view(_log->config().ntp().tp.topic));

nit: in either case we should probably be deleting the old file

});
co_await iobuf_ostream.close();

auto ret = co_await remote.upload_object(

You probably want to use upload_stream() here instead, to avoid buffering it all into an iobuf

@@ -1171,6 +1171,11 @@ ss::future<cloud_storage::upload_result> ntp_archiver::do_upload_segment(
ctxlog.warn,
"Failed to upload datalake segment {}",
candidate);
} else {
vlog(
ctxlog.warn,

nit: info?

Comment on lines +1218 to +1219
local_partition_manager.set_panadaproxy_schema_registry(
_schema_registry.get());

Had to do a double take at this -- though _schema_registry is only on shard 0, under the hood it's actually a wrapper around a sharded service, right? And calls transparently go through local() via the wasm:: impl?

Comment on lines +20 to +26
try {
_table_builder = std::make_unique<proto_to_arrow_converter>(
protobuf_schema);
} catch (const std::exception& e) {
// Couldn't build a table builder, fall back to schemaless
// TODO: Log this
}

nit: longer term, may want to consider relying on some kind of result type (e.g. see base/outcome.h) and moving this work into some start() method, in case the exceptions get too unwieldy
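A rough sketch of that shape, assuming base/outcome.h exposes a Boost.Outcome-style result<T>; the create() factory named below is hypothetical:

```cpp
// Hypothetical sketch: construction stays trivial and the fallible work
// moves into start(), reported through a result<> instead of a throw.
class consumer {
public:
    consumer() = default; // never throws

    result<void> start(const std::string& protobuf_schema) {
        // create() stands in for whatever fallible factory gets written.
        auto conv = proto_to_arrow_converter::create(protobuf_schema);
        if (!conv) {
            // Caller decides whether to fall back to schemaless mode.
            return conv.error();
        }
        _table_builder = std::move(conv.value());
        return outcome::success();
    }

private:
    std::unique_ptr<proto_to_arrow_converter> _table_builder;
};
```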

Comment on lines +79 to +80
// TODO: Is this a good path? should it be configurable?
std::filesystem::path path = std::filesystem::path("/tmp/parquet_files")

Probably worth a config variable, or even some subdirectory of the cloud cache?

@dotnwat (Member) commented May 9, 2024

/ci-repeat 1

@dotnwat (Member) commented May 9, 2024

/ci-repeat 1 skip-rebase

v::cloud_storage
v::raft
Seastar::seastar
Arrow::arrow_shared

why not static for this and parquet? curious.

proto_to_arrow_scalar()
: _builder(std::make_shared<BuilderType>()) {}

arrow::Status

this is interesting; we tend to use std::error_code (see the rpc layer), but wondering if the ok() is for Arrow internals
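For context, Arrow's builder APIs return arrow::Status themselves, so converter code that talks to builders tends to stay in Status-land until the Redpanda boundary; typical usage looks like:

```cpp
#include <arrow/api.h>

// arrow::Status is checked with ok() or propagated with the
// ARROW_RETURN_NOT_OK macro.
arrow::Status append_two(arrow::Int32Builder& builder) {
    ARROW_RETURN_NOT_OK(builder.Append(1));
    ARROW_RETURN_NOT_OK(builder.Append(2));
    return arrow::Status::OK();
}

void caller(arrow::Int32Builder& builder) {
    auto st = append_two(builder);
    if (!st.ok()) {
        // Translate to the codebase's error type here; st.ToString()
        // carries the details.
    }
}
```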


public:
proto_to_arrow_scalar()
: _builder(std::make_shared<BuilderType>()) {}

this uses a global lock - lw_shared<>?
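A sketch of the lw_shared<> version; the caveat (an assumption here) is that it only works if the builder never crosses an Arrow API that requires a std::shared_ptr:

```cpp
#include <seastar/core/shared_ptr.hh>

// seastar::lw_shared_ptr uses a non-atomic refcount, which is fine for
// shard-local state and avoids std::shared_ptr's atomic traffic.
template <typename BuilderType>
class proto_to_arrow_scalar {
public:
    proto_to_arrow_scalar()
      : _builder(seastar::make_lw_shared<BuilderType>()) {}

private:
    seastar::lw_shared_ptr<BuilderType> _builder;
};
```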

namespace pb = google::protobuf;

// Set up child arrays
for (int field_idx = 0; field_idx < message_descriptor->field_count();

should the for loop be a free function w/ tests, with if()/return style for the conversions?
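One possible shape for that refactor; the field names and supported types below are illustrative, not the PR's actual mapping:

```cpp
#include <arrow/api.h>
#include <google/protobuf/descriptor.h>

#include <memory>
#include <string>

namespace pb = google::protobuf;

// Hypothetical free function extracted from the loop body: maps one
// protobuf field to an Arrow field with early returns, so the mapping
// can be unit tested in isolation. nullptr means "unsupported type".
std::shared_ptr<arrow::Field>
arrow_field_for(const pb::FieldDescriptor* desc) {
    std::string name(desc->name());
    if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {
        return arrow::field(name, arrow::int32());
    }
    if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT64) {
        return arrow::field(name, arrow::int64());
    }
    if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_STRING) {
        return arrow::field(name, arrow::utf8());
    }
    return nullptr;
}
```

The for loop then reduces to calling arrow_field_for() on each field and deciding what to do with a nullptr result.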

desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_MESSAGE) {
auto field_message_descriptor = desc->message_type();
if (field_message_descriptor == nullptr) {
throw std::runtime_error(

wondering if a std::error_code (integer) like the RPC layer makes more sense here, since this is likely to be called in hot paths and exceptions/runtime_errors are expensive with global lock acquisitions.
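A sketch of the error-code route, in the spirit of the RPC layer; the enum values are invented for illustration:

```cpp
#include <string>
#include <system_error>

// Failure modes become plain integers behind a custom category, so the
// hot path never throws.
enum class converter_errc { ok = 0, no_message_descriptor, unsupported_type };

struct converter_category final : std::error_category {
    const char* name() const noexcept override {
        return "datalake::converter";
    }
    std::string message(int ev) const override {
        switch (static_cast<converter_errc>(ev)) {
        case converter_errc::ok:
            return "ok";
        case converter_errc::no_message_descriptor:
            return "field has no message descriptor";
        case converter_errc::unsupported_type:
            return "unsupported field type";
        }
        return "unknown";
    }
};

std::error_code make_error_code(converter_errc e) {
    static const converter_category cat;
    return {static_cast<int>(e), cat};
}
```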

for (int field_idx = 0; field_idx < message_descriptor->field_count();
field_idx++) {
auto desc = message_descriptor->field(field_idx);
if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {

could be its own series of tests. the code has both sum types (status<>) and exceptions; wondering if sum types should be enough to represent the failure paths.

}
}
void datalake::proto_to_arrow_converter::initialize_protobuf_schema(
const std::string& schema) {

we use ss::sstring in general

field_idx++) {
auto desc = message_desc->field(field_idx);

if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {

seems like the same loop as above w/ the same code; one just pushes back, the other move-assigns. the shared body could be a free function w/ tests.


void finish_batch();

std::shared_ptr<arrow::Table> build_table();

shared_ptr has global locks on dtor. lw_shared?

@emaxerrno (Contributor)

had some time and did a quick pass. easy to read code.

&offset_builder](model::record&& record) {
std::string key;
std::string value;
key = iobuf_to_string(record.key());

nit: combine the declaration and initialization on one line: const std::string key = iobuf_to_string(record.key());

// encode as utf8? The Parquet library will happily output binary in
// these columns, and a reader will get an exception trying to read the
// file.
_field_key = arrow::field("Key", arrow::utf8());

nit: Can these assignments be in the initializer list?
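Something like this, where _field_value is assumed to be the sibling member of _field_key:

```cpp
// Sketch: field construction hoisted into the initializer list.
arrow_writing_consumer::arrow_writing_consumer()
  : _field_key(arrow::field("Key", arrow::utf8()))
  , _field_value(arrow::field("Value", arrow::utf8())) {}
```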

}

std::string
datalake::arrow_writing_consumer::iobuf_to_string(const iobuf& buf) {

I also see an implementation of iobuf_to_string in cloud_storage/tests/anomalies_detector_test.cc. Which is better, and should we pull this out into its own utility function elsewhere?

explicit arrow_writing_consumer();
ss::future<ss::stop_iteration> operator()(model::record_batch batch);
ss::future<std::shared_ptr<arrow::Table>> end_of_stream();
std::shared_ptr<arrow::Table> get_table();

Can you add some comments to these declarations, so users have a rough idea of the workflow for the public API?
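One possible shape for those comments; the behavioral details are inferred from the PR description, so treat the wording as a draft:

```cpp
class arrow_writing_consumer {
public:
    explicit arrow_writing_consumer();

    // Consumes one record batch, appending its records as rows of the
    // in-progress Arrow table.
    ss::future<ss::stop_iteration> operator()(model::record_batch batch);

    // Called once the input is exhausted; finalizes and returns the table.
    ss::future<std::shared_ptr<arrow::Table>> end_of_stream();

    // Returns the built table; assumed valid only after end_of_stream().
    std::shared_ptr<arrow::Table> get_table();
};
```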

@dotnwat (Member) commented May 10, 2024

Looks like some build errors @jcipar

@jcipar mentioned this pull request May 13, 2024