diff --git a/.github/workflows/paimon.yml b/.github/workflows/paimon.yml
index 25188e81c..25c32f6fa 100644
--- a/.github/workflows/paimon.yml
+++ b/.github/workflows/paimon.yml
@@ -56,12 +56,31 @@ jobs:
           java-version: ${{ matrix.javaver }}
           cache: 'maven'
 
+      - name: Build dependencies (skip tests)
+        run: >
+          ./build/mvn -B install
+          -pl ${{ matrix.module }}
+          -am
+          -Pscala-${{ matrix.scalaver }}
+          -Ppaimon-${{ matrix.paimon }}
+          -P${{ matrix.sparkver }}
+          -Prelease
+          -DskipTests
+
       - name: Test Paimon Module
-        run: ./build/mvn -B test -pl ${{ matrix.module }} -am -Pscala-${{ matrix.scalaver }} -Ppaimon-${{ matrix.paimon }} -P${{ matrix.sparkver }} -Prelease
+        run: >
+          ./build/mvn -B test
+          -pl ${{ matrix.module }}
+          -Pscala-${{ matrix.scalaver }}
+          -Ppaimon-${{ matrix.paimon }}
+          -P${{ matrix.sparkver }}
+          -Prelease
 
       - name: Upload reports
         if: failure()
         uses: actions/upload-artifact@v7
         with:
-          name: ${{ matrix.module }}-test-report
+          # Artifact names must be unique within a workflow run, so include the matrix
+          # coordinates; a fixed name lets only one failing matrix job upload its reports.
+          name: auron-paimon-test-report-${{ matrix.sparkver }}-scala${{ matrix.scalaver }}-paimon${{ matrix.paimon }}-jdk${{ matrix.javaver }}
           path: ${{ matrix.module }}/target/surefire-reports
\ No newline at end of file
diff --git a/thirdparty/auron-paimon/pom.xml b/thirdparty/auron-paimon/pom.xml
index 2f93a035b..d5fb2b0fa 100644
--- a/thirdparty/auron-paimon/pom.xml
+++ b/thirdparty/auron-paimon/pom.xml
@@ -50,5 +50,37 @@
       <artifactId>spark-hive_${scalaVersion}</artifactId>
       <scope>provided</scope>
     </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-spark-${shortSparkVersion}</artifactId>
