Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Content-Encoding: gzip along with Transfer-Encoding: chunked sometimes terminates early #1608

Merged
merged 3 commits into from Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,90 @@
package com.google.api.client.http;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

final class GzipSupport {

private GzipSupport() {}

static GZIPInputStream newGzipInputStream(InputStream in) throws IOException {
return new GZIPInputStream(new OptimisticAvailabilityInputStream(in));
}

/**
* When {@link GZIPInputStream} completes processing an individual member it will call {@link
* InputStream#available()} to determine if there is more stream to try and process. If the call
* to {@code available()} returns 0 {@code GZIPInputStream} will determine it has processed the
* entirety of the underlying stream. This is spurious, as {@link InputStream#available()} is
* allowed to return 0 if it would require blocking in order for more bytes to be available. When
* {@code GZIPInputStream} is reading from a {@code Transfer-Encoding: chunked} response, if the
* chunk boundary happens to align closely enough to the member boundary {@code GZIPInputStream}
* won't consume the whole response.
*
* <p>This class, provides an optimistic "estimate" (in actuality, a lie) of the number of {@code
* available()} bytes in the underlying stream. It does this by tracking the last number of bytes
* read. If the last number of bytes read is grater than -1, we return {@link Integer#MAX_VALUE}
* to any call of {@link #available()}.
*
* <p>We're breaking the contract of available() in that we're lying about how much data we have
* accessible without blocking, however in the case where we're weaving {@link GZIPInputStream}
* into response processing we already know there are going to be blocking calls to read before
* the stream is exhausted.
*
* <p>This scenario isn't unique to processing of chunked responses, and can be replicated
* reliably using a {@link java.io.SequenceInputStream} with two underlying {@link
* java.io.ByteArrayInputStream}. See the corresponding test class for a reproduction.
*
* <p>The need for this class has been verified for the following JVMs:
*
* <ol>
* <li>
* <pre>
* openjdk version "1.8.0_292"
* OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10)
* OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode)
* </pre>
* <li>
* <pre>
* openjdk version "11.0.14.1" 2022-02-08
* OpenJDK Runtime Environment Temurin-11.0.14.1+1 (build 11.0.14.1+1)
* OpenJDK 64-Bit Server VM Temurin-11.0.14.1+1 (build 11.0.14.1+1, mixed mode)
* </pre>
* <li>
* <pre>
* openjdk version "17" 2021-09-14
* OpenJDK Runtime Environment Temurin-17+35 (build 17+35)
* OpenJDK 64-Bit Server VM Temurin-17+35 (build 17+35, mixed mode, sharing)
* </pre>
* </ol>
*/
private static final class OptimisticAvailabilityInputStream extends FilterInputStream {
private int lastRead = 0;

OptimisticAvailabilityInputStream(InputStream delegate) {
super(delegate);
}

@Override
public int available() throws IOException {
return lastRead > -1 ? Integer.MAX_VALUE : 0;
}

@Override
public int read() throws IOException {
return lastRead = super.read();
}

@Override
public int read(byte[] b) throws IOException {
return lastRead = super.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return lastRead = super.read(b, off, len);
}
}
}
Expand Up @@ -30,7 +30,6 @@
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;

/**
* HTTP response.
Expand Down Expand Up @@ -362,7 +361,7 @@ public InputStream getContent() throws IOException {
// GZIPInputStream.close() --> ConsumingInputStream.close() -->
// exhaust(ConsumingInputStream)
lowLevelResponseContent =
new GZIPInputStream(new ConsumingInputStream(lowLevelResponseContent));
GzipSupport.newGzipInputStream(new ConsumingInputStream(lowLevelResponseContent));
}
}
// logging (wrap content with LoggingInputStream)
Expand Down
@@ -0,0 +1,101 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.api.client.http;

import static com.google.common.truth.Truth.assertThat;

import com.google.common.io.ByteStreams;
import com.google.common.io.CountingInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.zip.GZIPInputStream;
import org.junit.Test;

public final class GzipSupportTest {

@SuppressWarnings("UnstableApiUsage") // CountingInputStream is @Beta
@Test
public void gzipInputStreamConsumesAllBytes() throws IOException {
byte[] data = new byte[] {(byte) 'a', (byte) 'b'};
// `echo -n a > a.txt && gzip -n9 a.txt`
byte[] member0 =
new byte[] {
0x1f,
(byte) 0x8b,
0x08,
0x00,
0x00,
0x00,
0x00,
0x00,
0x02,
0x03,
0x4b,
0x04,
0x00,
(byte) 0x43,
(byte) 0xbe,
(byte) 0xb7,
(byte) 0xe8,
0x01,
0x00,
0x00,
0x00
};
// `echo -n b > b.txt && gzip -n9 b.txt`
byte[] member1 =
new byte[] {
0x1f,
(byte) 0x8b,
0x08,
0x00,
0x00,
0x00,
0x00,
0x00,
0x02,
0x03,
0x4b,
0x02,
0x00,
(byte) 0xf9,
(byte) 0xef,
(byte) 0xbe,
(byte) 0x71,
0x01,
0x00,
0x00,
0x00
};
int totalZippedBytes = member0.length + member1.length;
try (InputStream s =
new SequenceInputStream(
new ByteArrayInputStream(member0), new ByteArrayInputStream(member1));
CountingInputStream countS = new CountingInputStream(s);
GZIPInputStream g = GzipSupport.newGzipInputStream(countS);
CountingInputStream countG = new CountingInputStream(g)) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteStreams.copy(countG, baos);
assertThat(baos.toByteArray()).isEqualTo(data);
assertThat(countG.getCount()).isEqualTo(data.length);
assertThat(countS.getCount()).isEqualTo(totalZippedBytes);
}
}
}