From 6c72cff24486f249e9e1c35d24015e155f05923e Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 9 Jun 2026 22:14:28 +0800 Subject: [PATCH 1/4] fix iceberg rename --- native-engine/auron-planner/proto/auron.proto | 2 + native-engine/auron-planner/src/lib.rs | 46 ++++---- .../datafusion-ext-plans/src/scan/mod.rs | 17 ++- .../spark/sql/auron/NativeConverters.scala | 14 ++- .../auron/plan/NativeGenerateBase.scala | 2 +- .../spark/source/AuronIcebergSourceUtil.scala | 64 +++++++++++ .../auron/iceberg/IcebergScanSupport.scala | 28 ++++- .../plan/NativeIcebergTableScanExec.scala | 3 +- .../AuronIcebergIntegrationSuite.scala | 100 ++++++++++++++++++ 9 files changed, 241 insertions(+), 35 deletions(-) diff --git a/native-engine/auron-planner/proto/auron.proto b/native-engine/auron-planner/proto/auron.proto index ab471fbb2..232fe5c4d 100644 --- a/native-engine/auron-planner/proto/auron.proto +++ b/native-engine/auron-planner/proto/auron.proto @@ -809,6 +809,8 @@ message Field { bool nullable = 3; // for complex data types like structs, unions repeated Field children = 4; + // Iceberg/Parquet field id. Zero means unset. + int32 field_id = 5; } message FixedSizeBinary { diff --git a/native-engine/auron-planner/src/lib.rs b/native-engine/auron-planner/src/lib.rs index a0f7b83d2..47291efbe 100644 --- a/native-engine/auron-planner/src/lib.rs +++ b/native-engine/auron-planner/src/lib.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema, TimeUnit}; use datafusion::{common::JoinSide, logical_expr::Operator, scalar::ScalarValue}; @@ -406,17 +406,29 @@ impl TryInto for &Box { impl TryInto for &protobuf::Field { type Error = PlanSerDeError; fn try_into(self) -> Result { - let pb_datatype = self.arrow_type.as_ref().ok_or_else(|| { - proto_error( - "Protobuf deserialization error: Field message missing required field 'arrow_type'", - ) - })?; + build_arrow_field(self) + } +} + +fn build_arrow_field(field: &protobuf::Field) -> Result { + let pb_datatype = field.arrow_type.as_ref().ok_or_else(|| { + proto_error( + "Protobuf deserialization error: Field message missing required field 'arrow_type'", + ) + })?; + let arrow_field = Field::new( + field.name.as_str(), + pb_datatype.as_ref().try_into()?, + field.nullable, + ); - Ok(Field::new( - self.name.as_str(), - pb_datatype.as_ref().try_into()?, - self.nullable, - )) + if field.field_id == 0 { + Ok(arrow_field) + } else { + Ok(arrow_field.with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + field.field_id.to_string(), + )]))) } } @@ -427,17 +439,7 @@ impl TryInto for &protobuf::Schema { let fields = self .columns .iter() - .map(|c| { - let pb_arrow_type_res = c - .arrow_type - .as_ref() - .ok_or_else(|| proto_error("Protobuf deserialization error: Field message was missing required field 'arrow_type'")); - let pb_arrow_type: &protobuf::ArrowType = match pb_arrow_type_res { - Ok(res) => res, - Err(e) => return Err(e), - }; - Ok(Field::new(&c.name, pb_arrow_type.try_into()?, c.nullable)) - }) + .map(build_arrow_field) .collect::, _>>()?; Ok(Schema::new(fields)) } diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs b/native-engine/datafusion-ext-plans/src/scan/mod.rs index 3d7899acd..f3d0790af 100644 --- a/native-engine/datafusion-ext-plans/src/scan/mod.rs +++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs @@ -57,11 +57,10 @@ impl SchemaAdapter for AuronSchemaAdapter { fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { let field = self.table_schema.field(index); - // use case insensitive matching file_schema .fields() .iter() - .position(|f| f.name().eq_ignore_ascii_case(field.name())) + .position(|file_field| fields_match(field, file_field)) } fn map_schema(&self, file_schema: &Schema) -> Result<(Arc, Vec)> { @@ -73,7 +72,7 @@ impl SchemaAdapter for AuronSchemaAdapter { .table_schema .fields() .iter() - .position(|f| f.name().eq_ignore_ascii_case(file_field.name())) + .position(|table_field| fields_match(table_field, file_field)) { field_mappings[table_idx] = Some(projection.len()); projection.push(file_idx); @@ -89,6 +88,18 @@ impl SchemaAdapter for AuronSchemaAdapter { } } +const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id"; + +fn fields_match(table_field: &Field, file_field: &Field) -> bool { + match table_field.metadata().get(PARQUET_FIELD_ID_META_KEY) { + Some(table_field_id) => file_field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .is_some_and(|file_field_id| file_field_id == table_field_id), + None => table_field.name().eq_ignore_ascii_case(file_field.name()), + } +} + pub fn create_auron_schema_mapper( table_schema: &SchemaRef, field_mappings: &[Option], diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala index dbe1781ee..9ca6932c4 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala @@ -216,18 +216,22 @@ object NativeConverters extends Logging { arrowTypeBuilder.build() } - def convertField(sparkField: StructField): pb.Field = { - pb.Field + def convertField(sparkField: StructField, fieldId: Option[Int] = None): pb.Field = { + val fieldBuilder = pb.Field .newBuilder() .setName(sparkField.name) .setNullable(sparkField.nullable) .setArrowType(convertDataType(sparkField.dataType)) - .build() + fieldId.foreach(fieldBuilder.setFieldId) + fieldBuilder.build() } - def convertSchema(sparkSchema: StructType): pb.Schema = { + def convertSchema( + sparkSchema: StructType, + fieldIdsByName: Map[String, Int] = Map.empty): pb.Schema = { val schemaBuilder = pb.Schema.newBuilder() - sparkSchema.foreach(sparkField => schemaBuilder.addColumns(convertField(sparkField))) + sparkSchema.foreach(sparkField => + schemaBuilder.addColumns(convertField(sparkField, fieldIdsByName.get(sparkField.name)))) schemaBuilder.build() } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala index cea3f1294..a97f5291b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala @@ -118,7 +118,7 @@ abstract class NativeGenerateBase( } private def nativeGeneratorOutput = - Util.getSchema(generatorOutput).map(NativeConverters.convertField) + Util.getSchema(generatorOutput).map(field => NativeConverters.convertField(field)) private def nativeRequiredChildOutput = Util.getSchema(requiredChildOutput).map(_.name) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala index 6b04f16a3..e26e711c0 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/iceberg/spark/source/AuronIcebergSourceUtil.scala @@ -16,8 +16,12 @@ */ package org.apache.iceberg.spark.source +import scala.collection.JavaConverters._ + object AuronIcebergSourceUtil { + final case class RenameOrDrop(topLevel: Boolean, nested: Boolean) + def getClassOfSparkBatchQueryScan(): Class[SparkBatchQueryScan] = { classOf[SparkBatchQueryScan] } @@ -25,4 +29,64 @@ object AuronIcebergSourceUtil { def getClassOfSparkInputPartition(): Class[SparkInputPartition] = { classOf[SparkInputPartition] } + + def expectedFieldIds(scan: AnyRef): Map[String, Int] = { + val expectedSchema = asBatchQueryScan(scan).expectedSchema() + expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap + } + + def detectRenameOrDrop(scan: AnyRef): RenameOrDrop = { + val table = asBatchQueryScan(scan).table() + val currentFields = collectFieldIdToName(table.schema()) + + table + .schemas() + .values() + .asScala + .foldLeft(RenameOrDrop(topLevel = false, nested = false)) { (result, schema) => + collectFieldIdToName(schema).foldLeft(result) { + case (currentResult, (fieldId, historicalField)) => + currentFields.get(fieldId) match { + case Some(currentField) if currentField.name != historicalField.name => + if (historicalField.topLevel || currentField.topLevel) { + currentResult.copy(topLevel = true) + } else { + currentResult.copy(nested = true) + } + case None => + if (historicalField.topLevel) { + currentResult.copy(topLevel = true) + } else { + currentResult.copy(nested = true) + } + case _ => + currentResult + } + } + } + } + + final private case class FieldIdentity(name: String, topLevel: Boolean) + + private def collectFieldIdToName(schema: org.apache.iceberg.Schema): Map[Int, FieldIdentity] = { + def collect( + fields: Seq[org.apache.iceberg.types.Types.NestedField], + topLevel: Boolean): Seq[(Int, FieldIdentity)] = { + fields.flatMap { field => + val current = field.fieldId() -> FieldIdentity(field.name(), topLevel) + val nested = + if (field.`type`().isNestedType) { + collect(field.`type`().asNestedType().fields().asScala.toSeq, topLevel = false) + } else { + Seq.empty + } + current +: nested + } + } + + collect(schema.columns().asScala.toSeq, topLevel = true).toMap + } + + private def asBatchQueryScan(scan: AnyRef): SparkBatchQueryScan = + scan.asInstanceOf[SparkBatchQueryScan] } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 6e332fd5e..2e6dec5f7 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -41,7 +41,8 @@ final case class IcebergScanPlan( readSchema: StructType, fileSchema: StructType, partitionSchema: StructType, - pruningPredicates: Seq[pb.PhysicalExprNode]) + pruningPredicates: Seq[pb.PhysicalExprNode], + fieldIdsByName: Map[String, Int]) object IcebergScanSupport extends Logging { @@ -75,6 +76,22 @@ object IcebergScanSupport extends Logging { partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), "Has unsupported schema type.") + val (renameOrDrop, fieldIdsByName) = + try { + ( + AuronIcebergSourceUtil.detectRenameOrDrop(scan.asInstanceOf[AnyRef]), + AuronIcebergSourceUtil.expectedFieldIds(scan.asInstanceOf[AnyRef])) + } catch { + case NonFatal(t) => + logWarning(s"Failed to inspect Iceberg field ids for $scanClassName.", t) + return None + } + assert(!renameOrDrop.nested, "Nested Iceberg rename or drop is not supported.") + + assert( + fileSchema.fields.forall(field => fieldIdsByName.contains(field.name)), + "Failed to find field ids for all Iceberg data columns.") + val partitions = inputPartitions(exec) // Empty scan (e.g. empty table) should still build a plan to return no rows. if (partitions.isEmpty) { @@ -86,7 +103,8 @@ object IcebergScanSupport extends Logging { readSchema, fileSchema, partitionSchema, - Seq.empty)) + Seq.empty, + fieldIdsByName)) } val icebergPartitions = partitions.flatMap(icebergPartition) @@ -110,6 +128,9 @@ object IcebergScanSupport extends Logging { assert( !(format != FileFormat.PARQUET && format != FileFormat.ORC), "Only support parquet or orc.") + assert( + !(format == FileFormat.ORC && renameOrDrop.topLevel), + "Iceberg ORC rename or drop is not supported.") val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) Some( @@ -119,7 +140,8 @@ object IcebergScanSupport extends Logging { readSchema, fileSchema, partitionSchema, - pruningPredicates)) + pruningPredicates, + fieldIdsByName)) } private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] = diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index d53aa690a..7ab27a118 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -74,7 +74,8 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca private lazy val fileSizes: Map[String, Long] = buildFileSizes() private lazy val fileSpecIds: Map[String, Int] = buildFileSpecIds() - private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema) + private lazy val nativeFileSchema: pb.Schema = + NativeConverters.convertSchema(fileSchema, plan.fieldIdsByName) private lazy val nativePartitionSchema: pb.Schema = NativeConverters.convertSchema(partitionSchema) diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 88fc22004..17ab7d5ba 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -190,6 +190,106 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native parquet scan reads top-level renamed columns by field id") { + withTable("local.db.t_rename") { + sql("create table local.db.t_rename (id int, old_name string) using iceberg") + sql("insert into local.db.t_rename values (1, 'before')") + sql("alter table local.db.t_rename rename column old_name to new_name") + sql("insert into local.db.t_rename values (2, 'after')") + + val df = sql("select id, new_name from local.db.t_rename order by id") + checkAnswer(df, Seq(Row(1, "before"), Row(2, "after"))) + assert(df.queryExecution.executedPlan.toString().contains("NativeIcebergTableScan")) + } + } + + test("iceberg native parquet scan does not reuse a dropped field id for an added column") { + withTable("local.db.t_drop_add") { + sql("create table local.db.t_drop_add (id int, value string) using iceberg") + sql("insert into local.db.t_drop_add values (1, 'old')") + sql("alter table local.db.t_drop_add drop column value") + sql("alter table local.db.t_drop_add add column value string") + sql("insert into local.db.t_drop_add values (2, 'new')") + + val df = sql("select id, value from local.db.t_drop_add order by id") + checkAnswer(df, Seq(Row(1, null), Row(2, "new"))) + assert(df.queryExecution.executedPlan.toString().contains("NativeIcebergTableScan")) + } + } + + test("iceberg ORC scan falls back after a top-level column rename") { + withTable("local.db.t_orc_rename") { + sql(""" + |create table local.db.t_orc_rename (id int, old_name string) + |using iceberg + |tblproperties ('write.format.default' = 'orc') + |""".stripMargin) + sql("insert into local.db.t_orc_rename values (1, 'before')") + sql("alter table local.db.t_orc_rename rename column old_name to new_name") + + val df = sql("select id, new_name from local.db.t_orc_rename") + checkAnswer(df, Seq(Row(1, "before"))) + assert(!df.queryExecution.executedPlan.toString().contains("NativeIcebergTableScan")) + } + } + + test("iceberg ORC scan remains native for additive schema evolution") { + withTable("local.db.t_orc_add") { + sql(""" + |create table local.db.t_orc_add (id int) + |using iceberg + |tblproperties ('write.format.default' = 'orc') + |""".stripMargin) + sql("insert into local.db.t_orc_add values (1)") + sql("alter table local.db.t_orc_add add column value string") + + val df = sql("select id, value from local.db.t_orc_add") + checkAnswer(df, Seq(Row(1, null))) + assert(df.queryExecution.executedPlan.toString().contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back after a nested column rename") { + withTable("local.db.t_nested_rename") { + sql(""" + |create table local.db.t_nested_rename ( + | id int, + | payload struct + |) using iceberg + |""".stripMargin) + sql("insert into local.db.t_nested_rename values (1, named_struct('old_name', 'before'))") + sql("alter table local.db.t_nested_rename rename column payload.old_name to new_name") + + val df = sql("select id, payload.new_name from local.db.t_nested_rename") + checkAnswer(df, Seq(Row(1, "before"))) + assert(!df.queryExecution.executedPlan.toString().contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back when top-level and nested columns are both renamed") { + withTable("local.db.t_top_and_nested_rename") { + sql(""" + |create table local.db.t_top_and_nested_rename ( + | old_id int, + | payload struct + |) using iceberg + |""".stripMargin) + sql("""insert into local.db.t_top_and_nested_rename + |values (1, named_struct('old_name', 'before')) + |""".stripMargin) + sql("alter table local.db.t_top_and_nested_rename rename column old_id to new_id") + sql(""" + |alter table local.db.t_top_and_nested_rename + |rename column payload.old_name to new_name + |""".stripMargin) + + val df = + sql("select new_id, payload.new_name from local.db.t_top_and_nested_rename") + checkAnswer(df, Seq(Row(1, "before"))) + assert(!df.queryExecution.executedPlan.toString().contains("NativeIcebergTableScan")) + } + } + test("iceberg native scan is applied when delete files are null (format v1)") { withTable("local.db.t_v1") { sql(""" From 70e6956d3d00acb039ef82a97145cbc5462987f7 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Wed, 10 Jun 2026 11:10:11 +0800 Subject: [PATCH 2/4] simplify --- native-engine/auron-planner/src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/native-engine/auron-planner/src/lib.rs b/native-engine/auron-planner/src/lib.rs index 47291efbe..c118cd2b0 100644 --- a/native-engine/auron-planner/src/lib.rs +++ b/native-engine/auron-planner/src/lib.rs @@ -16,7 +16,10 @@ use std::{collections::HashMap, sync::Arc}; use arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema, TimeUnit}; -use datafusion::{common::JoinSide, logical_expr::Operator, scalar::ScalarValue}; +use datafusion::{ + common::JoinSide, logical_expr::Operator, parquet::arrow::PARQUET_FIELD_ID_META_KEY, + scalar::ScalarValue, +}; use datafusion_ext_plans::{agg::AggFunction, joins::join_utils::JoinType}; use crate::error::PlanSerDeError; @@ -426,7 +429,7 @@ fn build_arrow_field(field: &protobuf::Field) -> Result { Ok(arrow_field) } else { Ok(arrow_field.with_metadata(HashMap::from([( - "PARQUET:field_id".to_string(), + PARQUET_FIELD_ID_META_KEY.to_string(), field.field_id.to_string(), )]))) } From 44261cb49abeaa8f0df344a3a3bd1a4a7463015b Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Wed, 10 Jun 2026 11:11:06 +0800 Subject: [PATCH 3/4] simplify --- native-engine/datafusion-ext-plans/src/scan/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs b/native-engine/datafusion-ext-plans/src/scan/mod.rs index f3d0790af..c3e78ed83 100644 --- a/native-engine/datafusion-ext-plans/src/scan/mod.rs +++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs @@ -25,6 +25,7 @@ use datafusion::{ datasource::schema_adapter::{ SchemaAdapter, SchemaAdapterFactory, SchemaMapper, SchemaMapping, }, + parquet::arrow::PARQUET_FIELD_ID_META_KEY, }; use datafusion_ext_commons::df_execution_err; @@ -88,8 +89,6 @@ impl SchemaAdapter for AuronSchemaAdapter { } } -const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id"; - fn fields_match(table_field: &Field, file_field: &Field) -> bool { match table_field.metadata().get(PARQUET_FIELD_ID_META_KEY) { Some(table_field_id) => file_field From b2a6740ec14df2c10db5ec1608dab29a46adf50a Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Wed, 10 Jun 2026 16:41:39 +0800 Subject: [PATCH 4/4] lint --- .../sql/auron/iceberg/IcebergScanSupport.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 2e6dec5f7..9daffa4a7 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -76,16 +76,21 @@ object IcebergScanSupport extends Logging { partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), "Has unsupported schema type.") - val (renameOrDrop, fieldIdsByName) = + val inspected: Option[(AuronIcebergSourceUtil.RenameOrDrop, Map[String, Int])] = try { - ( - AuronIcebergSourceUtil.detectRenameOrDrop(scan.asInstanceOf[AnyRef]), - AuronIcebergSourceUtil.expectedFieldIds(scan.asInstanceOf[AnyRef])) + Some( + ( + AuronIcebergSourceUtil.detectRenameOrDrop(scan.asInstanceOf[AnyRef]), + AuronIcebergSourceUtil.expectedFieldIds(scan.asInstanceOf[AnyRef]))) } catch { case NonFatal(t) => logWarning(s"Failed to inspect Iceberg field ids for $scanClassName.", t) - return None + None } + if (inspected.isEmpty) { + return None + } + val (renameOrDrop, fieldIdsByName) = inspected.get assert(!renameOrDrop.nested, "Nested Iceberg rename or drop is not supported.") assert(