diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/HiveTransaction.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/HiveTransaction.java
new file mode 100644
index 000000000000..6fb51abf0069
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/HiveTransaction.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.hive.StagingTableOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transaction implementation that stages metadata changes for atomic batch HMS updates across
+ * multiple tables.
+ *
+ * <p>Extends BaseTransaction to reuse its update tracking and commit machinery while
+ * capturing metadata locations instead of publishing directly to HMS.
+ */
+public class HiveTransaction extends BaseTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTransaction.class);
+
+ private static final MethodHandle CLEANUP_HANDLE = initCleanupHandle();
+
+ private final HiveTableOperations hiveOps;
+ private final StagingTableOperations stagingOps;
+
+ public HiveTransaction(Table table, HiveTableOperations ops) {
+ this(table, ops, ops.toStagingOps());
+ }
+
+ private HiveTransaction(Table table, HiveTableOperations ops, StagingTableOperations stagingOps) {
+ super(table.name(), stagingOps, TransactionType.SIMPLE, ops.current());
+ this.hiveOps = ops;
+ this.stagingOps = stagingOps;
+ }
+
+ @Override
+ public HiveTableOperations ops() {
+ return hiveOps;
+ }
+
+ public StagingTableOperations stagingOps() {
+ return stagingOps;
+ }
+
+ /**
+ * Cleans up all artifacts produced by the staged commit: new manifests, manifest lists,
+ * the metadata JSON file, and uncommitted files.
+ *
+ * <p>Called by the coordinator when the batch HMS update fails after staging succeeded.
+ */
+ public void cleanUpOnCommitFailure() {
+ try {
+ CLEANUP_HANDLE.invoke(this);
+ } catch (Throwable t) {
+ throw new IllegalStateException("Failed to invoke cleanUpOnCommitFailure", t);
+ }
+ // delete the staged metadata JSON file
+ deleteMetadataFile();
+ }
+
+ /**
+ * Deletes the staged metadata JSON file written by StagingTableOperations.doCommit().
+ */
+ private void deleteMetadataFile() {
+ String metadataLocation = stagingOps.metadataLocation();
+ if (metadataLocation != null) {
+ try {
+ stagingOps.io().deleteFile(metadataLocation);
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to clean metadata file {}", metadataLocation, e);
+ }
+ }
+ }
+
+ private static MethodHandle initCleanupHandle() {
+ try {
+ MethodHandles.Lookup lookup =
+ MethodHandles.privateLookupIn(BaseTransaction.class, MethodHandles.lookup());
+ return lookup.findSpecial(
+ BaseTransaction.class,
+ "cleanUpOnCommitFailure",
+ MethodType.methodType(void.class),
+ BaseTransaction.class
+ );
+ } catch (Exception e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+}
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index a0f0e779d9b7..e2cb3d7a72ec 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -93,6 +93,14 @@ protected HiveTableOperations(
conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
}
+ /**
+ * Create a staging operations instance that skips HMS updates and locking.
+ * Used by HiveTransaction to defer HMS updates to coordinator for atomic batch commits.
+ */
+ public StagingTableOperations toStagingOps() {
+ return new StagingTableOperations(conf, metaClients, fileIO, catalogName, database, tableName);
+ }
+
@Override
protected String tableName() {
return fullName;
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTxnCoordinator.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTxnCoordinator.java
new file mode 100644
index 000000000000..962b8f5579ab
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTxnCoordinator.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.TxnCoordinator;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableParamsUpdate;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.HiveTransaction;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.exceptions.CleanableFailure;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+/**
+ * Transaction coordinator that aggregates Iceberg table commits and performs atomic HMS updates
+ * across multiple tables using {@code updateTableParams}.
+ */
+public class HiveTxnCoordinator implements TxnCoordinator {
+
+ private final Configuration conf;
+ private final IMetaStoreClient msClient;
+
+ private final Map<String, HiveTransaction> stagedUpdates = Maps.newConcurrentMap();
+
+ public HiveTxnCoordinator(Configuration conf, IMetaStoreClient msClient) {
+ this.conf = conf;
+ this.msClient = msClient;
+ }
+
+ public Transaction getOrCreateTransaction(org.apache.iceberg.Table table) {
+ Transaction txn = getTransaction(table);
+ if (txn != null) {
+ return txn;
+ }
+ if (!(table instanceof BaseTable baseTable &&
+ baseTable.operations() instanceof HiveTableOperations ops)) {
+ return table.newTransaction();
+ }
+ return stagedUpdates.computeIfAbsent(
+ table.name(), ignored -> new HiveTransaction(table, ops));
+ }
+
+ public Transaction getTransaction(org.apache.iceberg.Table table) {
+ return stagedUpdates.get(table.name());
+ }
+
+ @Override
+ public synchronized void commit() throws TException {
+ if (stagedUpdates.isEmpty()) {
+ return;
+ }
+
+ List<Map.Entry<String, HiveTransaction>> updates = Lists.newArrayList(stagedUpdates.entrySet());
+ updates.sort(Map.Entry.comparingByKey());
+
+ TableMetadata base = updates.getFirst().getValue()
+ .startMetadata();
+
+ try {
+ Tasks.foreach(1)
+ .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(i -> {
+ try {
+ doCommit(updates);
+ } catch (RuntimeException e) {
+ cleanUpOnCommitFailure(e);
+ throw e;
+ }
+ });
+
+ } catch (ValidationException e) {
+ throw MetaStoreUtils.newMetaException(e);
+
+ } finally {
+ clearState();
+ }
+ }
+
+ @Override
+ public synchronized void rollback() {
+ clearState();
+ }
+
+ @Override
+ public boolean hasPendingWork() {
+ return !stagedUpdates.isEmpty();
+ }
+
+ private void doCommit(List<Map.Entry<String, HiveTransaction>> updates) {
+ List<TableParamsUpdate> payload = Lists.newArrayList();
+ List<HiveLock> locks = Lists.newArrayList();
+
+ try {
+ for (Map.Entry<String, HiveTransaction> entry : updates) {
+ HiveTransaction hiveTxn = entry.getValue();
+
+ // 1. WRITE: stage transaction (writes metadata file only)
+ hiveTxn.commitTransaction();
+
+ TableMetadata base = hiveTxn.startMetadata();
+ HiveTableOperations ops = hiveTxn.ops();
+
+ // 2. LOCK
+ HiveLock lock = ops.lockObject(base);
+ lock.lock();
+ locks.add(lock);
+
+ // 3. VERIFY under lock: refresh from HMS via original ops
+ TableMetadata current = ops.refresh();
+
+ if (!Objects.equals(base.metadataFileLocation(), current.metadataFileLocation())) {
+ throw new CommitFailedException(
+ "Base metadata location '%s' is not same as the current table metadata location '%s' for %s",
+ base.metadataFileLocation(), current.metadataFileLocation(), entry.getKey());
+ }
+
+ // 4. Build payload using staged metadata
+ TableParamsUpdate paramsUpdate = buildTableParamsUpdate(
+ base, hiveTxn.currentMetadata(), ops, hiveTxn.stagingOps().metadataLocation());
+ payload.add(paramsUpdate);
+ }
+
+ // 5. PERSIST: batch commit (CAS guaranteed to pass — verified under lock)
+ locks.forEach(HiveLock::ensureActive);
+ msClient.updateTableParams(payload);
+
+ // 6. Verify locks still active after persist — if lost, commit state is unknown
+ try {
+ locks.forEach(HiveLock::ensureActive);
+ } catch (LockException le) {
+ throw new CommitStateUnknownException(
+ "Failed to heartbeat for hive lock while committing changes. " +
+ "This can lead to a concurrent commit attempt be able to overwrite this commit. " +
+ "Please check the commit history. If you are running into this issue, try reducing " +
+ "iceberg.hive.lock-heartbeat-interval-ms.",
+ le);
+ }
+
+ } catch (LockException e) {
+ // Lock acquisition or pre-persist ensureActive failed — safe to retry
+ throw new CommitFailedException(e);
+
+ } catch (TException e) {
+ if (isCasFailure(e)) {
+ throw new CommitFailedException(e,
+ "The table %s.%s has been modified concurrently",
+ payload.getLast().getDb_name(), payload.getLast().getTable_name());
+ }
+ // Non-CAS TException from updateTableParams — we can't tell if the batch update was applied
+ throw new CommitStateUnknownException(e);
+
+ } finally {
+ releaseLocks(locks);
+ }
+ }
+
+ private TableParamsUpdate buildTableParamsUpdate(
+ TableMetadata base, TableMetadata newMetadata, HiveTableOperations ops, String newMetadataLocation) {
+ Set<String> removedProps =
+ base.properties().keySet().stream()
+ .filter(k -> !newMetadata.properties().containsKey(k))
+ .collect(Collectors.toSet());
+
+ Table tbl = new Table();
+ tbl.setParameters(Maps.newHashMap());
+
+ long maxPropSize = conf.getLong(
+ HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
+ HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+
+ HMSTablePropertyHelper.updateHmsTableForIcebergTable(
+ newMetadataLocation,
+ tbl,
+ newMetadata,
+ removedProps,
+ true,
+ maxPropSize,
+ base.metadataFileLocation());
+
+ populateStatsState(newMetadata, tbl);
+
+ TableParamsUpdate newParams = new TableParamsUpdate();
+ newParams.setDb_name(ops.database());
+ newParams.setTable_name(ops.table());
+ newParams.setParams(tbl.getParameters());
+
+ newParams.setExpected_param_key(HiveTableOperations.METADATA_LOCATION_PROP);
+ if (base.metadataFileLocation() != null) {
+ newParams.setExpected_param_value(base.metadataFileLocation());
+ }
+ return newParams;
+ }
+
+ private static void populateStatsState(TableMetadata metadata, Table tbl) {
+ if (metadata.spec().isUnpartitioned() || !metadata.partitionStatisticsFiles().isEmpty()) {
+ StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.TRUE);
+ }
+ List<String> colNames = getStatsColumnNames(metadata);
+ if (!colNames.isEmpty()) {
+ StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+ }
+ }
+
+ private static List<String> getStatsColumnNames(TableMetadata metadata) {
+ // Find the first statistics file that contains ColumnStatisticsObj blobs.
+ return metadata.statisticsFiles().stream()
+ .filter(sf -> sf.blobMetadata().stream()
+ .anyMatch(blob -> ColumnStatisticsObj.class.getSimpleName().equals(blob.type())))
+ .findFirst()
+ .map(sf -> {
+ // Unpartitioned: each blob = one column, need all blobs' fields.
+ // Partitioned: first blob has ALL column field IDs.
+ if (metadata.spec().isUnpartitioned()) {
+ return sf.blobMetadata().stream()
+ .flatMap(blob -> blob.fields().stream())
+ .map(fieldId -> metadata.schema().findColumnName(fieldId))
+ .toList();
+ }
+ return sf.blobMetadata().getFirst().fields().stream()
+ .map(fieldId -> metadata.schema().findColumnName(fieldId))
+ .toList();
+ })
+ .orElse(List.of());
+ }
+
+ private static boolean isCasFailure(TException ex) {
+ return ex.getMessage() != null &&
+ ex.getMessage().contains("The table has been modified. The parameter value for key '" +
+ HiveTableOperations.METADATA_LOCATION_PROP +
+ "' is");
+ }
+
+ private void releaseLocks(List<HiveLock> locks) {
+ locks.forEach(HiveLock::unlock);
+ }
+
+ private void cleanUpOnCommitFailure(Exception ex) {
+ if (ex instanceof CommitStateUnknownException) {
+ return;
+ }
+ stagedUpdates.values().forEach(txn -> {
+ if (!txn.ops().requireStrictCleanup() || ex instanceof CleanableFailure) {
+ txn.cleanUpOnCommitFailure();
+ }
+ });
+ }
+
+ private void clearState() {
+ stagedUpdates.clear();
+ }
+
+}
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/StagingTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/StagingTableOperations.java
new file mode 100644
index 000000000000..c61bde44d654
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/StagingTableOperations.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.io.FileIO;
+import org.apache.thrift.TException;
+
+/**
+ * TableOperations that only writes metadata files during commit.
+ * Skips HMS reads, locking, and persistence — all handled by the coordinator.
+ */
+public class StagingTableOperations extends HiveTableOperations {
+
+ private String newMetadataLocation;
+
+ public StagingTableOperations(
+ Configuration conf,
+ ClientPool<IMetaStoreClient, TException> metaClients,
+ FileIO fileIO,
+ String catalogName,
+ String database,
+ String table) {
+ super(conf, metaClients, fileIO, catalogName, database, table);
+ }
+
+ @Override
+ protected void doCommit(TableMetadata base, TableMetadata metadata) {
+ newMetadataLocation = writeNewMetadataIfRequired(base == null, metadata);
+ }
+
+ public String metadataLocation() {
+ return newMetadataLocation;
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 865116414459..8944e8619dba 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -600,7 +600,7 @@ private Expression generateExprFromPartitionNames(List<String> partNames) throws
for (String partName : partNames) {
try {
Map<String, String> partitionSpec = Warehouse.makeSpecFromName(partName);
- Expression partitionExpr = IcebergTableUtil.generateExprForIdentityPartition(
+ Expression partitionExpr = IcebergTableUtil.buildPartitionExpr(
icebergTable, partitionSpec, partitionFields);
predicate = Expressions.or(predicate, partitionExpr);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 348fdea28ba6..d7929c3850ca 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -70,6 +71,7 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -106,6 +108,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
private static final String CONFLICT_DETECTION_FILTER = "Conflict detection Filter Expression: {}";
private ExecutorService workerPool;
+ private JobConf jobConf;
@Override
public void setupJob(JobContext jobContext) {
@@ -134,8 +137,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
TaskAttemptID attemptID = context.getTaskAttemptID();
- JobConf jobConf = context.getJobConf();
- Set mergedPaths = getCombinedLocations(jobConf);
+ JobConf conf = context.getJobConf();
+ Set mergedPaths = getCombinedLocations(conf);
Set<String> outputs = outputTables(context.getJobConf());
Map<String, List<HiveIcebergWriter>> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
@@ -144,7 +147,7 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
return ImmutableMap.of();
});
- ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+ ExecutorService tableExecutor = tableExecutor(conf, outputs.size());
try {
// Generates commit files for the target tables in parallel
Tasks.foreach(outputs)
@@ -155,7 +158,7 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
.run(output -> {
Table table = HiveTableUtil.deserializeTable(context.getJobConf(), output);
if (table != null) {
- String fileForCommitLocation = HiveTableUtil.fileForCommitLocation(table.location(), jobConf,
+ String fileForCommitLocation = HiveTableUtil.fileForCommitLocation(table.location(), conf,
attemptID.getJobID(), attemptID.getTaskID().getId());
if (writers.get(output) != null) {
List dataFiles = Lists.newArrayList();
@@ -265,7 +268,7 @@ public void commitJobs(List originalContextList, Operation operation
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
Multimap outputs = collectOutputs(jobContextList);
- JobConf jobConf = jobContextList.getFirst().getJobConf();
+ jobConf = jobContextList.getFirst().getJobConf();
long startTime = System.currentTimeMillis();
String ids = jobContextList.stream()
@@ -336,7 +339,7 @@ public void abortJobs(List originalContextList) throws IOException {
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
Multimap outputs = collectOutputs(jobContextList);
- JobConf jobConf = jobContextList.getFirst().getJobConf();
+ jobConf = jobContextList.getFirst().getJobConf();
String ids = jobContextList.stream()
.map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
@@ -531,8 +534,10 @@ private Long getSnapshotId(Table table, String branchName) {
private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
FilesForCommit results, Operation operation, Expression filterExpr) {
+ Transaction txn = IcebergAcidUtil.getOrCreateTransaction(table, jobConf);
+
if (!results.replacedDataFiles().isEmpty()) {
- OverwriteFiles write = table.newOverwrite();
+ OverwriteFiles write = txn.newOverwrite();
results.replacedDataFiles().forEach(write::deleteFile);
results.dataFiles().forEach(write::addFile);
@@ -548,20 +553,20 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
}
write.validateNoConflictingData();
write.validateNoConflictingDeletes();
- commit(write);
+ commit(txn, write);
return;
}
if (results.deleteFiles().isEmpty() && Operation.MERGE != operation) {
- AppendFiles write = table.newAppend();
+ AppendFiles write = txn.newAppend();
results.dataFiles().forEach(write::appendFile);
if (StringUtils.isNotEmpty(branchName)) {
write.toBranch(HiveUtils.getTableSnapshotRef(branchName));
}
- commit(write);
+ commit(txn, write);
} else {
- RowDelta write = table.newRowDelta();
+ RowDelta write = txn.newRowDelta();
results.dataFiles().forEach(write::addRows);
results.deleteFiles().forEach(write::addDeletes);
results.rewrittenDeleteFiles().forEach(write::removeDeletes);
@@ -582,7 +587,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
}
write.validateDataFilesExist(results.referencedDataFiles());
write.validateNoConflictingDataFiles();
- commit(write);
+ commit(txn, write);
}
LOG.info("Write commit took {} ms for table: {} with {} data and {} delete file(s)",
@@ -594,12 +599,16 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
* Calls the commit on the prepared SnapshotUpdate and supplies the ExecutorService if any.
* @param update the SnapshotUpdate of any kind (e.g. AppendFiles, DeleteFiles, etc.)
*/
- private void commit(SnapshotUpdate<?> update) {
+ private void commit(Transaction txn, SnapshotUpdate<?> update) {
if (workerPool != null) {
update.scanManifestsWith(workerPool);
}
update.commit();
+ if (txn.getClass() == BaseTransaction.class) {
+ txn.commitTransaction();
+ }
}
+
/**
* Creates and commits an Iceberg compaction change with the provided data files.
* Either full table or a selected partition contents is replaced with compacted files.
@@ -617,7 +626,9 @@ private void commitCompaction(Table table, Long snapshotId, long startTime, File
List<DeleteFile> existingDeleteFiles = fileSizeThreshold == -1 ?
IcebergCompactionUtil.getDeleteFiles(table, snapshotId, partitionPath) : Collections.emptyList();
- RewriteFiles rewriteFiles = table.newRewrite();
+ Transaction txn = IcebergAcidUtil.getOrCreateTransaction(table, jobConf);
+
+ RewriteFiles rewriteFiles = txn.newRewrite();
existingDataFiles.forEach(rewriteFiles::deleteFile);
existingDeleteFiles.forEach(rewriteFiles::deleteFile);
results.dataFiles().forEach(rewriteFiles::addFile);
@@ -625,7 +636,7 @@ private void commitCompaction(Table table, Long snapshotId, long startTime, File
if (snapshotId != null) {
rewriteFiles.validateFromSnapshot(snapshotId);
}
- rewriteFiles.commit();
+ commit(txn, rewriteFiles);
LOG.info("Compaction commit took {} ms for table: {} partition: {} with {} file(s)",
System.currentTimeMillis() - startTime, table, StringUtils.defaultString(partitionPath, "N/A"),
results.dataFiles().size());
@@ -646,8 +657,11 @@ private void commitCompaction(Table table, Long snapshotId, long startTime, File
private void commitOverwrite(Table table, String branchName, Long snapshotId, long startTime,
FilesForCommit results) {
Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
+
+ Transaction txn = IcebergAcidUtil.getOrCreateTransaction(table, jobConf);
+
if (!results.dataFiles().isEmpty()) {
- ReplacePartitions overwrite = table.newReplacePartitions();
+ ReplacePartitions overwrite = txn.newReplacePartitions();
results.dataFiles().forEach(overwrite::addFile);
if (StringUtils.isNotEmpty(branchName)) {
@@ -658,17 +672,17 @@ private void commitOverwrite(Table table, String branchName, Long snapshotId, lo
}
overwrite.validateNoConflictingDeletes();
overwrite.validateNoConflictingData();
- commit(overwrite);
+ commit(txn, overwrite);
LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
table, results.dataFiles().size());
} else if (table.spec().isUnpartitioned()) {
- DeleteFiles deleteFiles = table.newDelete();
+ DeleteFiles deleteFiles = txn.newDelete();
deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
if (StringUtils.isNotEmpty(branchName)) {
deleteFiles.toBranch(HiveUtils.getTableSnapshotRef(branchName));
}
- commit(deleteFiles);
+ commit(txn, deleteFiles);
LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. " +
"Commit took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
}
@@ -683,7 +697,7 @@ private void commitOverwrite(Table table, String branchName, Long snapshotId, lo
* @throws IOException if there is a failure deleting the files
*/
private void cleanup(JobContext jobContext, Collection<String> jobLocations) throws IOException {
- JobConf jobConf = jobContext.getJobConf();
+ JobConf conf = jobContext.getJobConf();
LOG.info("Cleaning for job {} started", jobContext.getJobID());
@@ -695,7 +709,7 @@ private void cleanup(JobContext jobContext, Collection jobLocations) thr
.run(jobLocation -> {
LOG.info("Cleaning location: {}", jobLocation);
Path toDelete = new Path(jobLocation);
- FileSystem fs = Util.getFs(toDelete, jobConf);
+ FileSystem fs = Util.getFs(toDelete, conf);
fs.delete(toDelete, true);
}, IOException.class);
@@ -893,7 +907,7 @@ private void cleanMergeTaskInputFiles(List<JobContext> jobContexts, ExecutorServ
// At this stage the file is written and task-committed, but the old files are still present.
Stream<Path> mergedPaths = jobContexts.stream()
.map(JobContext::getJobConf)
- .filter(jobConf -> jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
+ .filter(conf -> conf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
.map(Utilities::getMapWork).filter(Objects::nonNull)
.map(MapWork::getInputPaths).filter(Objects::nonNull)
.flatMap(List::stream);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 7bf2aef49a7c..49e58b875c17 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -75,7 +75,6 @@
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
@@ -196,7 +195,6 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -232,7 +230,6 @@ public class HiveIcebergStorageHandler extends DefaultStorageHandler implements
private static final String TABLE_NAME_SEPARATOR = "..";
public static final String TABLE_DEFAULT_LOCATION = "TABLE_DEFAULT_LOCATION";
- private static final String SPEC_ID = "spec-id";
private static final String PARTITION = "partition";
public static final String STATS = "/stats/snap-";
@@ -346,7 +343,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
HiveCustomStorageHandlerUtils.WRITE_OPERATION_CONFIG_PREFIX + tableName)));
}
boolean isMergeTaskEnabled = Boolean.parseBoolean(tableDesc.getProperty(
- HiveCustomStorageHandlerUtils.MERGE_TASK_ENABLED + tableName));
+ HiveCustomStorageHandlerUtils.MERGE_TASK_ENABLED + tableName));
if (isMergeTaskEnabled) {
HiveCustomStorageHandlerUtils.setMergeTaskEnabled(jobConf, tableName, true);
}
@@ -574,24 +571,17 @@ private static Map<String, String> getPartishSummary(Partish partish, Table tabl
recordSchema, table.io().newInputFile(statsFile.path()))) {
PartitionStats partitionStats = Iterables.tryFind(recordIterator, stats -> {
PartitionSpec spec = table.specs().get(stats.specId());
- PartitionData data = IcebergTableUtil.toPartitionData(stats.partition(), partitionType,
+ PartitionData data = IcebergTableUtil.toPartitionData(stats.partition(), partitionType,
spec.partitionType());
return spec.partitionToPath(data).equals(partish.getPartition().getName());
}).orNull();
if (partitionStats != null) {
- Map<String, String> stats = ImmutableMap.of(
- TOTAL_DATA_FILES_PROP, String.valueOf(partitionStats.dataFileCount()),
- TOTAL_RECORDS_PROP, String.valueOf(partitionStats.dataRecordCount()),
- TOTAL_EQ_DELETES_PROP, String.valueOf(partitionStats.equalityDeleteRecordCount()),
- TOTAL_POS_DELETES_PROP, String.valueOf(partitionStats.positionDeleteRecordCount()),
- TOTAL_FILE_SIZE_PROP, String.valueOf(partitionStats.totalDataFileSizeInBytes())
- );
- return stats;
+ return IcebergTableUtil.toStatsMap(partitionStats);
} else {
- LOG.warn("Partition {} not found in stats file: {}",
+ LOG.warn("Partition {} not found in stats file: {}, falling back to metadata scan",
partish.getPartition().getName(), statsFile.path());
- return null;
+ return IcebergTableUtil.getPartitionStats(table, partish.getPartition().getSpec(), snapshot);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -606,7 +596,7 @@ private static Map getPartishSummary(Partish partish, Table tabl
private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
boolean skipCache = SessionStateUtil.getQueryState(conf)
- .map(queryState -> queryState.getNumModifiedRows() > 0)
+ .map(qs -> !qs.isTxnOpen() && qs.getNumModifiedRows() > 0)
.orElse(true);
return IcebergTableUtil.getTable(conf, hmsTable.getTTable(), skipCache);
}
@@ -622,6 +612,7 @@ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTabl
return writeColStats(colStats, tbl);
}
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
private boolean writeColStats(List colStats, Table tbl) {
try {
if (!shouldRewriteColStats(tbl)) {
@@ -630,41 +621,66 @@ private boolean writeColStats(List colStats, Table tbl) {
StatisticsFile statisticsFile;
String statsPath = tbl.location() + STATS + UUID.randomUUID();
- try (PuffinWriter puffinWriter = Puffin.write(tbl.io().newOutputFile(statsPath))
- .createdBy(Constants.HIVE_ENGINE).build()) {
+ try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(statsPath))
+ .createdBy(Constants.HIVE_ENGINE)
+ .build()) {
+
long snapshotId = tbl.currentSnapshot().snapshotId();
long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();
+ Schema schema = tbl.spec().schema();
- colStats.forEach(stats -> {
+ boolean first = true;
+
+ for (ColumnStatistics stats : colStats) {
boolean isTblLevel = stats.getStatsDesc().isIsTblLevel();
- for (Serializable statsObj : isTblLevel ? stats.getStatsObj() : Collections.singletonList(stats)) {
- byte[] serializeColStats = SerializationUtils.serialize(statsObj);
- puffinWriter.add(
- new Blob(
+ Map properties = isTblLevel ? Map.of() :
+ Map.of(PARTITION, String.valueOf(stats.getStatsDesc().getPartName()));
+
+ List extends Serializable> statsObjects = isTblLevel ?
+ stats.getStatsObj() : List.of(stats);
+
+ List fieldIds = null;
+
+ if (!isTblLevel) {
+ // For partition-level stats, we emit one blob per partition;
+ // therefore, only the first blob should contain the actual fieldIds.
+ fieldIds = !first ? List.of(-1) :
+ stats.getStatsObj().stream()
+ .map(obj -> schema.findField(obj.getColName()).fieldId())
+ .toList();
+ first = false;
+ }
+
+ for (Serializable statsObj : statsObjects) {
+ byte[] serialized = SerializationUtils.serialize(statsObj);
+
+ if (isTblLevel) {
+ fieldIds = List.of(schema.findField(
+ ((ColumnStatisticsObj) statsObj).getColName()).fieldId());
+ }
+
+ writer.add(new Blob(
ColumnStatisticsObj.class.getSimpleName(),
- ImmutableList.of(isTblLevel ? tbl.spec().schema().findField(
- ((ColumnStatisticsObj) statsObj).getColName()).fieldId() : 1),
+ fieldIds,
snapshotId,
snapshotSequenceNumber,
- ByteBuffer.wrap(serializeColStats),
+ ByteBuffer.wrap(serialized),
PuffinCompressionCodec.NONE,
- isTblLevel ?
- ImmutableMap.of(SPEC_ID, String.valueOf(tbl.spec().specId())) :
- ImmutableMap.of(PARTITION, String.valueOf(stats.getStatsDesc().getPartName()))
- ));
+ properties
+ ));
}
- });
+ }
- puffinWriter.finish();
+ writer.finish();
statisticsFile =
new GenericStatisticsFile(
snapshotId,
statsPath,
- puffinWriter.fileSize(),
- puffinWriter.footerSize(),
- puffinWriter.writtenBlobsMetadata().stream()
+ writer.fileSize(),
+ writer.footerSize(),
+ writer.writtenBlobsMetadata().stream()
.map(GenericBlobMetadata::from)
.collect(ImmutableList.toImmutableList())
);
@@ -712,13 +728,18 @@ public List getColStatistics(org.apache.hadoop.hive.ql.meta
if (snapshot == null) {
return Lists.newArrayList();
}
+ // If reading a non-current snapshot whose schema has undergone partition evolution, return an empty list.
+ // This should be revisited once isPartitioned() is refactored to provide full support for partitioned tables.
+ if (!snapshot.equals(table.currentSnapshot()) &&
+ IcebergTableUtil.hasUndergonePartitionEvolution(snapshot, table.io())) {
+ return Lists.newArrayList();
+ }
Predicate filter;
if (colNames != null) {
Set columns = Sets.newHashSet(colNames);
filter = metadata -> {
- int specId = Integer.parseInt(metadata.properties().get(SPEC_ID));
- String column = table.specs().get(specId).schema().findColumnName(metadata.inputFields().getFirst());
+ String column = table.schema().findColumnName(metadata.inputFields().getFirst());
return columns.contains(column);
};
} else {
@@ -790,9 +811,10 @@ private String getStatsSource() {
}
private boolean shouldRewriteColStats(Table tbl) {
- return SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation)
- .filter(opType -> HiveOperation.ANALYZE_TABLE == opType).isPresent() ||
- IcebergTableUtil.getColStatsPath(tbl) != null;
+ return SessionStateUtil.getQueryState(conf)
+ .map(qs -> HiveOperation.ANALYZE_TABLE == qs.getHiveOperation())
+ .orElse(false) ||
+ IcebergTableUtil.getColStatsPath(tbl) != null;
}
private void checkAndMergeColStats(List statsNew, Table tbl) throws InvalidObjectException {
@@ -850,8 +872,7 @@ public LockType getLockType(WriteEntity writeEntity) {
}
return switch (writeEntity.getWriteType()) {
case INSERT_OVERWRITE -> LockType.EXCL_WRITE;
- case UPDATE, DELETE -> sharedWrite ?
- LockType.SHARED_WRITE : LockType.EXCL_WRITE;
+ case UPDATE, DELETE -> sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE;
default -> LockType.SHARED_WRITE;
};
}
@@ -898,7 +919,7 @@ private List getSortTransformSpec(Table table) {
@Override
public DynamicPartitionCtx createDPContext(
- HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation)
+ HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation)
throws SemanticException {
// delete records are already clustered by partition spec id and the hash of the partition struct
// there is no need to do any additional sorting based on partition columns
@@ -999,7 +1020,7 @@ private void addZOrderCustomExpr(Map props, DynamicPartitionCtx
}
}
- private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+ private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Operation writeOperation, DynamicPartitionCtx dpCtx,
List transformSpecs) {
List fields = table.schema().columns();
@@ -1752,8 +1773,8 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps)
hasOrcTimeInSchema(tableProps, tableSchema) ||
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
// disable vectorization
- SessionStateUtil.getQueryState(conf).ifPresent(queryState ->
- queryState.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false));
+ SessionStateUtil.getQueryState(conf).ifPresent(qs ->
+ qs.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false));
}
}
@@ -2038,10 +2059,9 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
return false;
}
- Expression partitionExpr = IcebergTableUtil.generateExprForIdentityPartition(
- table, partitionSpec, true);
-
- FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(partitionExpr);
+ Expression partitionExpr = IcebergTableUtil.buildPartitionExpr(table, partitionSpec);
+ FindFiles.Builder builder = new FindFiles.Builder(table)
+ .withRecordsMatching(partitionExpr);
Set dataFiles = Sets.newHashSet(builder.collect());
boolean result = true;
@@ -2072,7 +2092,8 @@ public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable)
Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
boolean readsNonCurrentSnapshot = snapshot != null && !snapshot.equals(table.currentSnapshot());
- if (readsNonCurrentSnapshot && IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
+ if (readsNonCurrentSnapshot &&
+ IcebergTableUtil.hasUndergonePartitionEvolution(snapshot, table.io())) {
return false;
}
return table.spec().isPartitioned();
@@ -2085,9 +2106,8 @@ public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
validatePartSpec(table, partitionSpec, policy);
boolean isDescTable = SessionStateUtil.getQueryState(conf)
- .map(QueryState::getHiveOperation)
- .filter(op -> op == HiveOperation.DESCTABLE)
- .isPresent();
+ .map(qs -> HiveOperation.DESCTABLE == qs.getHiveOperation())
+ .orElse(false);
if (!isDescTable) {
return createDummyPartitionHandle(table, partitionSpec);
@@ -2224,31 +2244,38 @@ public List getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
if (exp == null) {
return ImmutableList.of();
}
+
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
- int tableSpecId = table.spec().specId();
- Set partitions = Sets.newHashSet();
+ Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
+
+ boolean readsCurrentSnapshot = snapshot != null && snapshot.equals(table.currentSnapshot());
+ boolean filterBySpecId = latestSpecOnly != null && readsCurrentSnapshot;
+
+ Predicate specFilter = specId ->
+ !filterBySpecId || latestSpecOnly == (specId == table.spec().specId());
TableScan scan = table.newScan().filter(exp)
.caseSensitive(false).includeColumnStats().ignoreResiduals();
+ if (snapshot != null) {
+ scan = scan.useSnapshot(snapshot.snapshotId());
+ }
+
+ Set partitions = Sets.newHashSet();
try (CloseableIterable tasks = scan.planFiles()) {
FluentIterable.from(tasks)
.filter(task -> task.spec().isPartitioned())
+ .filter(task -> specFilter.test(task.file().specId()))
.forEach(task -> {
- DataFile file = task.file();
PartitionSpec spec = task.spec();
+ PartitionData partitionData = IcebergTableUtil.toPartitionData(task.partition(), spec.partitionType());
+ String partName = spec.partitionToPath(partitionData);
- if (latestSpecOnly == null || latestSpecOnly && file.specId() == tableSpecId ||
- !latestSpecOnly && file.specId() != tableSpecId) {
- PartitionData partitionData = IcebergTableUtil.toPartitionData(task.partition(), spec.partitionType());
- String partName = spec.partitionToPath(partitionData);
-
- Map partSpecMap = Maps.newLinkedHashMap();
- Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
+ Map partSpecMap = Maps.newLinkedHashMap();
+ Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
- DummyPartition partition = new DummyPartition(hmsTable, partName, partSpecMap);
- partitions.add(partition);
- }
+ DummyPartition partition = new DummyPartition(hmsTable, partName, partSpecMap);
+ partitions.add(partition);
});
} catch (IOException e) {
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
@@ -2260,9 +2287,15 @@ public List getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
public boolean hasDataMatchingFilterExpr(org.apache.hadoop.hive.ql.metadata.Table hmsTable, ExprNodeDesc filter) {
SearchArgument sarg = ConvertAstToSearchArg.create(conf, (ExprNodeGenericFuncDesc) filter);
Expression exp = HiveIcebergFilterFactory.generateFilterExpression(sarg);
+
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+ Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
+
TableScan scan = table.newScan().filter(exp)
.caseSensitive(false).includeColumnStats().ignoreResiduals();
+ if (snapshot != null) {
+ scan = scan.useSnapshot(snapshot.snapshotId());
+ }
boolean result = false;
try (CloseableIterable tasks = scan.planFiles()) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
index fbdc36be3a19..9de7c0b1111d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
@@ -172,8 +172,7 @@ public static void appendFiles(URI fromURI, String format, Table icebergTbl, boo
if (isOverwrite) {
DeleteFiles delete = transaction.newDelete();
if (partitionSpec != null) {
- Expression partitionExpr = IcebergTableUtil.generateExprForIdentityPartition(
- icebergTbl, partitionSpec, true);
+ Expression partitionExpr = IcebergTableUtil.buildPartitionExpr(icebergTbl, partitionSpec);
delete.deleteFromRowFilter(partitionExpr);
} else {
delete.deleteFromRowFilter(Expressions.alwaysTrue());
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index b2feed784e39..33954d106eb3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -23,12 +23,16 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
import org.apache.hadoop.hive.ql.io.RowLineageInfo;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionKey;
@@ -36,9 +40,11 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hive.HiveTxnCoordinator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.mapreduce.RowLineageReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -218,6 +224,37 @@ public static void copyFields(GenericRecord source, int start, int len, GenericR
}
}
+ public static Transaction getOrCreateTransaction(Table table, Configuration conf) {
+ HiveTxnManager txnManager = Optional.ofNullable(SessionState.get())
+ .map(SessionState::getTxnMgr).orElse(null);
+ if (txnManager == null) {
+ return table.newTransaction();
+ }
+ boolean isExplicitTxnOpen = txnManager.isTxnOpen() && !txnManager.isImplicitTransactionOpen(null);
+ int outputCount = SessionStateUtil.getOutputTableCount(conf)
+ .orElse(1);
+
+ if (!isExplicitTxnOpen && outputCount < 2) {
+ return table.newTransaction();
+ }
+ HiveTxnCoordinator txnCoordinator = txnManager.getOrSetTxnCoordinator(
+ HiveTxnCoordinator.class, msClient -> new HiveTxnCoordinator(conf, msClient));
+ return txnCoordinator != null ?
+ txnCoordinator.getOrCreateTransaction(table) : table.newTransaction();
+ }
+
+ public static Transaction getTransaction(Table table) {
+ HiveTxnManager txnManager = Optional.ofNullable(SessionState.get())
+ .map(SessionState::getTxnMgr).orElse(null);
+ if (txnManager == null) {
+ return null;
+ }
+ HiveTxnCoordinator txnCoordinator = txnManager.getOrSetTxnCoordinator(
+ HiveTxnCoordinator.class, null);
+ return txnCoordinator != null ?
+ txnCoordinator.getTransaction(table) : null;
+ }
+
public static class VirtualColumnAwareIterator implements CloseableIterator {
private final CloseableIterator currentIterator;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index ceb848c30952..386473f46fee 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -77,34 +78,40 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.ResidualEvaluator;
-import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.hive.IcebergCatalogProperties;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;
@@ -127,6 +134,9 @@ public class IcebergTableUtil {
public static final int SPEC_IDX = 1;
public static final int PART_IDX = 0;
+ private static final String SPEC_ID_FIELD = "spec_id";
+ private static final String NULL_VALUE = "null";
+
private IcebergTableUtil() {
}
@@ -185,8 +195,13 @@ static Table getTable(Configuration configuration, Properties properties, boolea
if (skipCache) {
return tableLoadFunc.apply(null);
} else {
- return SessionStateUtil.getResource(configuration, tableIdentifier).filter(o -> o instanceof Table)
- .map(o -> (Table) o).orElseGet(() -> {
+ return SessionStateUtil.getResource(configuration, tableIdentifier)
+ .filter(Table.class::isInstance)
+ .map(Table.class::cast)
+ .map(tbl -> Optional.ofNullable(IcebergAcidUtil.getTransaction(tbl))
+ .map(Transaction::table)
+ .orElse(tbl))
+ .orElseGet(() -> {
LOG.debug("Iceberg table {} is not found in QueryState. Loading table from configured catalog",
tableIdentifier);
return tableLoadFunc.apply(null);
@@ -445,120 +460,149 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
return toPartitionData(projection, targetKeyType);
}
- public static Expression generateExprForIdentityPartition(Table table, Map partitionSpec,
- boolean latestSpecOnly) throws SemanticException {
-
- Map partitionFields = getPartitionFields(table, latestSpecOnly).stream()
- .collect(Collectors.toMap(PartitionField::name, Function.identity()));
+ /**
+ * Builds a filter expression for data table operations (deleteFromRowFilter, FindFiles).
+ * Only supports identity transforms. Keys are partition field names.
+ */
+ static Expression buildPartitionExpr(Table table, Map partitionSpec)
+ throws SemanticException {
- return generateExprForIdentityPartition(table, partitionSpec, partitionFields);
+ Map partitionFields = getPartitionFields(table, true).stream()
+ .collect(Collectors.toMap(
+ PartitionField::name, Function.identity())
+ );
+ return buildPartitionExpr(table, partitionSpec, partitionFields);
}
- public static Expression generateExprForIdentityPartition(Table table, Map partitionSpec,
+ static Expression buildPartitionExpr(Table table, Map partitionSpec,
Map partitionFields) throws SemanticException {
- return buildPartitionExpression(
- partitionSpec,
+ Types.StructType partitionType = Partitioning.partitionType(table);
+
+ return buildExpression(partitionSpec,
(column, value) ->
- buildIdentityPartitionPredicate(table, value, partitionFields.get(column)),
- partitionFields::containsKey
- );
+ buildIdentityFieldPredicate(partitionType, partitionFields.get(column), value),
+ partitionFields::containsKey);
}
- public static Expression generateExprFromPartitionSpec(Table table, Map partitionSpec,
+ /**
+ * Builds a filter expression for the PARTITIONS metadata table from source column names.
+ * Handles partition evolution by OR-ing predicates across different transforms on the same source column.
+ */
+ private static Expression buildPartitionsFilter(Table table, Map partitionSpec,
boolean latestSpecOnly) throws SemanticException {
// Group partition fields by source column name to handle partition evolution
// where the same source column may have multiple transforms across different specs
- Map> partitionFieldsBySourceColumn =
- getPartitionFields(table, latestSpecOnly).stream()
- .collect(Collectors.groupingBy(
- pf -> table.schema().findColumnName(pf.sourceId()))
- );
-
- return buildPartitionExpression(
- partitionSpec,
+ Map> bySourceColumn = getPartitionFields(table, latestSpecOnly).stream()
+ .collect(Collectors.groupingBy(pf ->
+ table.schema().findColumnName(pf.sourceId()))
+ );
+
+ return buildExpression(partitionSpec,
(column, value) ->
- buildTransformPartitionPredicate(table, value, partitionFieldsBySourceColumn.get(column)),
- partitionFieldsBySourceColumn::containsKey
- );
+ buildSourceValuePredicate(table, value, bySourceColumn.get(column)),
+ bySourceColumn::containsKey);
+ }
+
+ /**
+ * Builds a filter expression for the PARTITIONS metadata table from partition field names
+ * with already-transformed values (e.g. {key_bucket_8: 5}).
+ */
+ private static Expression buildPartitionsFilterFromPath(Table table, Map partitionSpec)
+ throws SemanticException {
+ Types.StructType partitionType = Partitioning.partitionType(table);
+
+ return buildExpression(partitionSpec,
+ (column, value) ->
+ buildFieldPredicate(partitionType, column, value, "partition."),
+ col -> partitionType.field(col) != null);
+ }
+
+ static List getPartitionFields(Table table, boolean latestSpecOnly) {
+ if (latestSpecOnly) {
+ return table.spec().fields();
+ }
+ return table.specs().values().stream()
+ .flatMap(spec -> spec.fields().stream()
+ .filter(f -> !f.transform().isVoid()))
+ .distinct()
+ .toList();
}
@FunctionalInterface
private interface PartitionPredicateBuilder {
- Expression build(String partitionColumn, String partitionValue) throws SemanticException;
+ Expression build(String column, String value) throws SemanticException;
}
- private static Expression buildPartitionExpression(
- Map partitionSpec,
- PartitionPredicateBuilder predicateBuilder,
- Predicate fieldValidator) throws SemanticException {
-
+ private static Expression buildExpression(Map partitionSpec,
+ PartitionPredicateBuilder predicateBuilder, Predicate fieldValidator) throws SemanticException {
Expression predicate = Expressions.alwaysTrue();
-
for (Map.Entry entry : partitionSpec.entrySet()) {
- String partitionColumn = entry.getKey();
-
- // Validate field exists
- if (!fieldValidator.test(partitionColumn)) {
- throw new SemanticException(
- "No partition column by the name: %s".formatted(partitionColumn));
+ String column = entry.getKey();
+ if (!fieldValidator.test(column)) {
+ throw new SemanticException("No partition column by the name: %s".formatted(column));
}
- Expression columnPredicate = predicateBuilder.build(partitionColumn, entry.getValue());
- predicate = Expressions.and(predicate, columnPredicate);
+ predicate = Expressions.and(predicate, predicateBuilder.build(column, entry.getValue()));
}
-
return predicate;
}
- private static Expression buildIdentityPartitionPredicate(Table table, String partitionValue,
- PartitionField partitionField) throws SemanticException {
+ private static Expression buildFieldPredicate(Types.StructType partitionType,
+ String fieldName, String value, String keyPrefix) {
+ String key = keyPrefix + fieldName;
+ Object columnValue = parsePartitionValue(partitionType.field(fieldName).type(), value);
- if (!partitionField.transform().isIdentity()) {
- throw new SemanticException(
- "Partition transforms are not supported here: %s".formatted(partitionField.name()));
- }
- Types.NestedField sourceField = table.schema().findField(partitionField.sourceId());
- Object columnValue = Conversions.fromPartitionString(sourceField.type(), partitionValue);
+ return columnValue == null ?
+ Expressions.isNull(key) : Expressions.equal(key, columnValue);
+ }
- return Expressions.equal(partitionField.name(), columnValue);
+ private static Expression buildIdentityFieldPredicate(Types.StructType partitionType,
+ PartitionField field, String value) throws SemanticException {
+ if (!field.transform().isIdentity()) {
+ throw new SemanticException("Partition transforms are not supported here: %s".formatted(field.name()));
+ }
+ return buildFieldPredicate(partitionType, field.name(), value, "");
}
- private static Expression buildTransformPartitionPredicate(Table table, String partitionValue,
+ private static Expression buildSourceValuePredicate(Table table, String partitionValue,
List partitionFields) {
// Get source field type from first partition field (all share same source)
Types.NestedField sourceField = table.schema().findField(
partitionFields.getFirst().sourceId());
- Object columnValue = Conversions.fromPartitionString(sourceField.type(), partitionValue);
+ Object columnValue = parsePartitionValue(sourceField.type(), partitionValue);
- Expression predicate = Expressions.alwaysFalse();
+ Expression partitionExpr = Expressions.alwaysFalse();
// Create OR expression for each transform on this source column
for (PartitionField partitionField : partitionFields) {
- // Apply the transform to the source value
- @SuppressWarnings("unchecked")
- Transform