diff --git a/paimon-mosaic/pom.xml b/paimon-mosaic/pom.xml
index 6e0d1eae286c..a265af539b80 100644
--- a/paimon-mosaic/pom.xml
+++ b/paimon-mosaic/pom.xml
@@ -82,5 +82,12 @@ under the License.
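             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>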
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
index 24cdcbf05c96..43f01c346c19 100644
--- a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
@@ -73,38 +73,65 @@ public MosaicRecordsReader(
RowType projectedRowType,
@Nullable List predicates,
Path filePath) {
+ this(
+ inputFileAdapter,
+ fileSize,
+ dataSchemaRowType,
+ projectedRowType,
+ predicates,
+ filePath,
+ new RootAllocator(),
+ MosaicReader::open);
+ }
+
+ MosaicRecordsReader(
+ MosaicInputFileAdapter inputFileAdapter,
+ long fileSize,
+ RowType dataSchemaRowType,
+ RowType projectedRowType,
+ @Nullable List predicates,
+ Path filePath,
+ BufferAllocator allocator,
+ NativeReaderOpener nativeReaderOpener) {
this.filePath = filePath;
this.inputFileAdapter = inputFileAdapter;
this.dataSchemaRowType = dataSchemaRowType;
this.predicates = predicates;
- this.allocator = new RootAllocator();
+ this.allocator = allocator;
+ MosaicReader createdReader = null;
+ int createdNumRowGroups;
+ ArrowBatchReader createdArrowBatchReader;
try {
- this.reader = MosaicReader.open(inputFileAdapter, fileSize, allocator);
- } catch (Exception e) {
- allocator.close();
- throw e;
- }
+ createdReader = nativeReaderOpener.open(inputFileAdapter, fileSize, allocator);
- Schema fileSchema = reader.getSchema();
- Set fileColumnNames = new HashSet<>();
- for (Field field : fileSchema.getFields()) {
- fileColumnNames.add(field.getName());
- }
- List projectedNames = projectedRowType.getFieldNames();
- List existingColumns = new ArrayList<>();
- for (String name : projectedNames) {
- if (fileColumnNames.contains(name)) {
- existingColumns.add(name);
+ Schema fileSchema = createdReader.getSchema();
+ Set fileColumnNames = new HashSet<>();
+ for (Field field : fileSchema.getFields()) {
+ fileColumnNames.add(field.getName());
}
- }
- if (!existingColumns.isEmpty()) {
- reader.project(existingColumns.toArray(new String[0]));
+ List projectedNames = projectedRowType.getFieldNames();
+ List existingColumns = new ArrayList<>();
+ for (String name : projectedNames) {
+ if (fileColumnNames.contains(name)) {
+ existingColumns.add(name);
+ }
+ }
+ if (!existingColumns.isEmpty()) {
+ createdReader.project(existingColumns.toArray(new String[0]));
+ }
+
+ createdNumRowGroups = createdReader.numRowGroups();
+ createdArrowBatchReader = new ArrowBatchReader(projectedRowType, true);
+ } catch (Throwable t) {
+ closeOnConstructionFailure(t, createdReader, allocator, inputFileAdapter);
+ throw rethrowUnchecked(t);
}
- this.numRowGroups = reader.numRowGroups();
+ this.reader = createdReader;
+ this.numRowGroups = createdNumRowGroups;
this.currentRowGroup = 0;
- this.arrowBatchReader = new ArrowBatchReader(projectedRowType, true);
+ this.arrowBatchReader = createdArrowBatchReader;
}
@Nullable
@@ -211,4 +238,51 @@ public void close() throws IOException {
allocator.close();
inputFileAdapter.close();
}
+
+    /**
+     * Records {@code suppressed} on {@code throwable} as a suppressed exception.
+     *
+     * <p>{@link Throwable#addSuppressed(Throwable)} throws {@link IllegalArgumentException} when
+     * an exception is asked to suppress itself. This helper is called from cleanup {@code catch}
+     * blocks, where such a secondary failure would abort the remaining cleanup steps and replace
+     * the primary error, so the self-suppression case is skipped instead.
+     *
+     * @param throwable the primary failure that is going to be propagated
+     * @param suppressed the secondary failure raised while cleaning up
+     */
+    private static void addSuppressed(Throwable throwable, Throwable suppressed) {
+        // Identity check on purpose: only the very same instance is rejected by the JDK.
+        if (suppressed != throwable) {
+            throwable.addSuppressed(suppressed);
+        }
+    }
+
+    /**
+     * Turns an arbitrary failure into something that can leave a method without a {@code throws}
+     * clause.
+     *
+     * <p>An {@link Error} is thrown from inside this method as-is. A {@link RuntimeException} is
+     * handed back unchanged, and any other throwable is handed back wrapped as the cause of a new
+     * {@link RuntimeException}. Returning (instead of throwing) lets the caller write
+     * {@code throw rethrowUnchecked(t);} so the compiler sees the statement as terminal.
+     *
+     * @param throwable the failure to propagate
+     * @return the exception the caller should throw
+     */
+    private static RuntimeException rethrowUnchecked(Throwable throwable) {
+        if (throwable instanceof Error) {
+            throw (Error) throwable;
+        }
+        return throwable instanceof RuntimeException
+                ? (RuntimeException) throwable
+                : new RuntimeException(throwable);
+    }
+
+    /**
+     * Releases everything the constructor may already hold when construction fails.
+     *
+     * <p>The reader (when one was created), the allocator and the input file adapter are closed
+     * in that order. Every step runs in its own {@code try} block, so a failing {@code close()}
+     * does not prevent the later ones from being attempted; whatever a step throws is attached
+     * to {@code throwable} through {@code addSuppressed} instead of being propagated.
+     *
+     * <p>NOTE(review): the reader is presumably closed first because it was opened on top of
+     * the allocator and the input file - confirm against the native reader's contract.
+     *
+     * @param throwable the failure that aborted construction; collects the cleanup failures
+     * @param reader the native reader, or {@code null} if opening it is what failed
+     * @param allocator the allocator handed to the constructor
+     * @param inputFileAdapter the input file adapter handed to the constructor
+     */
+    private static void closeOnConstructionFailure(
+            Throwable throwable,
+            @Nullable MosaicReader reader,
+            BufferAllocator allocator,
+            MosaicInputFileAdapter inputFileAdapter) {
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (Throwable readerCloseFailure) {
+                addSuppressed(throwable, readerCloseFailure);
+            }
+        }
+
+        try {
+            allocator.close();
+        } catch (Throwable allocatorCloseFailure) {
+            addSuppressed(throwable, allocatorCloseFailure);
+        }
+
+        try {
+            inputFileAdapter.close();
+        } catch (Throwable adapterCloseFailure) {
+            addSuppressed(throwable, adapterCloseFailure);
+        }
+    }
+
+ /**
+ * Strategy used by the constructor to obtain the native {@link MosaicReader}.
+ *
+ * <p>The public constructor passes {@code MosaicReader::open}; the package-private constructor
+ * accepts any implementation, which lets a caller in the same package substitute its own way
+ * of producing (or failing to produce) the reader.
+ */
+ @FunctionalInterface
+ interface NativeReaderOpener {
+
+ /**
+ * Opens a reader for the given input.
+ *
+ * @param inputFileAdapter the adapter giving access to the file
+ * @param fileSize the file size forwarded from the constructor
+ * @param allocator the Arrow allocator forwarded from the constructor
+ * @return the opened reader
+ */
+ MosaicReader open(
+ MosaicInputFileAdapter inputFileAdapter, long fileSize, BufferAllocator allocator);
+ }
}
diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
new file mode 100644
index 000000000000..e8bc3df8f947
--- /dev/null
+++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.mosaic.MosaicReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/** Checks that {@link MosaicRecordsReader} releases its resources when construction fails. */
+class MosaicRecordsReaderTest {
+
+    /** Location handed to both the input file adapter and the reader under test. */
+    private static final String TEST_FILE = "file:/tmp/mosaic-reader-test";
+
+    /** The opener throws a {@link RuntimeException}: allocator and stream must be closed. */
+    @Test
+    void testConstructorRuntimeExceptionClosesCreatedResources() throws IOException {
+        CloseCountingSeekableInputStream stream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter adapter = createInputFileAdapter(stream);
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+        RuntimeException failure = new RuntimeException("native reader failed");
+
+        assertConstructionFailsWith(
+                failure,
+                adapter,
+                allocator,
+                (inputFile, fileSize, bufferAllocator) -> {
+                    throw failure;
+                });
+
+        assertClosedOnce(allocator, stream);
+    }
+
+    /** The opener throws an {@link Error}: it must surface unchanged after the same cleanup. */
+    @Test
+    void testConstructorErrorClosesCreatedResources() throws IOException {
+        CloseCountingSeekableInputStream stream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter adapter = createInputFileAdapter(stream);
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+        UnsatisfiedLinkError failure = new UnsatisfiedLinkError("native library failed");
+
+        assertConstructionFailsWith(
+                failure,
+                adapter,
+                allocator,
+                (inputFile, fileSize, bufferAllocator) -> {
+                    throw failure;
+                });
+
+        assertClosedOnce(allocator, stream);
+    }
+
+    /** The reader opens but {@code getSchema()} fails: the reader itself must be closed too. */
+    @Test
+    void testConstructorFailureAfterReaderCreatedClosesReaderAndOtherResources()
+            throws IOException {
+        CloseCountingSeekableInputStream stream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter adapter = createInputFileAdapter(stream);
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+        MosaicReader reader = mock(MosaicReader.class);
+        RuntimeException failure = new RuntimeException("schema failed");
+        doThrow(failure).when(reader).getSchema();
+
+        assertConstructionFailsWith(
+                failure, adapter, allocator, (inputFile, fileSize, bufferAllocator) -> reader);
+
+        verify(reader).close();
+        assertClosedOnce(allocator, stream);
+    }
+
+    /**
+     * Invokes the package-private constructor with the given collaborators and asserts that it
+     * throws exactly the {@code expected} instance.
+     */
+    private static void assertConstructionFailsWith(
+            Throwable expected,
+            MosaicInputFileAdapter adapter,
+            CloseCountingRootAllocator allocator,
+            MosaicRecordsReader.NativeReaderOpener opener) {
+        assertThatThrownBy(
+                        () ->
+                                new MosaicRecordsReader(
+                                        adapter,
+                                        0,
+                                        rowType(),
+                                        rowType(),
+                                        null,
+                                        new Path(TEST_FILE),
+                                        allocator,
+                                        opener))
+                .isSameAs(expected);
+    }
+
+    /** Asserts that both the allocator and the underlying stream saw exactly one close call. */
+    private static void assertClosedOnce(
+            CloseCountingRootAllocator allocator, CloseCountingSeekableInputStream stream) {
+        assertThat(allocator.closeCount()).isEqualTo(1);
+        assertThat(stream.closeCount()).isEqualTo(1);
+    }
+
+    /** Builds an adapter whose file IO always hands out the supplied counting stream. */
+    private static MosaicInputFileAdapter createInputFileAdapter(
+            CloseCountingSeekableInputStream inputStream) throws IOException {
+        CloseCountingFileIO fileIO = new CloseCountingFileIO(inputStream);
+        return new MosaicInputFileAdapter(fileIO, new Path(TEST_FILE));
+    }
+
+    /** Single-column INT row type, used as both the data schema and the projection. */
+    private static RowType rowType() {
+        return DataTypes.ROW(DataTypes.INT());
+    }
+
+    /** Local file IO that returns a pre-built stream regardless of the requested path. */
+    private static class CloseCountingFileIO extends LocalFileIO {
+
+        private final CloseCountingSeekableInputStream inputStream;
+
+        private CloseCountingFileIO(CloseCountingSeekableInputStream inputStream) {
+            this.inputStream = inputStream;
+        }
+
+        @Override
+        public SeekableInputStream newInputStream(Path path) {
+            return inputStream;
+        }
+    }
+
+    /** Empty stream (every read reports end of input) that counts {@code close()} calls. */
+    private static class CloseCountingSeekableInputStream extends SeekableInputStream {
+
+        private int closes;
+
+        @Override
+        public void seek(long desired) {}
+
+        @Override
+        public long getPos() {
+            return 0;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public void close() {
+            closes += 1;
+        }
+
+        int closeCount() {
+            return closes;
+        }
+    }
+
+    /** Root allocator that counts {@code close()} calls before delegating to the real close. */
+    private static class CloseCountingRootAllocator extends RootAllocator {
+
+        private int closes;
+
+        @Override
+        public void close() {
+            closes += 1;
+            super.close();
+        }
+
+        int closeCount() {
+            return closes;
+        }
+    }
+}