
[Bug]: ReadAllFiles does not fully read gzipped files from GCS #31040

Closed
janowskijak opened this issue Apr 18, 2024 · 34 comments · Fixed by #33384

@janowskijak

What happened?

Since the refactor of gcsio (2.52?), ReadAllFiles does not fully read gzipped files from GCS. Part of the file is returned correctly, but the rest goes missing.

I presume this is caused by GCS performing decompressive transcoding while _ExpandIntoRanges uses the GCS object's metadata to determine the read range. This means that the file size we actually receive is larger than the maximum of the read range.

For example, a gzip file on GCS might have a stored size of 1 MB, and this is the object size in the metadata, so the maximum of the read range will be 1 MB. However, when Beam opens the file it has already been decompressed by GCS, so the actual size might be 1.5 MB, and the last 0.5 MB is never read, causing data loss.
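To illustrate the mismatch, here is a minimal sketch using the google-cloud-storage client (bucket and object names are placeholders): for an object stored with Content-Encoding: gzip, the size in the metadata is the compressed size, while a default download returns the decompressed bytes.

# a sketch of the metadata/content size mismatch; names are hypothetical
from google.cloud import storage

client = storage.Client()
blob = client.bucket("my-bucket").get_blob("path/to/file.txt.gz")

print(blob.size)                 # stored (compressed) size, used to compute the read range
print(blob.content_encoding)     # "gzip" enables decompressive transcoding on download
data = blob.download_as_bytes()  # served decompressed by default for such objects
print(len(data))                 # typically larger than blob.size, hence the truncation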

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Contributor

Abacn commented Apr 19, 2024

Thanks for reporting. Agree this is a P1 bug as it causes data loss.

@Abacn
Contributor

Abacn commented Apr 19, 2024

Is it possible to provide a working example that reproduces the issue? That would help with triage.

@liferoad
Contributor

@shunping FYI

@janowskijak
Author

janowskijak commented Apr 22, 2024

Is it possible to provide a working example that reproduces the issue? That would help with triage.

@Abacn I don't have a working example; however, the steps to reproduce are:

  1. Upload a gzip file to GCS. Make sure that the unzipped file is large enough, e.g. a few MB.
  2. Create a Beam pipeline using the Python SDK that reads the file from step 1 using ReadAllFromText.
  3. Print or write the output of ReadAllFromText.
  4. Observe that the file is not fully read.

EDIT: This issue will probably appear for any compression type. I just encountered it with gzip but did not test with other compression algorithms.

@liferoad
Contributor

liferoad commented Apr 22, 2024

I uploaded one test file here: gs://apache-beam-samples/gcs/bigfile.txt.gz (~7 MB), which has 100,000 lines, but I cannot reproduce this:

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    "gs://apache-beam-samples/gcs/bigfile.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS" >> ReadAllFromText()
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This shows:

INFO:root:Total lines 100000

@Michal-Nguyen-airspace-intelligence

So I double-checked, and there are differences between your example and our case:

  • We use content encoding gzip when saving our files to GCS; your file has no encoding specified.
  • This leads us to use ReadAllFromText with the parameter compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file seems to be already uncompressed (it doesn't work with CompressionTypes.AUTO), in line with the GCS policy.
  • This further results in reading only a fragment of the file.

Furthermore, after removing the encoding type from our file and using CompressionTypes.AUTO, it worked properly.
To make your example represent our situation, please add content encoding gzip to your file's metadata, as sketched below.
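One way to do that (a hypothetical sketch with the google-cloud-storage client; gsutil setmeta would also work):

# set Content-Encoding: gzip on an existing object to trigger decompressive transcoding
from google.cloud import storage

bucket = storage.Client().bucket("apache-beam-samples")
blob = bucket.get_blob("gcs/bigfile.txt.gz")
blob.content_encoding = "gzip"
blob.patch()  # persist the metadata change on the object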

@Michal-Nguyen-airspace-intelligence

For a quick patch we use the following solution:

from apache_beam.io.textio import ReadAllFromText


class ReadAllFromTextNotSplittable(ReadAllFromText):
    """This class doesn't take advantage of splitting files into bundles because,
    when doing so, Beam was taking the compressed file size as reference, resulting
    in reading only a fraction of the uncompressed file."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._read_all_files._splittable = False
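A hypothetical usage of this workaround, mirroring the pipelines elsewhere in this thread (the bucket/object name is a placeholder):

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

with beam.Pipeline() as p:
    (
        p
        | beam.Create(["gs://my-bucket/data.txt.gz"])  # object with Content-Encoding: gzip
        | "Read" >> ReadAllFromTextNotSplittable(
            compression_type=CompressionTypes.UNCOMPRESSED
        )
    )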

@liferoad
Contributor

What does your metadata look like?

I tried this:
(screenshot: object metadata updated with Content-Encoding: gzip)

Then I got this error:

ERROR:apache_beam.runners.common:Error -3 while decompressing data: incorrect header check [while running '[6]: Read File from GCS/ReadAllFiles/ReadRange']

@Michal-Nguyen-airspace-intelligence

This is expected, as I mentioned earlier:
This leads us to use ReadAllFromText with the parameter compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file seems to be already uncompressed (it doesn't work with CompressionTypes.AUTO), in line with the GCS policy.
I presume that while downloading the file from GCS it is already decompressed, hence the decompression error in Beam.

@Michal-Nguyen-airspace-intelligence

Metadata is as follows (also please note we checked both text/plain and application/x-gzip, both were only partially read):
(screenshot: object metadata with Content-Encoding: gzip; Content-Type was tested as both text/plain and application/x-gzip)

@liferoad
Contributor

liferoad commented Apr 27, 2024

I see. We need to check decompressive transcoding for the GCS file to determine whether the content is compressed rather than relying on the file extension.

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    # "gs://apache-beam-samples/gcs/bigfile.txt.gz",
    # "gs://apache-beam-samples/gcs/bigfile_with_encoding.txt.gz",
    "gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS"
        >> ReadAllFromText(
            compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED
        )
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This only loads 75,601 lines.

#19413 could be related for uploading the file to GCS.
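A minimal sketch of the kind of metadata check mentioned above (querying the google-cloud-storage client directly, not Beam internals):

from google.cloud import storage

blob = storage.Client().bucket("apache-beam-samples").get_blob(
    "gcs/bigfile_with_encoding.txt.gz")
# Content-Encoding: gzip means GCS may decompress the object on download
# (decompressive transcoding), regardless of the .gz file extension.
transcoded = blob.content_encoding == "gzip"
print(transcoded)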

@liferoad liferoad added this to the 2.57.0 Release milestone Apr 29, 2024
@shunping
Contributor

.take-issue

@kennknowles
Member

Have we reproduced this?

@liferoad
Contributor

liferoad commented Jun 7, 2024

Yes, see my above link: #31040 (comment)

@kennknowles
Member

Is there hope of a fix for a 2.57.0 cherry-pick? I would guess this is a longstanding issue, so getting it fixed in a very thorough way for 2.58.0 is actually the best thing to do. I recall we had decompressive transcoding bugs in the past, so we should make sure we really get it right this time. And users can mitigate by configuring GCS not to do the transcoding.

@liferoad
Contributor

Moved this to 2.58.0. Thanks!

@jrmccluskey
Contributor

Has any progress been made on this?

@liferoad
Contributor

liferoad commented Jul 2, 2024

Not yet. We can move this to 2.59.0.

@lostluck
Contributor

Has any progress been made on this?

@lostluck
Contributor

Moved to 2.60.0

@kennknowles kennknowles removed this from the 2.60.0 Release milestone Aug 22, 2024
@kennknowles
Member

Based on this getting pushed from release to release, it is clearly not a true release-blocker.

@serratedserenade

serratedserenade commented Nov 29, 2024

Has there been any progress on this? If there is none, can anyone suggest a monkey patch or some sort of hotfix that we can apply ourselves?

EDIT: I did some disgusting hacks, and it seems that if we're able to somehow pass raw_download as a parameter to the Blob.download_as_bytes function, everything works fine.
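For reference, raw_download is an existing parameter of Blob.download_as_bytes in google-cloud-storage; a minimal illustration of the difference (bucket/object names are placeholders):

from google.cloud import storage

blob = storage.Client().bucket("my-bucket").get_blob("data/file.json.gz")

served = blob.download_as_bytes()                   # decompressed by GCS transcoding
stored = blob.download_as_bytes(raw_download=True)  # gzip bytes exactly as stored

print(len(served), len(stored), blob.size)  # blob.size matches the raw (stored) length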

@liferoad
Contributor

#31040 (comment) does not work for you?

@serratedserenade

serratedserenade commented Nov 29, 2024

#31040 (comment) does not work for you?

That's what I was using before, until we realised this method was causing part of the data we were reading to be skipped. Basically the same problem as here: #31040 (comment)

However, just to make sure, I will double check.

Just to add details:

  • Using Beam 2.61 locally.
  • Reading from a GCS bucket into which a data provider pushes data with the following metadata (there is no option for them to avoid this or change how they set the headers):
    • Content-Type: application/json
    • Content-Encoding: gzip

EDIT: I did a quick check and realised that I was using both the workaround you pointed at and the UNCOMPRESSED flag; however, removing the UNCOMPRESSED flag just gives me Error -3 while decompressing data: incorrect header check

@kennknowles
Member

This issue came up and was fixed in the Java SDK file IO many years ago, so we should have some reference material to work with. I am trying to find it.

And presumably also related to #18390.

@shunping
Contributor

shunping commented Dec 15, 2024

Picking up this issue again. I confirm that I am able to reproduce the problem with the code in #31040 (comment) on the latest Beam.

@shunping
Contributor

shunping commented Dec 15, 2024

I briefly debugged the issue; it seems that the file is accessed with decompressive transcoding (https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding).

I checked the metadata of the GCS file in use, and the result is as follows.

$ gcloud storage objects describe gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz
acl:
...
bucket: apache-beam-samples
content_encoding: gzip
content_type: text/plain
crc32c_hash: /REfiQ==
creation_time: 2024-04-27T20:19:48+0000
...
metageneration: 1
name: gcs/bigfile_with_encoding_plain.txt.gz
size: 7635685
storage_class: STANDARD
storage_class_update_time: 2024-04-27T20:19:48+0000
storage_url: gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz#1714249188497359
update_time: 2024-04-27T20:19:48+0000

The size field shows the size of the gzip file, i.e. 7635685. The original file size prior to gzip should be 10100000.

$ wc bigfile_with_encoding_plain.txt
  100000  252788 10100000 bigfile_with_encoding_plain.txt

When debugging in the DoFn _ReadRange(), I see the initial range matches the stored (compressed) file size.

(screenshot: debugger in _ReadRange showing the read range capped at 7635685 bytes)

I think the size is the problem here. Because the file metadata meets the criteria for decompressive transcoding, GCS sends uncompressed data when we request the object, while our DoFn thinks the file is only 7635685 bytes and stops reading there.

@shunping
Contributor

Just checked around. It seems GCS decompressive transcoding causes similar issues elsewhere.

@shunping
Contributor

The range is determined by the size obtained from GCSIO (file_metadata = self._gcsIO()._status(path), and then file_status['size'] = gcs_object.size), which in turn calls the GCS Python client library, google-cloud-storage.

The size property in the Blob object there comes from "Content-Length". (https://cloud.google.com/storage/docs/json_api/v1/objects)
(screenshot: GCS JSON API objects resource documentation for the size field)

@shunping
Contributor

shunping commented Jan 3, 2025

To clarify the behavior of textio with various content encoding, content type, and compression settings, I've expanded the table in the Apache Beam GitHub issue #18390. This table compares the behavior across two Beam SDK versions: 2.52.0 (prior to the GCSIO migration) and 2.62.0 (the upcoming release). I also include the proposed behavior of my fix in the last column.

(screenshot: table of textio behavior for each content-encoding / content-type / compression-type combination in Beam 2.52.0, 2.62.0, and with the proposed fix)

A few notes about how the data was generated.

  • For the first 3 x 2 x 3 rows, the text data is gzipped locally and then uploaded to GCS. Then the metadata values of content-type and content-encoding are manually adjusted.
  • For the row marked as "copy default text file", the text data is directly copied/uploaded to GCS without gzip.
  • For the row marked as "copy default gzip file", the gzipped text data is copied/uploaded to GCS.
  • For the row marked as "copy default text file with gzip-local flag", the text data is uploaded to GCS with that flag:
    gcloud storage cp -Z ./textio-test-data.1k.txt gs://apache-beam-samples/textio/textio-test-data.gzip-local.1k.txt.gz

@shunping
Contributor

shunping commented Jan 3, 2025

Notice that when content-type=application/gzip and content-encoding=gzip, GCS considers the file to be doubly compressed (https://cloud.google.com/storage/docs/transcoding#gzip-gzip), which does not match the actual content we store here (i.e., a gzipped text file). Therefore, an exception is thrown.

As for the UnicodeDecodeError cases, most of them are due to users specifying "UNCOMPRESSED" for a gzip file. We can leave it this way, or we can give a better, more informative error when "content-type" and/or "content-encoding" does not match the specified compression type.
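A hypothetical sketch of such a check (not actual Beam code; the function and message are illustrative assumptions):

from apache_beam.io.filesystem import CompressionTypes

def check_compression(content_encoding, compression_type, path):
    # Content-Encoding: gzip triggers decompressive transcoding, so the bytes
    # Beam receives are already decompressed; UNCOMPRESSED then truncates reads.
    if content_encoding == "gzip" and compression_type == CompressionTypes.UNCOMPRESSED:
        raise ValueError(
            f"{path} has Content-Encoding: gzip; use CompressionTypes.GZIP or AUTO "
            "instead of UNCOMPRESSED to avoid truncated or garbled reads.")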

@kennknowles WDYT?

@kennknowles
Member

I agree with all of your proposals that replace "Data Loss" with "UnicodeDecodeError"

@kennknowles
Member

In all the cases where GCS transcoding causes a zlib.error, I also agree with making them function correctly. Basically, if the user says it is GZIP and the data really is GZIP, but we know that GCS is going to decode it, then we do not do a redundant decode.

@shunping
Contributor

shunping commented Jan 4, 2025

I agree with all of your proposals that replace "Data Loss" with "UnicodeDecodeError"

Great! GCS decompressive transcoding is a bit unintuitive to Beam users here, and when it happens, we see data loss. I think it is more natural to expect users to specify GZIP or AUTO in those cases rather than UNCOMPRESSED, as shown in the proposal.

Basically, if the user says it is GZIP and the data really is GZIP, but we know that GCS is going to decode it, then we do not do a redundant decode.

Yep, that's the idea, but it is implemented differently in the proposed fix (#33384).

In my fix (#33384), I let the GCS client library skip the decoding step and rely on Beam's decoding mechanism in CompressedFile to process the file. I believe this is more intuitive for our users, and the end result of this approach is exactly the same as what was proposed in the previous table.
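A rough sketch of that idea outside of Beam (not the actual PR #33384 code; bucket/object names are placeholders): fetch the raw, still-compressed bytes from GCS and decompress them locally, which is the role CompressedFile plays inside Beam.

import gzip
from google.cloud import storage

blob = storage.Client().bucket("my-bucket").get_blob("data/file.txt.gz")

# raw_download=True returns the gzip bytes exactly as stored, skipping GCS
# decompressive transcoding; decompression then happens on the client side.
raw = blob.download_as_bytes(raw_download=True)
text = gzip.decompress(raw).decode("utf-8")
print(len(text.splitlines()))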

@github-actions github-actions bot added this to the 2.63.0 Release milestone Jan 9, 2025