
Processing Avro OCF #762

Closed
siredmar opened this issue May 12, 2021 · 11 comments · Fixed by #1344
Labels: enhancement, inputs, processors

Comments

@siredmar

Hi!
Am I right that it is currently not possible to process Avro OCF (Object Container Files, http://avro.apache.org/docs/current/spec.html#Object+Container+Files) with Benthos?
The main difference here is that the schema is within the header and can be used to decode the message.

Cheers
Armin

@Jeffail added the enhancement, inputs, and processors labels on May 12, 2021
@Jeffail
Member

Jeffail commented May 12, 2021

Hey @siredmar, yeah, I would imagine this isn't possible without some crazy trickery. We could potentially support this as an input codec format, which would work with file-based inputs (file, aws_s3, sftp, etc.). Or, alternatively, we could expand the avro processor to support this format, but that would mean we'd need to load the entire archive into memory before we can process the individual documents.

@siredmar
Author

siredmar commented May 12, 2021

What do you think would be the best approach?
Just tinkering a little bit, so please correct me if I'm wrong.
Case input codec format, let's call it avro_ocf for now:
avro_ocf reads the binary data and extracts the schema from the header. For this, only the file/stream/binary has to be read until the header is complete; no complete file read at all. Then both the schema and the binary payload are passed to the avro processor, where the standard avro processing can be performed. For now I cannot estimate the usage of the avro processor and the interface changes that would need to be made for such a scenario.

Case expand avro processor:
As you mentioned, one would have to read the entire archive into memory. Isn't that always the case when processing large avro messages? Wouldn't it also work to read the header first, before processing? One could control the avro processor using a new option like ocf: true that disables the schema and schema_path options, telling the processor that the schema to use lies within the message.
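(For illustration: with LinkedIn's goavro package, the embedded schema really can be recovered after reading only the OCF header, before any records are decoded. This is just a minimal sketch with a placeholder file path, not part of any proposed Benthos change.)

```go
package main

import (
	"fmt"
	"os"

	"github.com/linkedin/goavro/v2"
)

func main() {
	f, err := os.Open("example.ocf") // placeholder path
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// NewOCFReader only needs to consume the OCF header to build its codec,
	// so the embedded writer schema is available before any record is read.
	ocfr, err := goavro.NewOCFReader(f)
	if err != nil {
		panic(err)
	}
	fmt.Println(ocfr.Codec().Schema())
}
```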

@Jeffail
Member

Jeffail commented May 12, 2021

I think the codec approach is definitely the better option. The interface of a codec implementation consumes an io.Reader and emits streamed messages that can be structured, so in this case I think it would make sense to implement it entirely within that, no need for a separate processor or anything.
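(In rough Go terms, the contract described here would look something like the sketch below; the names and signatures are purely illustrative assumptions, not the actual Benthos codec interface.)

```go
// Illustrative only: these names and signatures are assumptions, not Benthos
// internals. The point is the shape of the contract: a codec wraps an
// io.Reader from any file-based input (file, aws_s3, sftp, ...) and yields
// one structured message at a time, so records can be streamed without
// buffering the whole archive.
package codecsketch

import (
	"context"
	"io"
)

type streamCodec interface {
	// Next returns the next decoded message plus an ack function, or io.EOF
	// once the underlying stream is exhausted.
	Next(ctx context.Context) (msg interface{}, ack func(error) error, err error)
	Close(ctx context.Context) error
}

// A constructor takes the raw stream and returns a codec positioned just past
// any header/metadata (for avro_ocf, that header carries the writer schema).
type codecConstructor func(r io.ReadCloser) (streamCodec, error)
```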

@loicalleyne
Contributor

Hi, I have a use case where we have a large volume of OCF files to process, and this would be extremely useful.

I looked at /internal/codec/reader.go, and from what I understand the main things needed to add a codec for the file input would be:
- a new avro_ocf struct type with ack, Next, and Close methods
- a newAvroOCFReader function that returns the address of a new reader
- adding avro_ocf to the codec list and config parsing

Is this correct?
I've tried several Avro packages in the past so I'm somewhat familiar with how to read OCF files and read messages. I'd be willing to take a stab at implementing this if I could get a little guidance as to how to go about it (not a professional developer but I've been building Ops tools and services with Go for 3+ years now).
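(A rough skeleton of those three pieces, assuming goavro handles the decoding, might look like the following; the method signatures are illustrative and would need to be adapted to what internal/codec/reader.go actually expects.)

```go
// Rough skeleton only: the exact signatures in internal/codec/reader.go
// differ, so treat this as an outline, not a drop-in implementation.
package codec

import (
	"context"
	"io"

	"github.com/linkedin/goavro/v2"
)

type avroOCFReader struct {
	ocf    *goavro.OCFReader
	source io.ReadCloser
}

// newAvroOCFReader wraps the raw stream and parses the OCF header (which
// carries the schema) up front, mirroring the other new*Reader constructors.
func newAvroOCFReader(r io.ReadCloser) (*avroOCFReader, error) {
	ocf, err := goavro.NewOCFReader(r)
	if err != nil {
		r.Close()
		return nil, err
	}
	return &avroOCFReader{ocf: ocf, source: r}, nil
}

// ack would propagate acknowledgement back to the input (e.g. delete or keep
// the source file); a no-op in this sketch.
func (a *avroOCFReader) ack(ctx context.Context, err error) error {
	return nil
}

// Next decodes the next record and returns it as JSON text, one message per
// record, so large files are streamed rather than loaded whole.
func (a *avroOCFReader) Next(ctx context.Context) ([]byte, error) {
	if !a.ocf.Scan() {
		if err := a.ocf.Err(); err != nil {
			return nil, err
		}
		return nil, io.EOF
	}
	datum, err := a.ocf.Read()
	if err != nil {
		return nil, err
	}
	return a.ocf.Codec().TextualFromNative(nil, datum)
}

func (a *avroOCFReader) Close(ctx context.Context) error {
	return a.source.Close()
}
```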

@mihaitodor
Member

mihaitodor commented Jul 21, 2022

By all means, all PRs are welcome! You're looking in the right place. Search, for example, for lines in that file and model avro-ocf on it in the same file. Then, ideally, try adding a few tests here: https://github.com/benthosdev/benthos/blob/main/internal/codec/reader_test.go. Besides that, the contributing docs are here. Try to follow those before raising a PR.

Not sure which Avro package you want to try, but LinkedIn's goavro is used for schema_registry_encode and schema_registry_decode, and I see they have https://pkg.go.dev/github.com/linkedin/goavro/v2#NewOCFReader. There's also https://github.com/hamba/avro, but AFAIK it doesn't offer a good way to convert Avro to JSON because, in some cases (e.g. logical types), the schema has to be used when constructing the JSON. Goavro does a better job with that, but there are some issues and limitations with logical types that I hope they'll decide to address soon.
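(As a starting point for such a test, a self-contained round trip with goavro — write a tiny OCF into a buffer, then read the records back — could look roughly like this; the schema and test name are placeholders, not the existing helpers in reader_test.go.)

```go
package codec_test

import (
	"bytes"
	"testing"

	"github.com/linkedin/goavro/v2"
)

const testSchema = `{
  "type": "record", "name": "Test",
  "fields": [{"name": "message", "type": "string"}]
}`

func TestAvroOCFRoundTrip(t *testing.T) {
	// Write two records into an in-memory OCF.
	var buf bytes.Buffer
	w, err := goavro.NewOCFWriter(goavro.OCFConfig{W: &buf, Schema: testSchema})
	if err != nil {
		t.Fatal(err)
	}
	native := []interface{}{
		map[string]interface{}{"message": "hello"},
		map[string]interface{}{"message": "world"},
	}
	if err := w.Append(native); err != nil {
		t.Fatal(err)
	}

	// Read them back one record at a time, as a streaming codec would.
	r, err := goavro.NewOCFReader(&buf)
	if err != nil {
		t.Fatal(err)
	}
	var got int
	for r.Scan() {
		if _, err := r.Read(); err != nil {
			t.Fatal(err)
		}
		got++
	}
	if got != len(native) {
		t.Fatalf("expected %d records, got %d", len(native), got)
	}
}
```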

@loicalleyne
Contributor

I think I have something that follows the pattern in the other codecs using goavro.TextualFromNative(). Trying to run make, but running into issues:

go: finding cloud.google.com/go/pubsub v1.17.1
go: finding cloud.google.com/go/storage v1.18.2
build github.com/Azure/azure-sdk-for-go/sdk/azcore: cannot load github.com/Azure/azure-sdk-for-go/sdk/azcore: no Go source files
make: *** [Makefile:43: target/bin/benthos] Error 1

I have Go 1.18 installed; do I need to downgrade to 1.16, or is the build failing for some other reason? Here's what I get when I try to manually run 'go get github.com/Azure/azure-sdk-for-go/sdk/azcore':

go get -x github.com/Azure/azure-sdk-for-go/sdk/azcore

get https://proxy.golang.org/github.com/%21azure/azure-sdk-for-go/sdk/azcore/@v/list

get https://proxy.golang.org/github.com/%21azure/azure-sdk-for-go/sdk/azcore/@v/list: 200 OK (0.476s)

build github.com/Azure/azure-sdk-for-go/sdk/azcore: cannot load github.com/Azure/azure-sdk-for-go/sdk/azcore: no Go source files

@mihaitodor
Member

Not sure why you're getting that. I'm on Go 1.18 and I have the following env vars set:

GO111MODULE="on"
GOPRIVATE=""
GOPROXY="https://proxy.golang.org,direct"

If all else fails, try building it in a golang Docker container.

@loicalleyne
Contributor

Apparently this is what happens when you've upgraded your Windows Go install to 1.18 but your WSL Go install is still on 1.13 🤦 I only had make installed in WSL, but I rarely use it, so I didn't notice until now.

I ran a test that read an OCF and output to a file using the lines codec; it's spitting out the data with some invisible UTF-8 characters, not JSON. I think I need to use a different codec method and json.Marshal, although that makes me wonder if there's a use case for reading binary Avro data and passing on the Avro schema as input metadata so a processor can mutate it and use it for output. I should probably get this working first though.

@loicalleyne
Contributor

loicalleyne commented Jul 24, 2022

I looked at the schema_registry processor implementation and realized that the avro-ocf input should do the same thing but from a file, using part.SetJSON().
Is using encoding/json ever a performance bottleneck? Or is it unlikely to be the slowest element in the stream processing?
One of my tests involves a ~130 MB Avro OCF; when I tried outputting it to a file with the lines codec, the output file was over 5 GB before I killed it. I wonder how this will work at scale.
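(One way to answer the encoding/json question empirically is a small benchmark comparing the two conversion paths on a representative decoded record; the schema and record below are placeholders, and real nested data — especially with logical types — will behave differently and may produce different JSON on the two paths.)

```go
package avroocf_test

import (
	"encoding/json"
	"testing"

	"github.com/linkedin/goavro/v2"
)

const benchSchema = `{
  "type": "record", "name": "Log",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "message", "type": "string"},
    {"name": "tags", "type": {"type": "array", "items": "string"}}
  ]
}`

// A native goavro datum: records are map[string]interface{} keyed by field name.
var benchDatum = map[string]interface{}{
	"id":      int64(42),
	"message": "hello world",
	"tags":    []interface{}{"a", "b", "c"},
}

// Path 1: marshal the native datum with the standard library.
func BenchmarkEncodingJSON(b *testing.B) {
	for i := 0; i < b.N; i++ {
		if _, err := json.Marshal(benchDatum); err != nil {
			b.Fatal(err)
		}
	}
}

// Path 2: let goavro render JSON text using the schema.
func BenchmarkTextualFromNative(b *testing.B) {
	codec, err := goavro.NewCodec(benchSchema)
	if err != nil {
		b.Fatal(err)
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := codec.TextualFromNative(nil, benchDatum); err != nil {
			b.Fatal(err)
		}
	}
}
```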

@mihaitodor
Member

> Is using encoding/json ever a performance bottleneck?

I guess it can be when dealing with really complex and heavily-nested structures.

> One of my tests involves a ~130 MB Avro OCF; when I tried outputting it to a file with the lines codec, the output file was over 5 GB before I killed it. I wonder how this will work at scale.

I don't know anything about AVRO OCF, but is that a meaningful operation then? Maybe such a file is not meant to be converted to JSON. Note that the only reason to convert it to JSON is to allow users to define transformations for it. If it's such a complex and large object, then the Benthos way of performing transformations might not be ideal.

OTOH, does it perhaps contain multiple records inside, like a really big JSON array? If this is the case, then maybe the new codec should behave like the lines one and stream each object individually instead of the entire set in one go.

Still, 130MB expanding to 5GB sounds excessive to me. Maybe you bumped into some bug in the goavro library. Do you have a tool which can extract its schema somehow? If the output produced by goavro is complete nonsense, I'd recommend creating a simple self-contained app which reproduces the issue by calling goavro functions directly and then raise an issue upstream on the goavro repo. Also, it might be worth trying the same thing with https://github.com/hamba/avro to see if it behaves the same way.

@loicalleyne
Contributor

So the use case is that we get logs shipped to us from a third party that don't partition well in BigQuery: the datetime field is nested rather than top-level. So far we've been dealing with this by loading into a temporary table and then appending the result of "SELECT CAST(nested.field AS TIMESTAMP) event_time, * FROM temptable" to the production table. This incurs scanning costs just to ingest the logs into BigQuery.
I loaded that test file from GCS into a temporary table; the file size is 180,131,873 bytes, and in BigQuery the table size ends up being 7.18 GB with 7,922,579 rows. This is one of the less useful log types; I'll run tests on the other types and see if Benthos still fits our use case.
All that being said, I think I've got the basic functionality working, so I'll likely submit a new PR tomorrow.
