From 1dc7235c10c5a51fb83e892ec6fc316e40c163d8 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Sat, 6 Jun 2026 20:57:27 +0800 Subject: [PATCH] [mosaic] Clean up reader resources on construction failure Signed-off-by: QuakeWang --- paimon-mosaic/pom.xml | 7 + .../format/mosaic/MosaicRecordsReader.java | 116 +++++++++-- .../mosaic/MosaicRecordsReaderTest.java | 192 ++++++++++++++++++ 3 files changed, 294 insertions(+), 21 deletions(-) create mode 100644 paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java diff --git a/paimon-mosaic/pom.xml b/paimon-mosaic/pom.xml index 6e0d1eae286c..a265af539b80 100644 --- a/paimon-mosaic/pom.xml +++ b/paimon-mosaic/pom.xml @@ -82,5 +82,12 @@ under the License. test-jar test + + + org.mockito + mockito-core + ${mockito.version} + test + diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java index 24cdcbf05c96..43f01c346c19 100644 --- a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java @@ -73,38 +73,65 @@ public MosaicRecordsReader( RowType projectedRowType, @Nullable List predicates, Path filePath) { + this( + inputFileAdapter, + fileSize, + dataSchemaRowType, + projectedRowType, + predicates, + filePath, + new RootAllocator(), + MosaicReader::open); + } + + MosaicRecordsReader( + MosaicInputFileAdapter inputFileAdapter, + long fileSize, + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List predicates, + Path filePath, + BufferAllocator allocator, + NativeReaderOpener nativeReaderOpener) { this.filePath = filePath; this.inputFileAdapter = inputFileAdapter; this.dataSchemaRowType = dataSchemaRowType; this.predicates = predicates; - this.allocator = new RootAllocator(); + this.allocator = allocator; + MosaicReader createdReader = null; + int createdNumRowGroups; + ArrowBatchReader createdArrowBatchReader; try { - this.reader = MosaicReader.open(inputFileAdapter, fileSize, allocator); - } catch (Exception e) { - allocator.close(); - throw e; - } + createdReader = nativeReaderOpener.open(inputFileAdapter, fileSize, allocator); - Schema fileSchema = reader.getSchema(); - Set fileColumnNames = new HashSet<>(); - for (Field field : fileSchema.getFields()) { - fileColumnNames.add(field.getName()); - } - List projectedNames = projectedRowType.getFieldNames(); - List existingColumns = new ArrayList<>(); - for (String name : projectedNames) { - if (fileColumnNames.contains(name)) { - existingColumns.add(name); + Schema fileSchema = createdReader.getSchema(); + Set fileColumnNames = new HashSet<>(); + for (Field field : fileSchema.getFields()) { + fileColumnNames.add(field.getName()); } - } - if (!existingColumns.isEmpty()) { - reader.project(existingColumns.toArray(new String[0])); + List projectedNames = projectedRowType.getFieldNames(); + List existingColumns = new ArrayList<>(); + for (String name : projectedNames) { + if (fileColumnNames.contains(name)) { + existingColumns.add(name); + } + } + if (!existingColumns.isEmpty()) { + createdReader.project(existingColumns.toArray(new String[0])); + } + + createdNumRowGroups = createdReader.numRowGroups(); + createdArrowBatchReader = new ArrowBatchReader(projectedRowType, true); + } catch (Throwable t) { + closeOnConstructionFailure(t, createdReader, allocator, inputFileAdapter); + throw rethrowUnchecked(t); } - this.numRowGroups = reader.numRowGroups(); + this.reader = createdReader; + this.numRowGroups = createdNumRowGroups; this.currentRowGroup = 0; - this.arrowBatchReader = new ArrowBatchReader(projectedRowType, true); + this.arrowBatchReader = createdArrowBatchReader; } @Nullable @@ -211,4 +238,51 @@ public void close() throws IOException { allocator.close(); inputFileAdapter.close(); } + + private static void addSuppressed(Throwable throwable, Throwable suppressed) { + throwable.addSuppressed(suppressed); + } + + private static RuntimeException rethrowUnchecked(Throwable throwable) { + if (throwable instanceof RuntimeException) { + return (RuntimeException) throwable; + } + if (throwable instanceof Error) { + throw (Error) throwable; + } + return new RuntimeException(throwable); + } + + private static void closeOnConstructionFailure( + Throwable throwable, + @Nullable MosaicReader reader, + BufferAllocator allocator, + MosaicInputFileAdapter inputFileAdapter) { + try { + if (reader != null) { + reader.close(); + } + } catch (Throwable t) { + addSuppressed(throwable, t); + } + + try { + allocator.close(); + } catch (Throwable t) { + addSuppressed(throwable, t); + } + + try { + inputFileAdapter.close(); + } catch (Throwable t) { + addSuppressed(throwable, t); + } + } + + @FunctionalInterface + interface NativeReaderOpener { + + MosaicReader open( + MosaicInputFileAdapter inputFileAdapter, long fileSize, BufferAllocator allocator); + } } diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java new file mode 100644 index 000000000000..e8bc3df8f947 --- /dev/null +++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.mosaic.MosaicReader; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.arrow.memory.RootAllocator; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** Test for {@link MosaicRecordsReader}. */ +class MosaicRecordsReaderTest { + + @Test + void testConstructorRuntimeExceptionClosesCreatedResources() throws IOException { + CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream(); + MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream); + CloseCountingRootAllocator allocator = new CloseCountingRootAllocator(); + RuntimeException failure = new RuntimeException("native reader failed"); + + assertThatThrownBy( + () -> + new MosaicRecordsReader( + inputFileAdapter, + 0, + rowType(), + rowType(), + null, + new Path("file:/tmp/mosaic-reader-test"), + allocator, + (inputFile, fileSize, bufferAllocator) -> { + throw failure; + })) + .isSameAs(failure); + + assertThat(allocator.closeCount()).isEqualTo(1); + assertThat(inputStream.closeCount()).isEqualTo(1); + } + + @Test + void testConstructorErrorClosesCreatedResources() throws IOException { + CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream(); + MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream); + CloseCountingRootAllocator allocator = new CloseCountingRootAllocator(); + UnsatisfiedLinkError failure = new UnsatisfiedLinkError("native library failed"); + + assertThatThrownBy( + () -> + new MosaicRecordsReader( + inputFileAdapter, + 0, + rowType(), + rowType(), + null, + new Path("file:/tmp/mosaic-reader-test"), + allocator, + (inputFile, fileSize, bufferAllocator) -> { + throw failure; + })) + .isSameAs(failure); + + assertThat(allocator.closeCount()).isEqualTo(1); + assertThat(inputStream.closeCount()).isEqualTo(1); + } + + @Test + void testConstructorFailureAfterReaderCreatedClosesReaderAndOtherResources() + throws IOException { + CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream(); + MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream); + CloseCountingRootAllocator allocator = new CloseCountingRootAllocator(); + MosaicReader reader = mock(MosaicReader.class); + RuntimeException failure = new RuntimeException("schema failed"); + doThrow(failure).when(reader).getSchema(); + + assertThatThrownBy( + () -> + new MosaicRecordsReader( + inputFileAdapter, + 0, + rowType(), + rowType(), + null, + new Path("file:/tmp/mosaic-reader-test"), + allocator, + (inputFile, fileSize, bufferAllocator) -> reader)) + .isSameAs(failure); + + verify(reader).close(); + assertThat(allocator.closeCount()).isEqualTo(1); + assertThat(inputStream.closeCount()).isEqualTo(1); + } + + private static MosaicInputFileAdapter createInputFileAdapter( + CloseCountingSeekableInputStream inputStream) throws IOException { + return new MosaicInputFileAdapter( + new CloseCountingFileIO(inputStream), new Path("file:/tmp/mosaic-reader-test")); + } + + private static RowType rowType() { + return DataTypes.ROW(DataTypes.INT()); + } + + private static class CloseCountingFileIO extends LocalFileIO { + + private final CloseCountingSeekableInputStream inputStream; + + private CloseCountingFileIO(CloseCountingSeekableInputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public SeekableInputStream newInputStream(Path path) { + return inputStream; + } + } + + private static class CloseCountingSeekableInputStream extends SeekableInputStream { + + private int closeCount; + + @Override + public void seek(long desired) {} + + @Override + public long getPos() { + return 0; + } + + @Override + public int read(byte[] b, int off, int len) { + return -1; + } + + @Override + public int read() { + return -1; + } + + @Override + public void close() { + closeCount++; + } + + int closeCount() { + return closeCount; + } + } + + private static class CloseCountingRootAllocator extends RootAllocator { + + private int closeCount; + + @Override + public void close() { + closeCount++; + super.close(); + } + + int closeCount() { + return closeCount; + } + } +}