+      <version>${paimonVersion}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala
new file mode 100644
index 000000000..c0c9b5b14
--- /dev/null
+++
b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.auron.paimon + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.source.{DataSplit, Split} +import org.apache.paimon.utils.RowDataToObjectArrayConverter +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.NativeConverters +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.hive.auron.paimon.PaimonUtil +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +final case class PaimonFile(filePath: String, fileSize: Long, partitionValues: InternalRow) + +final case class PaimonScanPlan( + table: FileStoreTable, + files: Seq[PaimonFile], + fileFormat: String, + 
readSchema: StructType, + fileSchema: StructType, + partitionSchema: StructType) + +object PaimonScanSupport extends Logging { + + private val PaimonBaseScanClassName = "org.apache.paimon.spark.PaimonBaseScan" + private val PaimonInputPartitionClassName = "org.apache.paimon.spark.PaimonInputPartition" + + def isPaimonScan(exec: BatchScanExec): Boolean = isPaimonScan(exec.scan) + + private def isPaimonScan(scan: AnyRef): Boolean = { + isInstanceOfClass(scan, PaimonBaseScanClassName) + } + + def plan(exec: BatchScanExec): Option[PaimonScanPlan] = { + val scan = exec.scan + if (!isPaimonScan(scan)) { + return None + } + + val table = paimonTable(scan) match { + case Some(t) => t + case None => + logDebug("Skip native Paimon scan: cannot resolve FileStoreTable from PaimonScan.") + return None + } + + if (!PaimonUtil.isPaimonCowTable(table)) { + logDebug("Skip native Paimon scan: only Paimon COW tables are supported.") + return None + } + + val fileFormat = PaimonUtil.paimonFileFormat(table) + if (!fileFormat.equalsIgnoreCase(PaimonUtil.parquetFormat) && + !fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat)) { + logDebug(s"Skip native Paimon scan: unsupported file format $fileFormat.") + return None + } + + val readSchema = scan.readSchema() + if (!readSchema.fields.forall(f => NativeConverters.isTypeSupported(f.dataType))) { + logDebug("Skip native Paimon scan: unsupported column data type in read schema.") + return None + } + + val partitionKeys = table.schema().partitionKeys().asScala.toSet + val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name)) + val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name)) + val partitionSchema = StructType(partitionFields) + val fileSchema = StructType(fileFields) + + val partitions = inputPartitions(exec) match { + case Some(p) => p + case None => + logDebug("Skip native Paimon scan: failed to obtain input partitions.") + return None + } + if (partitions.isEmpty) { + 
logDebug("Paimon scan planned with empty input partitions.") + return Some( + PaimonScanPlan(table, Seq.empty, fileFormat, readSchema, fileSchema, partitionSchema)) + } + + val splitsOpt = collectSplits(partitions) + val splits = splitsOpt match { + case Some(s) => s + case None => + logDebug("Skip native Paimon scan: cannot extract splits from input partitions.") + return None + } + + // Only allow COW-style raw-readable splits; reject MOR/MOW or splits with deletion vectors. + val unsupported = splits.find { s => + !s.rawConvertible() || + (s.deletionFiles().isPresent && { + val list = s.deletionFiles().get() + list != null && list.asScala.exists(_ != null) + }) + } + if (unsupported.isDefined) { + logDebug("Skip native Paimon scan: split is not raw-convertible or has deletion files.") + return None + } + + val partitionConverter = new RowDataToObjectArrayConverter( + table.schema().logicalPartitionType()) + val sessionLocalTimeZone = SQLConf.get.sessionLocalTimeZone + val tzOption: String = { + val props = CaseInsensitiveMap(table.options().asScala.toMap) + props.getOrElse(DateTimeUtils.TIMEZONE_OPTION, sessionLocalTimeZone) + } + + val files = splits.flatMap { split => + val partitionValues = if (partitionSchema.isEmpty) { + InternalRow.empty + } else { + toPartitionRow( + partitionConverter.convert(split.partition()), + partitionSchema, + table.schema().partitionKeys().asScala.toSeq, + tzOption) + } + split.dataFiles().asScala.map { dataFile => + val filePath = s"${split.bucketPath()}/${dataFile.fileName()}" + PaimonFile(filePath, dataFile.fileSize(), partitionValues) + } + } + + Some(PaimonScanPlan(table, files, fileFormat, readSchema, fileSchema, partitionSchema)) + } + + private def containsName(names: Set[String], target: String): Boolean = { + val resolver = SQLConf.get.resolver + names.exists(n => resolver(n, target)) + } + + // Build a Spark InternalRow for partition values matching partitionSchema's data types. 
+  // Partition values from Paimon are returned in the table's partition-key order; we reorder
+  // them to match partitionSchema and cast strings/temporals into the requested types.
+  //
+  // A field whose name resolves to no partition key (or whose index lies outside
+  // paimonValues) yields null. All other values are rendered with toString and pushed
+  // through a Spark Cast evaluated in timeZoneId, with one exception: a boxed Integer
+  // destined for a DateType column is passed through unchanged.
+  // NOTE(review): this assumes Paimon's RowDataToObjectArrayConverter hands DATE partition
+  // values back as epoch-day Integers (the same encoding Spark uses internally), in which
+  // case the string round-trip would reinterpret e.g. "19723" as a year - confirm.
+  private def toPartitionRow(
+      paimonValues: Array[AnyRef],
+      partitionSchema: StructType,
+      partitionKeys: Seq[String],
+      timeZoneId: String): InternalRow = {
+    val resolver = SQLConf.get.resolver
+    InternalRow.fromSeq(partitionSchema.fields.map { field =>
+      // First key (in declaration order) that the session resolver accepts for this field.
+      val idx = partitionKeys.indexWhere(key => resolver(key, field.name))
+      val raw = if (idx >= 0 && idx < paimonValues.length) paimonValues(idx) else null
+      (raw, field.dataType) match {
+        case (days: java.lang.Integer, org.apache.spark.sql.types.DateType) =>
+          days.intValue()
+        case (null, dataType) =>
+          Cast(Literal(null, dataType), dataType, Option(timeZoneId)).eval()
+        case (value, dataType) =>
+          Cast(Literal(value.toString), dataType, Option(timeZoneId)).eval()
+      }
+    })
+  }
+
+  // Flattens every input partition into its DataSplits, preserving partition order.
+  // Returns None when any partition is not a PaimonInputPartition, exposes no usable
+  // zero-arg `splits` member, or carries an element that is not a DataSplit; once one
+  // partition fails, the remaining partitions are no longer inspected.
+  private def collectSplits(partitions: Seq[InputPartition]): Option[Seq[DataSplit]] = {
+    def dataSplitsOf(partition: InputPartition): Option[Seq[DataSplit]] =
+      if (!isInstanceOfClass(partition, PaimonInputPartitionClassName)) {
+        None
+      } else {
+        invokeMethod(partition, "splits") match {
+          case Some(splits: scala.collection.Seq[_]) =>
+            val dataSplits = splits.collect { case ds: DataSplit => ds }
+            // A size mismatch means at least one element was null or not a DataSplit.
+            if (dataSplits.size == splits.size) Some(dataSplits.toSeq) else None
+          case _ => None
+        }
+      }
+
+    partitions.foldLeft(Option(Vector.empty[DataSplit])) { (collected, partition) =>
+      collected.flatMap(acc => dataSplitsOf(partition).map(acc ++ _))
+    }
+  }
+
+  // Reflectively reads the scan's zero-arg `table` member and keeps it only when it is a
+  // FileStoreTable; any other table type is logged at debug level and rejected.
+  private def paimonTable(scan: AnyRef): Option[FileStoreTable] = {
+    invokeMethod(scan, "table") match {
+      case Some(t: FileStoreTable) => Some(t)
+      case Some(other) =>
+        logDebug(s"Unexpected Paimon table type: ${other.getClass.getName}")
+        None
+      case None => None
+    }
+  }
+
+  // DSv2 BatchScanExec exposes input partitions via Scan.toBatch (preferred) or a method on
+  // the exec itself; the latter varies across Spark versions, so we attempt both.
+  // Returns Some(partitions) on success (possibly empty if the table is empty), or None when
+  // partition planning fails - the caller falls back to Spark execution on None.
+  // The reflective fallback is only reached when Scan.toBatch itself returns null.
+  private def inputPartitions(exec: BatchScanExec): Option[Seq[InputPartition]] = {
+    try {
+      val batch = exec.scan.toBatch
+      if (batch != null) {
+        val parts = batch.planInputPartitions()
+        if (parts != null) return Some(parts.toSeq)
+        logWarning("Paimon Scan.toBatch.planInputPartitions() returned null.")
+        return None
+      }
+      logWarning("Paimon Scan.toBatch returned null.")
+    } catch {
+      case NonFatal(t) =>
+        logWarning("Failed to plan Paimon input partitions via Scan.toBatch.", t)
+        return None
+    }
+
+    // Only zero-arg public methods can be invoked as plain accessors below.
+    val accessors = exec.getClass.getMethods.filter(_.getParameterCount == 0)
+    accessors
+      .find(_.getName == "inputPartitions")
+      .orElse(accessors.find(_.getName == "partitions")) match {
+      case None =>
+        logWarning(
+          "BatchScanExec exposes no inputPartitions/partitions method; cannot plan Paimon scan.")
+        None
+      case Some(method) =>
+        try {
+          method.invoke(exec) match {
+            // Nested shape (Seq[Seq[InputPartition]]): flatten the groups.
+            case s: scala.collection.Seq[_]
+                if s.nonEmpty && s.head.isInstanceOf[scala.collection.Seq[_]] =>
+              Some(
+                s.asInstanceOf[scala.collection.Seq[scala.collection.Seq[InputPartition]]]
+                  .flatten
+                  .toSeq)
+            case s: scala.collection.Seq[_] =>
+              Some(s.asInstanceOf[scala.collection.Seq[InputPartition]].toSeq)
+            case other =>
+              // Report the class of the returned object itself; it may legitimately be null.
+              val returned = Option(other).map(_.getClass.getName).getOrElse("null")
+              logWarning(
+                s"Unexpected return type from BatchScanExec partitions method: $returned.")
+              None
+          }
+        } catch {
+          case NonFatal(t) =>
+            logWarning("Failed to read Paimon input partitions via reflection.", t)
+            None
+        }
+    }
+  }
+
+  // Name-based instanceof: true when obj's class, any of its superclasses, or any interface
+  // reachable from them (including super-interfaces) is named className. Comparing names
+  // avoids referencing the Paimon Spark classes directly. A null obj is never an instance.
+  private def isInstanceOfClass(obj: AnyRef, className: String): Boolean = {
+    def matches(c: Class[_]): Boolean =
+      c != null && (
+        c.getName == className ||
+          c.getInterfaces.exists(matches) ||
+          matches(c.getSuperclass)
+      )
+    obj != null && matches(obj.getClass)
+  }
+
+  // Invokes the first zero-arg method called methodName declared on target's class or one
+  // of its superclasses (made accessible first, so non-public members work too).
+  // Returns None when target is null, no such method exists, the call fails with a
+  // non-fatal error, or the method returns null - callers never see Some(null).
+  private def invokeMethod(target: AnyRef, methodName: String): Option[Any] = {
+    def findMethod(cls: Class[_]): Option[java.lang.reflect.Method] =
+      if (cls == null) {
+        None
+      } else {
+        cls.getDeclaredMethods
+          .find(m => m.getName == methodName && m.getParameterCount == 0)
+          .orElse(findMethod(cls.getSuperclass))
+      }
+
+    Option(target).flatMap { receiver =>
+      try {
+        findMethod(receiver.getClass).flatMap { method =>
+          method.setAccessible(true)
+          Option(method.invoke(receiver))
+        }
+      } catch {
+        case NonFatal(t) =>
+          logDebug(s"Failed to invoke $methodName on ${receiver.getClass.getName}", t)
+          None
+      }
+    }
+  }
+}
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala
new file mode 100644
index 000000000..b1cefcb26
--- /dev/null
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.sql.execution.auron.plan + +import java.net.URI +import java.security.PrivilegedExceptionAction +import java.util.Locale +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.FileSystem +import org.apache.spark.Partition +import org.apache.spark.TaskContext +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims} +import org.apache.spark.sql.auron.paimon.{PaimonFile, PaimonScanPlan} +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.hive.auron.paimon.PaimonUtil +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +import org.apache.auron.{protobuf => pb} +import org.apache.auron.jni.JniBridge +import org.apache.auron.metric.SparkMetricNode + +case class NativePaimonV2TableScanExec(basedScan: BatchScanExec, plan: PaimonScanPlan) + extends LeafExecNode + with NativeSupports + with Logging { + + override lazy val metrics: Map[String, SQLMetric] = + NativeHelper.getNativeFileScanMetrics(sparkContext) ++ Seq( + "numPartitions" -> SQLMetrics.createMetric(sparkContext, "Native.partitions_read"), + "numFiles" -> SQLMetrics.createMetric(sparkContext, "Native.files_read")) + + override val output = basedScan.output + override val outputPartitioning = basedScan.outputPartitioning + + private lazy val fileSchema: StructType = plan.fileSchema + private lazy val partitionSchema: StructType = plan.partitionSchema + private lazy val files: 
Seq[PaimonFile] = plan.files + + private lazy val partitions: Array[FilePartition] = { + val filePartitions = buildFilePartitions() + postDriverMetrics(filePartitions) + filePartitions + } + private lazy val fileSizes: Map[String, Long] = + files.map(f => f.filePath -> f.fileSize).toMap + + private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema) + private lazy val nativePartitionSchema: pb.Schema = + NativeConverters.convertSchema(partitionSchema) + + // Project the output attributes onto the (fileSchema ++ partitionSchema) layout used by the + // native scan. Index lookup follows SQLConf.caseSensitiveAnalysis so that the projection + // remains correct under case-insensitive analysis (mirrors NativeIcebergTableScanExec). + private lazy val combinedSchema: StructType = + StructType(fileSchema.fields ++ partitionSchema.fields) + + private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis + + private lazy val fieldIndexByName: Map[String, Int] = { + if (caseSensitive) { + combinedSchema.fieldNames.zipWithIndex.toMap + } else { + combinedSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap + } + } + + private def fieldIndexFor(name: String): Int = { + if (caseSensitive) { + fieldIndexByName.getOrElse(name, combinedSchema.fieldIndex(name)) + } else { + fieldIndexByName.getOrElse(name.toLowerCase(Locale.ROOT), combinedSchema.fieldIndex(name)) + } + } + + private lazy val projection: Seq[Integer] = + output.map(attr => Integer.valueOf(fieldIndexFor(attr.name))) + + override def doExecuteNative(): NativeRDD = { + if (partitions.isEmpty) { + return new EmptyNativeRDD(sparkContext) + } + + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => 
+ })) + + val fileFormat = plan.fileFormat + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + val nativeFileGroups = this.nativeFileGroups + val nativeFileSchema = this.nativeFileSchema + val nativePartitionSchema = this.nativePartitionSchema + val projection = this.projection + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativePaimonV2TableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + + if (fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat)) { + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) + + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + } else { + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.PaimonV2Scan") + } + + override val nodeName: String = "NativePaimonV2TableScan" + + override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized + + private lazy val nativeFileGroups: FilePartition => pb.FileGroup = (partition: 
FilePartition) => + { + val nativePartitionedFile = (file: PartitionedFile) => { + val filePath = file.filePath.toString + val size = fileSizes.getOrElse(filePath, file.length) + val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => + NativeConverters + .convertExpr( + org.apache.spark.sql.catalyst.expressions + .Literal(file.partitionValues.get(index, field.dataType), field.dataType)) + .getLiteral + } + pb.PartitionedFile + .newBuilder() + .setPath(filePath) + .setSize(size) + .setLastModifiedNs(0) + .addAllPartitionValues(nativePartitionValues.asJava) + .setRange( + pb.FileRange + .newBuilder() + .setStart(file.start) + .setEnd(file.start + file.length) + .build()) + .build() + } + pb.FileGroup + .newBuilder() + .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava) + .build() + } + + private def postDriverMetrics(filePartitions: Array[FilePartition]): Unit = { + val numPartitions = filePartitions.length + metrics("numPartitions").add(numPartitions) + val numFiles = filePartitions.foldLeft(0L)(_ + _.files.length) + metrics("numFiles").add(numFiles) + val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + SQLMetrics.postDriverMetricUpdates( + sparkContext, + executionId, + Seq(metrics("numPartitions"), metrics("numFiles"))) + } + + private def buildFilePartitions(): Array[FilePartition] = { + if (files.isEmpty) { + return Array.empty + } + + val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession + val isSplitable = + plan.fileFormat.equalsIgnoreCase(PaimonUtil.parquetFormat) || + plan.fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat) + val maxSplitBytes = getMaxSplitBytes(sparkSession, files) + val partitionedFiles = files + .flatMap { f => + if (isSplitable) { + (0L until f.fileSize by maxSplitBytes).map { offset => + val remaining = f.fileSize - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + Shims.get.getPartitionedFile(f.partitionValues, 
f.filePath, offset, size) + } + } else { + Seq(Shims.get.getPartitionedFile(f.partitionValues, f.filePath, 0, f.fileSize)) + } + } + .sortBy(_.length)(Ordering[Long].reverse) + .toSeq + + FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes).toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession, fs: Seq[PaimonFile]): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val minPartitionNum = Shims.get.getMinPartitionNum(sparkSession) + val totalBytes = fs.map(_.fileSize + openCostInBytes).sum + val bytesPerCore = if (minPartitionNum > 0) totalBytes / minPartitionNum else totalBytes + + Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + } + + private def putJniBridgeResource( + resourceId: String, + broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = { + val sharedConf = broadcastedHadoopConf.value.value + JniBridge.putResource( + resourceId, + (location: String) => { + val getFsTimeMetric = metrics("io_time_getfs") + val currentTimeMillis = System.currentTimeMillis() + val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] { + override def run(): FileSystem = FileSystem.get(new URI(location), sharedConf) + }) + getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000) + fs + }) + } + + private def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = { + val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(Map.empty) + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + } +} diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala index 
21b2ed03a..47a3f5e2e 100644
--- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hive.auron.paimon
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.auron.AuronConverters
 import org.apache.spark.sql.auron.AuronConvertProvider
