
Zstd and bzip2 compression support #204

Open · wants to merge 3 commits into master
Conversation

@boiler commented Mar 23, 2020

Tested with avro-tools 1.9.2:

$ avro-tools getmeta event.zstd.avro
avro.schema
          {
                "type" : "record",
                "name" : "Event",
                "fields" : [ {
                  "name" : "body",
                  "type" : "bytes"
                } ]
          }

avro.codec      zstandard
$ avro-tools getmeta event.bz2.avro
avro.schema
          {
                "type" : "record",
                "name" : "Event",
                "fields" : [ {
                  "name" : "body",
                  "type" : "bytes"
                } ]
          }

avro.codec      bzip2

@karrick (Contributor) left a comment

First of all, thank you for your work. The code looks great, with a single correction I would ask you to make.

The one thing I need to check before we merge is potential license conflicts for the libraries being added.

ocf_reader.go (review comment resolved)
Co-authored-by: Karrick McDermott <github@karrick.dev>
@karrick (Contributor) commented Aug 17, 2020

I just took a peek at the LICENSE files for the two added libraries:

goavro: Released initially under the Apache License (version 2.0)
Google Snappy: Released under BSD 3-Clause License
dsnet/compress: Released under BSD 3-Clause License
klauspost/compress: Released under BSD 3-Clause License

@karrick (Contributor) commented Aug 17, 2020

Having had a few moments to think about the library dependency issue a bit more, I think I have an idea that eliminates any potential library license concerns, while also making it easier to use other compression libraries in the future without manually adding each one to this repository.

I am going to ask for a bit of patience on this PR while I add a pluggable compression algorithm feature for processing OCF files. The goal is for a program to call a function that registers a compression codec with goavro before attempting to use that particular compression algorithm.

@karrick (Contributor) commented Aug 19, 2020

@boiler, thank you for your patience. I have updated a private branch that allows new compression methods to be added easily. It presently looks like the code below, and I think it is fairly simple to use. However, it has a caveat that concerns me: adding new compression codecs mutates global state. This might not be an issue for most users, but it could become an issue in the future.

Because of this caveat, I am looking at a modification to the method of registering new compression codecs for OCF blocks, and while it works, I am not sure I like the API.

Just keeping you in the loop.

const (
	// CompressionSnappyLabel is used when OCF blocks are compressed using the
	// snappy algorithm.
	CompressionSnappyLabel = "snappy"
)

func init() {
	// NOTE: This registration of a compression algorithm serves as an example
	// for future compression algorithm additions. However, there is no reason
	// to make the compression algorithm name publicly available. The various
	// compression name labels, including the CompressionSnappyLabel constant
	// here, remain publicly available for backwards compatibility only. All
	// new compression algorithms should be defined without necessarily creating
	// a new string constant of their algorithm label.
	_ = registerCompression(CompressionSnappyLabel, ocfCompressSnappy, ocfExpandSnappy)
}

// ocfCompressSnappy compresses the expanded byte slice, returning either the
// compressed byte slice, or a non-nil error.
//
// "Each compressed block is followed by the 4-byte, big-endian CRC32 checksum
// of the uncompressed data in the block."
func ocfCompressSnappy(expanded []byte) ([]byte, error) {
	compressed := snappy.Encode(nil, expanded)
	// OCF requires snappy to have CRC32 checksum after each snappy block
	compressed = append(compressed, 0, 0, 0, 0)                                              // expand slice by 4 bytes so checksum will fit
	binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(expanded)) // checksum of expanded block
	return compressed, nil
}

// ocfExpandSnappy decompresses the compressed byte slice returning either the
// expanded byte slice, or a non-nil error.
//
// "Each compressed block is followed by the 4-byte, big-endian CRC32 checksum
// of the uncompressed data in the block."
func ocfExpandSnappy(compressed []byte) ([]byte, error) {
	index := len(compressed) - 4 // last 4 bytes is crc32 of decoded block
	if index <= 0 {
		return nil, fmt.Errorf("not enough bytes for CRC32 checksum: %d", len(compressed))
	}
	expanded, err := snappy.Decode(nil, compressed[:index])
	if err != nil {
		return nil, err
	}
	actualCRC := crc32.ChecksumIEEE(expanded)
	expectedCRC := binary.BigEndian.Uint32(compressed[index : index+4])
	if actualCRC != expectedCRC {
		return nil, fmt.Errorf("CRC32 checksum mismatch: %x != %x", actualCRC, expectedCRC)
	}
	return expanded, nil
}

@karrick (Contributor) commented Aug 19, 2020

I forgot to mention that the above OCF block compression registration method has another minor downside: it does not allow for more performant buffer handling to minimize memory allocations, nor for reuse of compressor instances. The revised API therefore leaves room for those optimizations in the future.

@ThomasHabets

Any progress on this? I'd love to get zstd in this upstream tree.

(and a way to set zstd.BestCompression)
