[Bug]: ReadAllFiles does not fully read gzipped files from GCS #31040

janowskijak opened this issue Apr 18, 2024 · 11 comments

@janowskijak

What happened?

Since the refactor of gcsio (2.52?), ReadAllFiles does not fully read gzipped files from GCS. Part of the file is returned correctly, but the rest goes missing.

I presume this is caused by the fact that GCS performs decompressive transcoding while _ExpandIntoRanges uses the GCS object's metadata to determine the read range. As a result, the content we receive is larger than the upper bound of the read range.

For example, a gzip file on GCS might have a stored size of 1 MB, and this is the object size reported in the metadata, so the read range ends at 1 MB. However, when Beam opens the file it has already been decompressed by GCS, so the content is 1.5 MB and the last 0.5 MB is never read, causing data loss.
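
The mismatch can be illustrated without GCS at all. The following is a minimal sketch, assuming the read range is capped at the stored (compressed) object size while the bytes actually served are the decompressed content. `read_range` is a hypothetical stand-in for a ranged read, not Beam's actual implementation.

```python
import gzip

# Build a payload that compresses well, so the stored size is much smaller
# than the decompressed size.
lines = [f"line {i:06d}\n".encode() for i in range(100_000)]
original = b"".join(lines)
stored = gzip.compress(original)  # what the bucket stores; metadata reports len(stored)


def read_range(served: bytes, start: int, stop: int) -> bytes:
    """Hypothetical stand-in for a ranged read over the bytes GCS serves."""
    return served[start:stop]


# With decompressive transcoding, the served bytes are the decompressed content,
# but the range is still derived from the stored size in the metadata.
served = gzip.decompress(stored)
received = read_range(served, 0, len(stored))

missing = len(served) - len(received)
print(f"stored={len(stored)} served={len(served)} missing={missing}")
```

Everything past `len(stored)` in the served content is silently dropped, which matches the partial reads described above.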

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Contributor

Abacn commented Apr 19, 2024

Thanks for reporting. Agree this is a P1 bug as it causes data loss.

@Abacn
Contributor

Abacn commented Apr 19, 2024

Is it possible to provide a working example that reproduces the issue? That would help with triage.

@liferoad
Collaborator

@shunping FYI

@janowskijak
Author

janowskijak commented Apr 22, 2024

Is it possible to provide a working example that reproduces the issue? That would help with triage.

@Abacn I don't have a working example; however, the steps to reproduce are:

  1. Upload a gzip file to GCS. Make sure that the unzipped file is large enough, e.g. a few MB.
  2. Create a Beam pipeline using the Python SDK that reads the file from step 1 using ReadAllFromText.
  3. Print or write the output of ReadAllFromText.
  4. Observe that the file is not fully read.

EDIT: This issue will probably appear for any compression type. I just encountered it with gzip but did not test with other compression algorithms.

@liferoad
Collaborator

liferoad commented Apr 22, 2024

I uploaded one test file here: gs://apache-beam-samples/gcs/bigfile.txt.gz (~7 MB), which has 100,000 lines, but I cannot reproduce this:

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    "gs://apache-beam-samples/gcs/bigfile.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS" >> ReadAllFromText()
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This shows:

INFO:root:Total lines 100000

@Michal-Nguyen-airspace-intelligence

So I double-checked, and there are differences between your example and our case.

  • We set content encoding gzip when saving our files to GCS; your file has no encoding specified.
  • This leads us to use ReadAllFromText with compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file appears to be already decompressed (it doesn't work with CompressionTypes.AUTO), as described in the GCS policy.
  • This in turn results in only a fragment of the file being read.

Furthermore, after removing the content encoding from our file and using CompressionTypes.AUTO, it was read properly.
To make your example represent our situation, please add content encoding gzip to your file's metadata.
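
For reference, one way to produce such an object is to upload pre-compressed bytes and set `Content-Encoding: gzip` on the object. The sketch below assumes the `google-cloud-storage` client library and default credentials; the bucket and object names in the usage comment are placeholders, and `make_gzip_payload` and `upload_with_gzip_encoding` are helper names made up for this example.

```python
import gzip


def make_gzip_payload(num_lines: int) -> bytes:
    """Return gzip-compressed text with `num_lines` newline-terminated lines."""
    text = "".join(f"line {i}\n" for i in range(num_lines))
    return gzip.compress(text.encode("utf-8"))


def upload_with_gzip_encoding(bucket_name: str, blob_name: str, payload: bytes) -> None:
    """Upload already-compressed bytes and mark the object as gzip-encoded.

    Assumes google-cloud-storage is installed and credentials are configured.
    """
    from google.cloud import storage  # imported lazily; only needed for the upload

    blob = storage.Client().bucket(bucket_name).blob(blob_name)
    # Content-Encoding: gzip is what makes the object eligible for
    # decompressive transcoding when it is downloaded.
    blob.content_encoding = "gzip"
    blob.upload_from_string(payload, content_type="text/plain")


# Usage (placeholder names, replace with a bucket you own):
# upload_with_gzip_encoding("my-bucket", "bigfile_with_encoding.txt.gz",
#                           make_gzip_payload(100_000))
```

Reading the resulting object with compression_type=CompressionTypes.UNCOMPRESSED should then exercise the same path as our files.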

@Michal-Nguyen-airspace-intelligence

As a quick patch, we use the following workaround:

from apache_beam.io.textio import ReadAllFromText


class ReadAllFromTextNotSplittable(ReadAllFromText):
    """ReadAllFromText variant that does not split files into bundles.

    When splitting, Beam used the compressed file size as the reference,
    so only a fraction of the uncompressed file was read.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Relies on a private attribute; may break in other Beam versions.
        self._read_all_files._splittable = False

@liferoad
Collaborator

What does your metadata look like?

I tried this:
[image]

Then I got this error:

ERROR:apache_beam.runners.common:Error -3 while decompressing data: incorrect header check [while running '[6]: Read File from GCS/ReadAllFiles/ReadRange']

@Michal-Nguyen-airspace-intelligence

This is expected, as I mentioned earlier:
This leads us to use ReadAllFromText with compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file appears to be already decompressed (it doesn't work with CompressionTypes.AUTO), as described in the GCS policy.
I presume the file is already decompressed when it is downloaded from GCS, hence the decompression error in Beam.
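
That message is what zlib reports when it is handed bytes that are not gzip data. A small sketch reproducing it locally, assuming the bytes arriving from GCS are already plain text (this calls `zlib` directly and is not Beam's code path):

```python
import zlib

# Bytes as they would arrive after GCS has already decompressed the object.
already_decompressed = b"first line\nsecond line\n"

# wbits = 16 + MAX_WBITS tells zlib to expect a gzip header and nothing else.
decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

try:
    decompressor.decompress(already_decompressed)
    error_message = None
except zlib.error as exc:
    # Plain text has no gzip magic bytes, so the header check fails.
    error_message = str(exc)

print(error_message)
```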

@Michal-Nguyen-airspace-intelligence

Metadata is as follows (also please note we checked both text/plain and application/x-gzip, both were only partially read):
[image: object metadata]

@liferoad
Collaborator

liferoad commented Apr 27, 2024

I see. We need to check decompressive transcoding for the GCS file to determine whether the content is compressed rather than relying on the file extension.

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    # "gs://apache-beam-samples/gcs/bigfile.txt.gz",
    # "gs://apache-beam-samples/gcs/bigfile_with_encoding.txt.gz",
    "gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS"
        >> ReadAllFromText(
            compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED
        )
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This only loads 75,601 lines.

#19413 could be related with regard to uploading the file to GCS.
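
A rough sketch of the metadata check mentioned above, under the assumption that the object's `Content-Encoding` is available before read ranges are computed. `ObjectMetadata`, `is_transcoded`, and `plan_read` are hypothetical names for illustration and not part of Beam's API; the idea is only that a gzip-encoded object should not be split by its stored size.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ObjectMetadata:
    """Hypothetical subset of GCS object metadata relevant to this issue."""
    name: str
    size: int  # stored (possibly compressed) size in bytes
    content_encoding: Optional[str] = None


def is_transcoded(meta: ObjectMetadata) -> bool:
    """True if the object is marked gzip-encoded.

    Simplification: ignores Cache-Control: no-transform and the request's
    Accept-Encoding header, both of which can disable transcoding.
    """
    return (meta.content_encoding or "").strip().lower() == "gzip"


def plan_read(meta: ObjectMetadata) -> Tuple[bool, Optional[int]]:
    """Return (splittable, end_offset); end_offset None means read until EOF."""
    if is_transcoded(meta):
        # Stored size says nothing about the served length, so read to EOF unsplit.
        return False, None
    return True, meta.size


plain = ObjectMetadata("bigfile.txt.gz", 7_000_000)
encoded = ObjectMetadata("bigfile_with_encoding_plain.txt.gz", 7_000_000, "gzip")
print(plan_read(plain))
print(plan_read(encoded))
```

The sizes in the usage lines are made-up round numbers. The decision depends only on the metadata, so the `.gz` extension plays no role.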

@liferoad liferoad added this to the 2.57.0 Release milestone Apr 29, 2024