+import org.apache.spark.sql.auron.paimon.PaimonScanSupport
 import org.apache.spark.sql.auron.util.AuronLogUtils
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.auron.plan.NativePaimonV2TableScanExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.execution.auron.plan.NativePaimonTableScanExec
 
@@ -32,6 +35,8 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
     exec match {
       case _: HiveTableScanExec =>
         SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
+      case _: BatchScanExec =>
+        SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
       case _ => false
     }
 
@@ -43,6 +48,8 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
           if e.relation.tableMeta.storage.serde.isDefined &&
             e.relation.tableMeta.storage.serde.get.contains("Paimon") =>
         true
+      case e: BatchScanExec => // NOTE(review): runs a full plan(); the convert path plans again
+        PaimonScanSupport.isPaimonScan(e) && PaimonScanSupport.plan(e).nonEmpty
       case _ => false
     }
   }
@@ -50,6 +57,7 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
   override def convert(exec: SparkPlan): SparkPlan = {
     exec match {
       case hiveExec: HiveTableScanExec => convertPaimonTableScanExec(hiveExec)
+      case batchScan: BatchScanExec => convertPaimonV2BatchScanExec(batchScan)
       case _ => exec
     }
   }
@@ -72,4 +80,19 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
 
     AuronConverters.addRenameColumnsExec(NativePaimonTableScanExec(exec))
   }
