
[FLINK-39533][s3] Use abort() instead of drain on close/seek when remaining bytes exceed threshold in NativeS3InputStream#28012

Open
Samrat002 wants to merge 1 commit into apache:master from Samrat002:FLINK-39533

Conversation

@Samrat002
Contributor

What is the purpose of the change

NativeS3InputStream calls ResponseInputStream.close() when releasing streams during seek(), skip(), and close() operations. Apache HttpClient's close() implementation
drains all remaining bytes from the response body to enable HTTP connection reuse. For large S3 objects where only a small portion was read (e.g., checkpoint metadata from a
multi-GB state file), this drains potentially gigabytes of data over the network — causing severe latency during checkpoint restore and seek-heavy read patterns.

The AWS SDK v2 ResponseInputStream JavaDoc explicitly recommends
calling abort() when remaining data is not needed. This PR replaces close() with abort() in the stream release path.
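The difference can be illustrated with a minimal, self-contained sketch. `DrainingStream` below is a stand-in for the SDK's `ResponseInputStream`, not the real class; it only mimics the drain-on-close behavior described above:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Stand-in for the SDK's ResponseInputStream (illustration only). */
class DrainingStream extends InputStream {
    private final InputStream delegate;
    long drainedOnClose = 0;
    boolean aborted = false;

    DrainingStream(byte[] body) {
        this.delegate = new ByteArrayInputStream(body);
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    /** Mimics Apache HttpClient: drain the rest so the connection can be reused. */
    @Override
    public void close() throws IOException {
        if (!aborted) {
            while (delegate.read() != -1) {
                drainedOnClose++;
            }
        }
        delegate.close();
    }

    /** Mimics ResponseInputStream#abort(): give up the connection, drain nothing. */
    void abort() {
        aborted = true;
    }
}

public class DrainDemo {
    public static void main(String[] args) throws IOException {
        byte[] body = new byte[1_000_000]; // pretend this is a multi-GB S3 object

        DrainingStream closed = new DrainingStream(body);
        closed.read();  // read 1 byte, e.g. checkpoint metadata
        closed.close(); // drains the remaining 999_999 bytes
        System.out.println("drained on close: " + closed.drainedOnClose);

        DrainingStream abortedStream = new DrainingStream(body);
        abortedStream.read();
        abortedStream.abort(); // connection given up, nothing is drained
        abortedStream.close();
        System.out.println("drained after abort: " + abortedStream.drainedOnClose);
    }
}
```

With a real S3 response, the drained bytes travel over the network, which is where the restore latency comes from.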

Brief change log

  • Added releaseStream() method to NativeS3InputStream that calls abort() instead of close() on the underlying ResponseInputStream, and drops the BufferedInputStream
    wrapper without closing it (closing would delegate to the drain path)

  • openStreamAtCurrentPosition() and close() now use releaseStream() for stream cleanup

  • Added NativeS3InputStreamTest with 8 tests covering abort lifecycle, data correctness, position tracking, and error paths
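A sketch of the release path described in the change log, using a hypothetical `AbortableResponseStream` interface as a stand-in for the SDK type. The field and method names follow the description above, but this is an illustration, not the actual patch:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;

/** Hypothetical stand-in for the SDK's ResponseInputStream (illustration only). */
interface AbortableResponseStream {
    void abort();
}

/** Illustrative holder mirroring the fields named in the change log. */
class StreamHolder {
    BufferedInputStream bufferedStream;
    AbortableResponseStream currentStream;

    void releaseStream() {
        // Drop the wrapper without closing it; closing would delegate to the
        // response stream's drain path.
        bufferedStream = null;
        if (currentStream != null) {
            // abort() terminates the HTTP connection instead of draining it.
            currentStream.abort();
            currentStream = null;
        }
    }
}
```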

    Verifying this change

    This change added tests and can be verified as follows:

  • Unit Test

  • Manually validated end-to-end on a local Flink 2.3-SNAPSHOT cluster: a stateful job wrote checkpoints (up to 199 MB) to S3, a savepoint was triggered, the job was restored from it, and checkpoints completed successfully after the restore with zero S3/stream errors

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): no
    • The public API, i.e., is any changed class annotated with @Public(Evolving): no
    • The serializers: no
    • The runtime per-record code paths (performance sensitive): no
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
    • The S3 file system connector: yes

    Documentation

    • Does this pull request introduce a new feature? no
    • If yes, how is the feature documented? not applicable

    Was generative AI tooling used to co-author this PR?
    • [ ] Yes (please specify the tool below)

@Samrat002
Contributor Author

cc: @gaborgsomogyi

@flinkbot
Collaborator

flinkbot commented Apr 23, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

 */
private void releaseStream() {
    // Drop the wrapper without closing it; closing would trigger the drain path.
    bufferedStream = null;
Contributor


What makes sure that system resources which are normally freed in close() will be handled properly?

Contributor Author


In the revised approach, both bufferedStream.close() and currentStream.close() are still called. The abort() call placed before them terminates the underlying HTTP connection, so when BufferedInputStream.close() delegates to ResponseInputStream.close(), the connection is already dead and no drain occurs. BufferedInputStream itself holds only a byte[] heap buffer with no native resources, so the JVM garbage-collects it once dereferenced. The currentStream.close() call handles any remaining SDK resource cleanup (connection pool return, etc.) after the abort.
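The ordering argument can be captured in a short runnable sketch. `FakeResponseStream` is again a stand-in for the SDK class, only mimicking the drain-on-close behavior:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Stand-in response stream: close() drains the rest unless abort() already ran. */
class FakeResponseStream extends InputStream {
    private final InputStream body = new ByteArrayInputStream(new byte[100_000]);
    boolean aborted = false;
    long drained = 0;

    void abort() {
        aborted = true; // give up the connection; nothing left to reuse
    }

    @Override
    public int read() throws IOException {
        return body.read();
    }

    @Override
    public void close() throws IOException {
        // Apache HttpClient behavior: drain so the connection can be reused,
        // unless the connection was already aborted.
        if (!aborted) {
            while (body.read() != -1) {
                drained++;
            }
        }
        body.close();
    }
}

public class AbortThenCloseDemo {
    public static void main(String[] args) throws IOException {
        FakeResponseStream response = new FakeResponseStream();
        BufferedInputStream buffered = new BufferedInputStream(response);
        buffered.read();   // read a little, as a metadata read would
        response.abort();  // terminate the connection first
        buffered.close();  // delegates to response.close(): already dead, no drain
        System.out.println("drained = " + response.drained); // drained = 0
    }
}
```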

Contributor


Can we test that somehow? I mean missing this can cause quite some leaks

Contributor Author


Fair point, tried adding a test for it.

TrackingInputStream now tracks three things: wasAborted, wasClosed, and wasAbortedBeforeClose (set inside close() based on whether abort already ran).

New test closeAbortsAndThenClosesUnderlyingStream asserts all three after the stream is closed, and I've applied the same three assertions on the seek() and skip() paths too, since those also swap out the stream mid-life.

The three together catch both failure modes:

  1. drop close() → wasClosed fails (connection pool leak)
  2. reverse the order → wasAbortedBeforeClose fails (drain regression)
  3. drop abort() → wasAborted fails

So any future refactor that reintroduces the leak will break at least one of them.
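A minimal version of that tracking double, sketched from the description (the real class lives in NativeS3InputStreamTest):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Sketch of the test double described above. */
class TrackingInputStream extends InputStream {
    private final InputStream delegate = new ByteArrayInputStream(new byte[]{1, 2, 3});
    boolean wasAborted = false;
    boolean wasClosed = false;
    boolean wasAbortedBeforeClose = false;

    void abort() {
        wasAborted = true;
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    @Override
    public void close() throws IOException {
        // Record the ordering at close time: abort must already have run.
        wasAbortedBeforeClose = wasAborted;
        wasClosed = true;
        delegate.close();
    }
}
```

A correct release path leaves all three flags true; each of the failure modes listed above trips at least one of the assertions.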

byte[] tail = new byte[20];
assertThat(in.read(tail, 0, 20)).isEqualTo(6);
assertThat(in.getPos()).isEqualTo(256);
// read past EOF
Contributor


AFAIK hadoop is throwing such case, isn't it?

Contributor Author


Hadoop's S3AInputStream.seek() throws EOFException for negative positions with the message "Cannot seek to a negative offset". Here the implementation throws IOException with the message "Cannot seek to negative position: ", which follows the same contract. The test verifies isInstanceOf(IOException.class), so it covers both.

Contributor


I mean more like it throws EOFException when in.read() called but no data

Contributor Author


I dug into S3AInputStream and the FSDataInputStream JavaDoc "Can't seek past the end of the stream" and aligned the implementation accordingly. Three changes:

  1. seek() now throws EOFException, not a bare IOException
  2. Tightened the existing negative-seek assertion so it cannot regress to the more permissive IOException
  3. Added new tests seekPastEofThrowsEofException and readAtEofReturnsMinusOne
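The resulting contract can be sketched as follows (simplified and position-only; `SeekContract` is an illustration, not the actual implementation):

```java
import java.io.EOFException;
import java.io.IOException;

/** Simplified sketch of the seek/read contract discussed above. */
class SeekContract {
    private final long length;
    private long pos;

    SeekContract(long length) {
        this.length = length;
    }

    void seek(long target) throws IOException {
        if (target < 0) {
            // Matches the Hadoop S3AInputStream contract: EOFException, not a bare IOException.
            throw new EOFException("Cannot seek to a negative offset: " + target);
        }
        if (target > length) {
            throw new EOFException("Cannot seek past the end of the stream: " + target);
        }
        pos = target;
    }

    int read() {
        if (pos >= length) {
            return -1; // at EOF, read() returns -1 rather than throwing
        }
        pos++;
        return 0; // payload byte elided in this sketch
    }
}
```

Since EOFException extends IOException, an assertion on EOFException is strictly tighter than one on IOException, which is why the negative-seek test had to be tightened to prevent regression.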

Comment on lines 121 to 138
Contributor


While we're hanging around can we just collapse it into one or more function? This pattern is down below with some tiny diffs

Contributor Author


made changes PTAL

 * @see ResponseInputStream#abort()
 */
private void abortCurrentStream() {
    if (currentStream != null) {
Contributor


If currentStream is guarded then the function must guard it. Otherwise a simple lock move from upper call will break things silently

Contributor Author


Good catch — @GuardedBy("lock") is only a static-analysis hint and doesn't actually enforce anything at runtime, so your "silent break on a lock move" scenario was real.

I've added a runtime precondition at the top of both @GuardedBy helpers (abortCurrentStream() and releaseStreams()).
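A sketch of that fail-fast precondition, using Thread.holdsLock from the JDK (class and method names here are illustrative, not the actual patch):

```java
/** Sketch: turn the @GuardedBy hint into a fail-fast runtime check. */
class GuardedRelease {
    private final Object lock = new Object();
    private Object currentStream = new Object(); // placeholder for the stream field

    void abortCurrentStream() {
        // @GuardedBy("lock") is only a static-analysis hint; enforce it at runtime
        // so a refactor that moves the lock acquisition fails loudly, not silently.
        if (!Thread.holdsLock(lock)) {
            throw new IllegalStateException("abortCurrentStream() requires the stream lock");
        }
        currentStream = null; // abort/cleanup would happen here
    }

    void release() {
        synchronized (lock) {
            abortCurrentStream(); // OK: lock is held
        }
    }
}
```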
