diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
index 314c65f3..ca6e01bd 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
@@ -90,7 +90,15 @@ object ExprUtils extends SQLConfHelper {
     case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
   }
 
-  def toClickHouse(transform: Transform): Expr = transform match {
+  def toClickHouseOpt(v2Expr: V2Expression): Option[Expr] = Try(toClickHouse(v2Expr)).toOption
+
+  def toClickHouse(v2Expr: V2Expression): Expr = v2Expr match {
+    // sort order
+    case sortOrder: SortOrder =>
+      val asc = sortOrder.direction == SortDirection.ASCENDING
+      val nullFirst = sortOrder.nullOrdering == NullOrdering.NULLS_FIRST
+      OrderExpr(toClickHouse(sortOrder.expression), asc, nullFirst)
+    // transform
     case YearsTransform(FieldReference(Seq(col))) => FuncExpr("toYear", List(FieldRef(col)))
     case MonthsTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMM", List(FieldRef(col)))
     case DaysTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMMDD", List(FieldRef(col)))
@@ -98,7 +106,12 @@ object ExprUtils extends SQLConfHelper {
     case IdentityTransform(fieldRefs) => FieldRef(fieldRefs.describe)
     case ApplyTransform(name, args) => FuncExpr(name, args.map(arg => SQLExpr(arg.describe())).toList)
     case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
-    case other: Transform => throw CHClientException(s"Unsupported transform: $other")
+    // others
+    case l: Literal[_] => SQLExpr(l.toString)
+    case FieldReference(Seq(col)) => FieldRef(col)
+    case gse: GeneralScalarExpression => SQLExpr(gse.toString) // TODO: excluding unsupported
+    // unsupported
+    case unsupported: V2Expression => throw CHClientException(s"Unsupported expression: $unsupported")
   }
 
   def inferTransformSchema(
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseRead.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseRead.scala
index 47616847..a38b3282 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseRead.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseRead.scala
@@ -16,8 +16,9 @@ package xenon.clickhouse.read
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.clickhouse.ClickHouseSQLConf._
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.clickhouse.ExprUtils
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
+import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.read.partitioning.{Partitioning, UnknownPartitioning}
@@ -38,6 +39,7 @@ class ClickHouseScanBuilder(
   metadataSchema: StructType,
   partitionTransforms: Array[Transform]
 ) extends ScanBuilder
+    with SupportsPushDownTopN
     with SupportsPushDownLimit
     with SupportsPushDownFilters
     with SupportsPushDownAggregates
@@ -56,6 +58,8 @@ class ClickHouseScanBuilder(
     physicalSchema.fields ++ reservedMetadataSchema.fields
   )
 
+  private var _orders: Option[String] = None
+
   private var _limit: Option[Int] = None
 
   override def pushLimit(limit: Int): Boolean = {
@@ -63,6 +67,16 @@ class ClickHouseScanBuilder(
     true
   }
 
+  override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
+    val translated = orders.map(sortOrder => ExprUtils.toClickHouseOpt(sortOrder))
+    if (translated.exists(_.isEmpty)) {
+      return false
+    }
+    this._orders = Some(translated.flatten.mkString(", "))
+    this._limit = Some(limit)
+    true
+  }
+
   private var _pushedFilters = Array.empty[Filter]
 
   override def pushedFilters: Array[Filter] = this._pushedFilters
@@ -121,6 +135,7 @@ class ClickHouseScanBuilder(
         readSchema = _readSchema,
         filtersExpr = compileFilters(AlwaysTrue :: pushedFilters.toList),
         groupByClause = _groupByClause,
+        orderByClause = _orders.map(clause => s"ORDER BY $clause"),
         limit = _limit
       ))
   }
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseReader.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseReader.scala
index 42d14a6c..2fa692ca 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseReader.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ClickHouseReader.scala
@@ -58,6 +58,7 @@ abstract class ClickHouseReader[Record](
        |FROM `$database`.`$table`
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
+       |${scanJob.orderByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
        |""".stripMargin
   }
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ScanJobDescription.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ScanJobDescription.scala
index f73f7c08..dd61f6ee 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ScanJobDescription.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/read/ScanJobDescription.scala
@@ -35,6 +35,7 @@ case class ScanJobDescription(
   // into Scan tasks because the check happens in planing phase on driver side.
   filtersExpr: String = "1=1",
   groupByClause: Option[String] = None,
+  orderByClause: Option[String] = None,
   limit: Option[Int] = None
 ) {