+
+  private def convertPaimonV2BatchScanExec(exec: BatchScanExec): SparkPlan = {
+    PaimonScanSupport.plan(exec) match { // NOTE(review): presumably a 2nd plan() of this exec
+      case Some(plan) =>
+        AuronLogUtils.logDebugPlanConversion(
+          exec,
+          Seq(
+            "scan" -> exec.scan.getClass,
+            "output" -> exec.output,
+            "fileFormat" -> plan.fileFormat,
+            "numFiles" -> plan.files.size))
+        AuronConverters.addRenameColumnsExec(NativePaimonV2TableScanExec(exec, plan))
+      case None => exec
+    }
+  }
 }
diff --git a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala
new file mode 100644
index 000000000..8f32b4f53
--- /dev/null
+++ b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.auron.paimon + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ + +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.auron.plan.NativePaimonV2TableScanExec +import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates + +class AuronPaimonV2IntegrationSuite + extends org.apache.spark.sql.QueryTest + with BaseAuronPaimonSuite { + + override def beforeAll(): Unit = { + super.beforeAll() + sql("create database if not exists paimon.db") + } + + test("paimon v2 native scan runs simple COW select") { + withTable("paimon.db.t1") { + sql("create table paimon.db.t1 (id int, v string) using paimon") + sql("insert into paimon.db.t1 values (1, 'a'), (2, 'b')") + val df = sql("select * from paimon.db.t1") + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + assertNativePaimonScanApplied(df) + } + } + + test("paimon v2 native scan supports projection") { + withTable("paimon.db.t_proj") { + sql("create table paimon.db.t_proj (id int, v string) using paimon") + sql("insert into paimon.db.t_proj values (1, 'a'), (2, 'b')") + val df = sql("select id from paimon.db.t_proj") + checkAnswer(df, Seq(Row(1), Row(2))) + assertNativePaimonScanApplied(df) + } + } + + test("paimon v2 native scan supports partitioned table with predicate") { + withTable("paimon.db.t_part") { + sql(""" + |create table paimon.db.t_part (id int, v string, p string) + |using paimon + |partitioned by (p) + |""".stripMargin) + sql("insert into paimon.db.t_part values (1, 'a', 'p1'), (2, 'b', 'p2')") + val df = sql("select * from paimon.db.t_part where p = 'p1'") + checkAnswer(df, Seq(Row(1, "a", "p1"))) + assertNativePaimonScanApplied(df) + } + } + + test("paimon v2 native scan supports ORC COW table") { + withTable("paimon.db.t_orc") { + sql(""" + 
|create table paimon.db.t_orc (id int, v string) + |using paimon + |tblproperties ('file.format' = 'orc') + |""".stripMargin) + sql("insert into paimon.db.t_orc values (1, 'a'), (2, 'b')") + val df = sql("select * from paimon.db.t_orc") + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + assertNativePaimonScanApplied(df) + } + } + + test("paimon v2 native scan handles empty table") { + withTable("paimon.db.t_empty") { + sql("create table paimon.db.t_empty (id int, v string) using paimon") + val df = sql("select * from paimon.db.t_empty") + checkAnswer(df, Seq.empty) + assertNativePaimonScanApplied(df) + } + } + + test("paimon v2 scan exposes file scan driver metrics") { + withTable("paimon.db.t_metrics") { + sql("create table paimon.db.t_metrics (id int, v string) using paimon") + sql("insert into paimon.db.t_metrics values (1, 'a')") + withSQLConf("spark.sql.adaptive.enabled" -> "false") { + val df = sql("select * from paimon.db.t_metrics") + val nativeScan = executedNativeScan(df) + val metricIds = Map( + "numPartitions" -> nativeScan.metrics("numPartitions").id, + "numFiles" -> nativeScan.metrics("numFiles").id) + val driverMetricUpdates = new ConcurrentLinkedQueue[(Long, Long)]() + val driverMetricUpdatesPosted = new CountDownLatch(1) + val listener = new SparkListener { + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case SparkListenerDriverAccumUpdates(_, updates) => + updates.foreach { case (metricId, value) => + driverMetricUpdates.add(metricId -> value) + } + val updatedMetricIds = driverMetricUpdates.iterator().asScala.map(_._1).toSet + if (metricIds.values.forall(updatedMetricIds.contains)) { + driverMetricUpdatesPosted.countDown() + } + case _ => + } + } + + spark.sparkContext.addSparkListener(listener) + try { + checkAnswer(df, Seq(Row(1, "a"))) + assert(driverMetricUpdatesPosted.await(30, TimeUnit.SECONDS)) + } finally { + spark.sparkContext.removeSparkListener(listener) + } + + val driverMetricValues = 
driverMetricUpdates + .iterator() + .asScala + .toSeq + .groupBy(_._1) + .mapValues(_.map(_._2).sum) + .toMap + assert(driverMetricValues.getOrElse(metricIds("numPartitions"), 0L) > 0) + assert(driverMetricValues.getOrElse(metricIds("numFiles"), 0L) > 0) + } + } + } + + test("paimon v2 native scan falls back when spark.auron.enable.paimon.scan=false") { + withTable("paimon.db.t_disable") { + sql("create table paimon.db.t_disable (id int, v string) using paimon") + sql("insert into paimon.db.t_disable values (1, 'a')") + withSQLConf("spark.auron.enable.paimon.scan" -> "false") { + val df = sql("select * from paimon.db.t_disable") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativePaimonV2TableScan")) + } + } + } + + test("paimon v2 native scan falls back for MOR (merge-on-read) table") { + withTable("paimon.db.t_mor") { + sql(""" + |create table paimon.db.t_mor (id int, v string) + |using paimon + |tblproperties ( + | 'primary-key' = 'id', + | 'bucket' = '2' + |) + |""".stripMargin) + sql("insert into paimon.db.t_mor values (1, 'a'), (2, 'b')") + val df = sql("select * from paimon.db.t_mor") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativePaimonV2TableScan")) + } + } + + private def assertNativePaimonScanApplied(df: DataFrame): Unit = { + val plan = df.queryExecution.executedPlan.toString() + assert( + plan.contains("NativePaimonV2TableScan"), + s"plan should use native paimon scan:\n$plan") + } + + private def executedNativeScan(df: DataFrame): NativePaimonV2TableScanExec = { + val nativeScan = df.queryExecution.executedPlan.collectFirst { + case scan: NativePaimonV2TableScanExec => scan + } + assert(nativeScan.nonEmpty, "expected NativePaimonV2TableScanExec in executed plan") + nativeScan.get + } +} diff --git a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala 
b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala new file mode 100644 index 000000000..8f324d296 --- /dev/null +++ b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.auron.paimon
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Shared fixture for Auron Paimon suites: a Spark session configured with the Auron and
+ * Paimon session extensions and a `paimon` catalog whose warehouse is a temporary local
+ * directory that is removed again in `afterAll`.
+ */
+trait BaseAuronPaimonSuite extends SharedSparkSession {
+
+  /** Absolute path of a temporary directory, created on first access, used as warehouse. */
+  protected lazy val paimonWarehouse: String = {
+    // java.io.tmpdir is set to target/tmp in the parent pom, which may not exist yet
+    val parentDir = Files.createDirectories(Paths.get(System.getProperty("java.io.tmpdir")))
+    val warehouseDir = Files.createTempDirectory(parentDir, "auron-paimon-warehouse-").toFile
+    warehouseDir.deleteOnExit()
+    warehouseDir.getAbsolutePath
+  }
+
+  /** Returns the inherited configuration with the settings listed below applied on top. */
+  override protected def sparkConf: SparkConf = {
+    val sessionExtensions =
+      "org.apache.spark.sql.auron.AuronSparkSessionExtension," +
+        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
+    // Each option except the last carries its own trailing space, so no separator is needed.
+    val extraJavaOptions = Seq(
+      "--add-opens=java.base/java.lang=ALL-UNNAMED ",
+      "--add-opens=java.base/java.nio=ALL-UNNAMED ",
+      "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED ",
+      "-Dio.netty.tryReflectionSetAccessible=true").mkString
+    val settings = Seq(
+      "spark.sql.extensions" -> sessionExtensions,
+      "spark.sql.catalog.paimon" -> "org.apache.paimon.spark.SparkCatalog",
+      "spark.sql.catalog.paimon.warehouse" -> s"file:$paimonWarehouse",
+      "spark.auron.enabled" -> "true",
+      "spark.auron.enable.paimon.scan" -> "true",
+      "spark.shuffle.manager" ->
+        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager",
+      "spark.auron.enable.shuffleExchange" -> "true",
+      "spark.auron.enable.project" -> "false",
+      "spark.auron.ui.enabled" -> "false",
+      "spark.ui.enabled" -> "false",
+      "spark.driver.extraJavaOptions" -> extraJavaOptions,
+      "spark.executor.extraJavaOptions" -> extraJavaOptions)
+    settings.foldLeft(super.sparkConf) { case (conf, (key, value)) => conf.set(key, value) }
+  }
+
+  /** Runs the inherited teardown, then deletes the warehouse directory even if it throws. */
+  override def afterAll(): Unit =
+    try super.afterAll()
+    finally deleteRecursively(new File(paimonWarehouse))
+
+  /**
+   * Deletes `file`; when it is a directory its entries are deleted first. The result of
+   * `File.delete()` is not checked.
+   */
+  private def deleteRecursively(file: File): Unit = {
+    if (file.isDirectory) {
+      // listFiles() may return null; Option(...) turns that case into a no-op.
+      Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
+    }
+    file.delete()
+  }
+}