From 4121b1a4291ac08d759bcf3fd5173986cf9e294d Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 1 Jul 2026 11:05:13 +0800 Subject: [PATCH] branch-4.1: [fix](iceberg) Fix NPE in COUNT(*) pushdown when snapshot summary omits total-* counters (#64648) Backport of #64648 to branch-4.1. IcebergScanNode.getCountFromSnapshot() read total-equality-deletes / total-position-deletes / total-records from the snapshot summary and called .equals() / Long.parseLong() directly on the Map.get() results. An Iceberg snapshot summary is not guaranteed to carry these counters (snapshots written by compaction/replace, or some writers, may omit them), so `SELECT COUNT(*)` threw a NullPointerException on such tables while `SELECT *` worked fine. Extract the summary parsing into a pure static IcebergUtils.getCountFromSummary() that null-checks the counters and falls back to a normal scan (returns -1) when any is absent, and reuse it from both IcebergScanNode and IcebergUtils row-count estimation. On the BE side, IcebergTableReader::init_row_filters() now accepts a table-level row count of 0 (>= 0 instead of > 0) so a genuine pushed-down count of 0 takes the CountReader fast path. The BE change is applied to branch-4.1's be/src/format/table/iceberg_reader.cpp, since master's iceberg_reader_mixin.h does not exist on this branch. Co-Authored-By: Raghvendra Singh Co-Authored-By: Claude Opus 4.8 (1M context) --- be/src/format/table/iceberg_reader.cpp | 8 +- .../datasource/iceberg/IcebergUtils.java | 35 +++++++- .../iceberg/source/IcebergScanNode.java | 19 +--- .../source/IcebergCountPushDownTest.java | 86 +++++++++++++++++++ 4 files changed, 127 insertions(+), 21 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java diff --git a/be/src/format/table/iceberg_reader.cpp b/be/src/format/table/iceberg_reader.cpp index aaf80dc9a2f290..a6647770aadc39 100644 --- a/be/src/format/table/iceberg_reader.cpp +++ b/be/src/format/table/iceberg_reader.cpp @@ -199,8 +199,12 @@ Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, } Status IcebergTableReader::init_row_filters() { - // We get the count value by doris's be, so we don't need to read the delete file - if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) { + // We get the count value by doris's be, so we don't need to read the delete file. + // A table-level row count of 0 (e.g. an all-deleted table read with ignore_iceberg_dangling_delete, + // where total-records == total-position-deletes) is still a valid pushed-down count, so accept >= 0. + // FE sends -1 when there is no table-level count; using > 0 here would drop a genuine 0 into the + // delete-applying path below and never produce the intended CountReader(0). + if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count >= 0) { return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index df4adead31bc5c..676dfd258f1cb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -1063,6 +1063,34 @@ public static List parseSchema(Schema schema, boolean enableMappingVarbi return resSchema; } + /** + * Decide whether a row count can be read from an Iceberg snapshot summary. + * Returns {@link TableIf#UNKNOWN_ROW_COUNT} when required counters are absent + * or when delete semantics make the summary count unsafe to use. + */ + @VisibleForTesting + public static long getCountFromSummary(Map summary, boolean ignoreDanglingDelete) { + String equalityDeletes = summary.get(TOTAL_EQUALITY_DELETES); + String positionDeletes = summary.get(TOTAL_POSITION_DELETES); + String totalRecords = summary.get(TOTAL_RECORDS); + if (equalityDeletes == null || positionDeletes == null || totalRecords == null) { + return TableIf.UNKNOWN_ROW_COUNT; + } + if (!equalityDeletes.equals("0")) { + return TableIf.UNKNOWN_ROW_COUNT; + } + + long deleteCount = Long.parseLong(positionDeletes); + if (deleteCount == 0) { + return Long.parseLong(totalRecords); + } + if (ignoreDanglingDelete) { + return Long.parseLong(totalRecords) - deleteCount; + } else { + return TableIf.UNKNOWN_ROW_COUNT; + } + } + /** * Estimate iceberg table row count. * Get the row count by adding all task file recordCount. @@ -1082,7 +1110,12 @@ public static long getIcebergRowCount(ExternalTable tbl) { return TableIf.UNKNOWN_ROW_COUNT; } Map summary = snapshot.summary(); - long rows = Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); + long rows = getCountFromSummary(summary, true); + if (rows == TableIf.UNKNOWN_ROW_COUNT) { + LOG.info("Iceberg table {}.{}.{} row count in summary is unknown, return -1.", + tbl.getCatalog().getName(), tbl.getDbName(), tbl.getName()); + return TableIf.UNKNOWN_ROW_COUNT; + } LOG.info("Iceberg table {}.{}.{} row count in summary is {}", tbl.getCatalog().getName(), tbl.getDbName(), tbl.getName(), rows); return rows; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index e8763a09ca0761..bb1110687fc459 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -1067,24 +1067,7 @@ public long getCountFromSnapshot() throws UserException { return 0; } - Map summary = snapshot.summary(); - if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) { - // has equality delete files, can not push down count - return -1; - } - - long deleteCount = Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES)); - if (deleteCount == 0) { - // no delete files, can push down count directly - return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)); - } - if (sessionVariable.ignoreIcebergDanglingDelete) { - // has position delete files, if we ignore dangling delete, can push down count - return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)) - deleteCount; - } else { - // otherwise, can not push down count - return -1; - } + return IcebergUtils.getCountFromSummary(snapshot.summary(), sessionVariable.ignoreIcebergDanglingDelete); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java new file mode 100644 index 00000000000000..19ad6bece44ed5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.source; + +import org.apache.doris.datasource.iceberg.IcebergUtils; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +public class IcebergCountPushDownTest { + + private static Map summary(String equalityDeletes, String positionDeletes, String totalRecords) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + if (equalityDeletes != null) { + builder.put(IcebergUtils.TOTAL_EQUALITY_DELETES, equalityDeletes); + } + if (positionDeletes != null) { + builder.put(IcebergUtils.TOTAL_POSITION_DELETES, positionDeletes); + } + if (totalRecords != null) { + builder.put(IcebergUtils.TOTAL_RECORDS, totalRecords); + } + return builder.build(); + } + + @Test + public void testMissingCounterFallsBackToScan() { + // Snapshots written by compaction/replace (and some writers) may omit + // total-* counters. The pushdown previously NPE'd on the missing key; + // it must now fall back to a normal scan (return -1). + Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary(null, "0", "100"), false)); + Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", null, "100"), false)); + Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", "0", null), false)); + Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(Collections.emptyMap(), false)); + } + + @Test + public void testUtilityMissingCounterReturnsUnknownCount() { + Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", null, "100"), true)); + } + + @Test + public void testNoDeletesPushesDownTotalRecords() { + Assertions.assertEquals(100L, IcebergUtils.getCountFromSummary(summary("0", "0", "100"), false)); + } + + @Test + public void testEqualityDeletesCannotPushDown() { + Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("3", "0", "100"), false)); + } + + @Test + public void testPositionDeletesRespectIgnoreDangling() { + // ignoreDanglingDelete = true -> total-records minus position-deletes + Assertions.assertEquals(90L, IcebergUtils.getCountFromSummary(summary("0", "10", "100"), true)); + // ignoreDanglingDelete = false -> cannot push down (fall back to scan) + Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", "10", "100"), false)); + } + + @Test + public void testZeroCountWithPositionDeletesIsPushedDown() { + // total-records == position-deletes -> count is 0. With ignore_iceberg_dangling_delete this + // is a valid pushed-down count; FE returns 0 and BE honors it via CountReader(0) (the BE + // table-level guard accepts table_level_row_count >= 0). It must NOT fall back to -1. + Assertions.assertEquals(0L, IcebergUtils.getCountFromSummary(summary("0", "100", "100"), true)); + } +}