Commit 0ff48d9

Yxangpan3793 authored and committed
Spark 3.4: Write supports using V2 functions for distribution and ordering
1 parent e165698 commit 0ff48d9

File tree

6 files changed: +190 -86 lines

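The "V2 functions" in the title are Spark's DataSource V2 function API: the connector registers UnboundFunction instances that Spark binds against an input schema and evaluates as ScalarFunctions. A minimal sketch of such a function, assuming hypothetical names and a placeholder hash body (this is not code from the commit):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructType}

// Toy V2 function: one STRING argument in, a 64-bit hash out. The hash body is
// a placeholder (String.hashCode), not ClickHouse's real xxHash64.
object ToyHash64 extends UnboundFunction with ScalarFunction[Long] {
  override def name(): String = "toy_hash64"
  override def canonicalName(): String = s"clickhouse.$name"
  override def description(): String = "toy_hash64(str) - placeholder 64-bit hash"

  // Spark calls bind() with the actual argument schema before planning.
  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
    case Array(f) if f.dataType == StringType => this
    case _ => throw new UnsupportedOperationException("toy_hash64 expects one STRING argument")
  }

  override def inputTypes(): Array[DataType] = Array(StringType)
  override def resultType(): DataType = LongType

  // Row-at-a-time evaluation; a production function would also expose the
  // codegen-friendly "magic method" variant.
  override def produceResult(input: InternalRow): Long =
    input.getUTF8String(0).toString.hashCode.toLong
}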

spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/WriteDistributionAndOrderingSuite.scala

Lines changed: 2 additions & 6 deletions
@@ -78,12 +78,8 @@ class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
       WRITE_REPARTITION_BY_PARTITION.key -> repartitionByPartition.toString,
       WRITE_LOCAL_SORT_BY_KEY.key -> localSortByKey.toString
     ) {
-      if (!ignoreUnsupportedTransform && repartitionByPartition) {
-        intercept[AnalysisException](write())
-      } else {
-        write()
-        check()
-      }
+      write()
+      check()
     }
 
     Seq(true, false).foreach { ignoreUnsupportedTransform =>

spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala

Lines changed: 119 additions & 56 deletions
@@ -15,105 +15,168 @@
 package org.apache.spark.sql.clickhouse
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
+import org.apache.spark.sql.catalyst.{expressions, SQLConfHelper}
+import org.apache.spark.sql.catalyst.expressions.BoundReference
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, TransformExpression, V2ExpressionUtils}
 import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.IGNORE_UNSUPPORTED_TRANSFORM
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
 import org.apache.spark.sql.connector.expressions.Expressions._
-import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, _}
-import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
+import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, SortOrder => V2SortOrder}
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.types.{StructField, StructType}
 import xenon.clickhouse.exception.CHClientException
 import xenon.clickhouse.expr._
+import xenon.clickhouse.func.FunctionRegistry
+import xenon.clickhouse.spec.ClusterSpec
 
-import scala.annotation.tailrec
 import scala.util.{Failure, Success, Try}
 
-object ExprUtils extends SQLConfHelper {
+object ExprUtils extends SQLConfHelper with Serializable {
 
-  def toSparkPartitions(partitionKey: Option[List[Expr]]): Array[Transform] =
-    partitionKey.seq.flatten.flatten(toSparkTransformOpt).toArray
+  def toSparkPartitions(
+    partitionKey: Option[List[Expr]],
+    functionRegistry: FunctionRegistry
+  ): Array[Transform] =
+    partitionKey.seq.flatten.flatten(toSparkTransformOpt(_, functionRegistry)).toArray
 
-  def toSparkSplits(shardingKey: Option[Expr], partitionKey: Option[List[Expr]]): Array[Transform] =
-    (shardingKey.seq ++ partitionKey.seq.flatten).flatten(toSparkTransformOpt).toArray
+  def toSparkSplits(
+    shardingKey: Option[Expr],
+    partitionKey: Option[List[Expr]],
+    functionRegistry: FunctionRegistry
+  ): Array[Transform] =
+    (shardingKey.seq ++ partitionKey.seq.flatten).flatten(toSparkTransformOpt(_, functionRegistry)).toArray
 
   def toSparkSortOrders(
     shardingKeyIgnoreRand: Option[Expr],
     partitionKey: Option[List[Expr]],
-    sortingKey: Option[List[OrderExpr]]
-  ): Array[SortOrder] =
-    toSparkSplits(shardingKeyIgnoreRand, partitionKey).map(Expressions.sort(_, SortDirection.ASCENDING)) ++:
+    sortingKey: Option[List[OrderExpr]],
+    cluster: Option[ClusterSpec],
+    functionRegistry: FunctionRegistry
+  ): Array[V2SortOrder] =
+    toSparkSplits(
+      shardingKeyIgnoreRand,
+      partitionKey,
+      functionRegistry
+    ).map(Expressions.sort(_, SortDirection.ASCENDING)) ++:
       sortingKey.seq.flatten.flatten { case OrderExpr(expr, asc, nullFirst) =>
         val direction = if (asc) SortDirection.ASCENDING else SortDirection.DESCENDING
         val nullOrder = if (nullFirst) NullOrdering.NULLS_FIRST else NullOrdering.NULLS_LAST
-        toSparkTransformOpt(expr).map(trans => Expressions.sort(trans, direction, nullOrder))
+        toSparkTransformOpt(expr, functionRegistry).map(trans =>
+          Expressions.sort(trans, direction, nullOrder)
+        )
       }.toArray
 
-  @tailrec
-  def toCatalyst(v2Expr: V2Expression, fields: Array[StructField]): Expression =
+  private def loadV2FunctionOpt(
+    name: String,
+    args: Seq[Expression],
+    functionRegistry: FunctionRegistry
+  ): Option[BoundFunction] = {
+    def loadFunction(ident: Identifier): UnboundFunction =
+      functionRegistry.load(ident.name).getOrElse(throw new NoSuchFunctionException(ident))
+    val inputType = StructType(args.zipWithIndex.map {
+      case (exp, pos) => StructField(s"_$pos", exp.dataType, exp.nullable)
+    })
+    try {
+      val unbound = loadFunction(Identifier.of(Array.empty, name))
+      Some(unbound.bind(inputType))
+    } catch {
+      case e: NoSuchFunctionException =>
+        throw e
+      case _: UnsupportedOperationException if conf.getConf(IGNORE_UNSUPPORTED_TRANSFORM) =>
+        None
+      case e: UnsupportedOperationException =>
+        throw new AnalysisException(e.getMessage, cause = Some(e))
+    }
+  }
+
+  def resolveTransformCatalyst(
+    catalystExpr: Expression,
+    timeZoneId: Option[String] = None
+  ): Expression = catalystExpr match {
+    case TransformExpression(function: ScalarFunction[_], args, _) =>
+      val resolvedArgs: Seq[Expression] = args.map(resolveTransformCatalyst(_, timeZoneId))
+      val castedArgs: Seq[Expression] = resolvedArgs.zip(function.inputTypes()).map {
+        case (arg, expectedType) if !arg.dataType.sameType(expectedType) => Cast(arg, expectedType, timeZoneId)
+        case (arg, _) => arg
+      }
+      V2ExpressionUtils.resolveScalarFunction(function, castedArgs)
+    case other => other
+  }
+
+  def toCatalyst(
+    v2Expr: V2Expression,
+    fields: Array[StructField],
+    functionRegistry: FunctionRegistry
+  ): Expression =
     v2Expr match {
-      case IdentityTransform(ref) => toCatalyst(ref, fields)
+      case IdentityTransform(ref) => toCatalyst(ref, fields, functionRegistry)
       case ref: NamedReference if ref.fieldNames.length == 1 =>
         val (field, ordinal) = fields
           .zipWithIndex
           .find { case (field, _) => field.name == ref.fieldNames.head }
           .getOrElse(throw CHClientException(s"Invalid field reference: $ref"))
         BoundReference(ordinal, field.dataType, field.nullable)
+      case t: Transform =>
+        val catalystArgs = t.arguments().map(toCatalyst(_, fields, functionRegistry))
+        loadV2FunctionOpt(t.name(), catalystArgs, functionRegistry)
+          .map(bound => TransformExpression(bound, catalystArgs)).getOrElse {
+            throw CHClientException(s"Unsupported expression: $v2Expr")
+          }
+      case literal: LiteralValue[Any] => expressions.Literal(literal.value)
       case _ => throw CHClientException(
-        s"Unsupported V2 expression: $v2Expr, SPARK-33779: Spark 3.3 only support IdentityTransform"
+        s"Unsupported expression: $v2Expr"
       )
     }
 
-  def toSparkTransformOpt(expr: Expr): Option[Transform] = Try(toSparkTransform(expr)) match {
-    case Success(t) => Some(t)
-    case Failure(_) if conf.getConf(IGNORE_UNSUPPORTED_TRANSFORM) => None
-    case Failure(rethrow) => throw new AnalysisException(rethrow.getMessage, cause = Some(rethrow))
-  }
-
-  // Some functions of ClickHouse which match Spark pre-defined Transforms
-  //
-  // toYear, YEAR - Converts a date or date with time to a UInt16 (AD)
-  // toYYYYMM - Converts a date or date with time to a UInt32 (YYYY*100 + MM)
-  // toYYYYMMDD - Converts a date or date with time to a UInt32 (YYYY*10000 + MM*100 + DD)
-  // toHour, HOUR - Converts a date with time to a UInt8 (0-23)
+  def toSparkTransformOpt(expr: Expr, functionRegistry: FunctionRegistry): Option[Transform] =
+    Try(toSparkExpression(expr, functionRegistry)) match {
+      // need this function because spark `Table`'s `partitioning` field should be `Transform`
+      case Success(t: Transform) => Some(t)
+      case Success(_) => None
+      case Failure(_) if conf.getConf(IGNORE_UNSUPPORTED_TRANSFORM) => None
+      case Failure(rethrow) => throw new AnalysisException(rethrow.getMessage, cause = Some(rethrow))
+    }
 
-  def toSparkTransform(expr: Expr): Transform = expr match {
-    case FieldRef(col) => identity(col)
-    case FuncExpr("toYear", List(FieldRef(col))) => years(col)
-    case FuncExpr("YEAR", List(FieldRef(col))) => years(col)
-    case FuncExpr("toYYYYMM", List(FieldRef(col))) => months(col)
-    case FuncExpr("toYYYYMMDD", List(FieldRef(col))) => days(col)
-    case FuncExpr("toHour", List(FieldRef(col))) => hours(col)
-    case FuncExpr("HOUR", List(FieldRef(col))) => hours(col)
-    // TODO support arbitrary functions
-    // case FuncExpr("xxHash64", List(FieldRef(col))) => apply("ck_xx_hash64", column(col))
-    case FuncExpr("rand", Nil) => apply("rand")
-    case FuncExpr("toYYYYMMDD", List(FuncExpr("toDate", List(FieldRef(col))))) => identity(col)
-    case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
-  }
+  def toSparkExpression(expr: Expr, functionRegistry: FunctionRegistry): V2Expression =
+    expr match {
+      case FieldRef(col) => identity(col)
+      case StringLiteral(value) => literal(value)
+      case FuncExpr("rand", Nil) => apply("rand")
+      case FuncExpr("toYYYYMMDD", List(FuncExpr("toDate", List(FieldRef(col))))) => identity(col)
+      case FuncExpr(funName, args) if functionRegistry.clickHouseToSparkFunc.contains(funName) =>
+        apply(functionRegistry.clickHouseToSparkFunc(funName), args.map(toSparkExpression(_, functionRegistry)): _*)
+      case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
    }
 
-  def toClickHouse(transform: Transform): Expr = transform match {
-    case YearsTransform(FieldReference(Seq(col))) => FuncExpr("toYear", List(FieldRef(col)))
-    case MonthsTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMM", List(FieldRef(col)))
-    case DaysTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMMDD", List(FieldRef(col)))
-    case HoursTransform(FieldReference(Seq(col))) => FuncExpr("toHour", List(FieldRef(col)))
+  def toClickHouse(
+    transform: Transform,
+    functionRegistry: FunctionRegistry
+  ): Expr = transform match {
     case IdentityTransform(fieldRefs) => FieldRef(fieldRefs.describe)
-    case ApplyTransform(name, args) => FuncExpr(name, args.map(arg => SQLExpr(arg.describe())).toList)
+    case ApplyTransform(name, args) if functionRegistry.sparkToClickHouseFunc.contains(name) =>
+      FuncExpr(functionRegistry.sparkToClickHouseFunc(name), args.map(arg => SQLExpr(arg.describe())).toList)
     case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
     case other: Transform => throw CHClientException(s"Unsupported transform: $other")
   }
 
   def inferTransformSchema(
     primarySchema: StructType,
     secondarySchema: StructType,
-    transform: Transform
+    transform: Transform,
+    functionRegistry: FunctionRegistry
   ): StructField = transform match {
-    case years: YearsTransform => StructField(years.toString, IntegerType)
-    case months: MonthsTransform => StructField(months.toString, IntegerType)
-    case days: DaysTransform => StructField(days.toString, IntegerType)
-    case hours: HoursTransform => StructField(hours.toString, IntegerType)
     case IdentityTransform(FieldReference(Seq(col))) => primarySchema.find(_.name == col)
       .orElse(secondarySchema.find(_.name == col))
       .getOrElse(throw CHClientException(s"Invalid partition column: $col"))
-    case ckXxhHash64 @ ApplyTransform("ck_xx_hash64", _) => StructField(ckXxhHash64.toString, LongType)
+    case t @ ApplyTransform(transformName, _) if functionRegistry.load(transformName).isDefined =>
+      val resType =
+        functionRegistry.load(transformName).getOrElse(throw new NoSuchFunctionException(transformName)) match {
+          case f: ScalarFunction[_] => f.resultType()
+          case other => throw CHClientException(s"Unsupported function: $other")
+        }
+      StructField(t.toString, resType)
    case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
    case other: Transform => throw CHClientException(s"Unsupported transform: $other")
  }
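For orientation, a sketch of how the reworked ExprUtils entry points compose; the "xxHash64" registry mapping and the column name are assumptions, not part of this diff:

import org.apache.spark.sql.clickhouse.ExprUtils
import org.apache.spark.sql.connector.expressions.Transform
import xenon.clickhouse.expr.{Expr, FieldRef, FuncExpr}
import xenon.clickhouse.func.FunctionRegistry

// Assumes a registry whose clickHouseToSparkFunc map carries an "xxHash64"
// entry pointing at a registered Spark V2 function.
def shardingTransform(registry: FunctionRegistry): Option[Transform] = {
  val shardingKey: Expr = FuncExpr("xxHash64", List(FieldRef("user_id")))
  // Mapped functions come back as ApplyTransform over the Spark-side name;
  // unmapped ones raise an AnalysisException, or yield None when
  // IGNORE_UNSUPPORTED_TRANSFORM is enabled.
  ExprUtils.toSparkTransformOpt(shardingKey, registry)
}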

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala

Lines changed: 6 additions & 4 deletions
@@ -26,7 +26,7 @@ import xenon.clickhouse.Constants._
 import xenon.clickhouse.client.NodeClient
 import xenon.clickhouse.exception.CHClientException
 import xenon.clickhouse.exception.ClickHouseErrCode._
-import xenon.clickhouse.func.{FunctionRegistry, _}
+import xenon.clickhouse.func.{ClickHouseXxHash64Shard, FunctionRegistry, _}
 import xenon.clickhouse.spec._
 
 import java.time.ZoneId
@@ -91,6 +91,7 @@ class ClickHouseCatalog extends TableCatalog
 
     log.info(s"Detect ${clusterSpecs.size} ClickHouse clusters: ${clusterSpecs.map(_.name).mkString(",")}")
     log.info(s"ClickHouse clusters' detail: $clusterSpecs")
+    log.info(s"Registered functions: ${this.functionRegistry.list.mkString(",")}")
   }
 
   override def name(): String = catalogName
@@ -141,7 +142,8 @@ class ClickHouseCatalog extends TableCatalog
       tableClusterSpec,
       _tz,
       tableSpec,
-      tableEngineSpec
+      tableEngineSpec,
+      functionRegistry
     )
   }
 
@@ -206,7 +208,7 @@ class ClickHouseCatalog extends TableCatalog
 
     val partitionsClause = partitions match {
      case transforms if transforms.nonEmpty =>
-        transforms.map(ExprUtils.toClickHouse(_).sql).mkString("PARTITION BY (", ", ", ")")
+        transforms.map(ExprUtils.toClickHouse(_, functionRegistry).sql).mkString("PARTITION BY (", ", ", ")")
      case _ => ""
    }
 
@@ -297,7 +299,7 @@ class ClickHouseCatalog extends TableCatalog
    }
    tableOpt match {
      case None => false
-      case Some(ClickHouseTable(_, cluster, _, tableSpec, _)) =>
+      case Some(ClickHouseTable(_, cluster, _, tableSpec, _, _)) =>
        val (db, tbl) = (tableSpec.database, tableSpec.name)
        val isAtomic = loadNamespaceMetadata(Array(db)).get("engine").equalsIgnoreCase("atomic")
        val syncClause = if (isAtomic) "SYNC" else ""
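As a standalone illustration of the PARTITION BY assembly in the createTable hunk above (the column and function names are made up; the strings stand in for ExprUtils.toClickHouse(_, functionRegistry).sql output):

// Plain Scala collections only; runnable outside the connector.
val rendered = Seq("m", "xxHash64(user_id)")
val partitionsClause =
  if (rendered.nonEmpty) rendered.mkString("PARTITION BY (", ", ", ")") else ""
// partitionsClause == "PARTITION BY (m, xxHash64(user_id))"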

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseTable.scala

Lines changed: 17 additions & 11 deletions
@@ -14,16 +14,12 @@
 
 package xenon.clickhouse
 
-import java.lang.{Integer => JInt, Long => JLong}
-import java.time.{LocalDate, ZoneId}
-import java.util
-import scala.collection.JavaConverters._
-import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.clickhouse.{ExprUtils, ReadOptions, WriteOptions}
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.{READ_DISTRIBUTED_CONVERT_LOCAL, USE_NULLABLE_QUERY_SCHEMA}
-import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.clickhouse.{ExprUtils, ReadOptions, WriteOptions}
 import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
@@ -34,16 +30,23 @@ import org.apache.spark.unsafe.types.UTF8String
 import xenon.clickhouse.Utils._
 import xenon.clickhouse.client.NodeClient
 import xenon.clickhouse.expr.{Expr, OrderExpr}
+import xenon.clickhouse.func.FunctionRegistry
 import xenon.clickhouse.read.{ClickHouseMetadataColumn, ClickHouseScanBuilder, ScanJobDescription}
 import xenon.clickhouse.spec._
 import xenon.clickhouse.write.{ClickHouseWriteBuilder, WriteJobDescription}
 
+import java.lang.{Integer => JInt, Long => JLong}
+import java.time.{LocalDate, ZoneId}
+import java.util
+import scala.collection.JavaConverters._
+
 case class ClickHouseTable(
   node: NodeSpec,
   cluster: Option[ClusterSpec],
   implicit val tz: ZoneId,
   spec: TableSpec,
-  engineSpec: TableEngineSpec
+  engineSpec: TableEngineSpec,
+  functionRegistry: FunctionRegistry
 ) extends Table
   with SupportsRead
   with SupportsWrite
@@ -130,10 +133,12 @@ case class ClickHouseTable(
  private lazy val metadataSchema: StructType =
    StructType(metadataColumns.map(_.asInstanceOf[ClickHouseMetadataColumn].toStructField))
 
-  override lazy val partitioning: Array[Transform] = ExprUtils.toSparkPartitions(partitionKey)
+  override lazy val partitioning: Array[Transform] = ExprUtils.toSparkPartitions(partitionKey, functionRegistry)
 
  override lazy val partitionSchema: StructType = StructType(
-    partitioning.map(partTransform => ExprUtils.inferTransformSchema(schema, metadataSchema, partTransform))
+    partitioning.map { partTransform =>
+      ExprUtils.inferTransformSchema(schema, metadataSchema, partTransform, functionRegistry)
+    }
  )
 
  override lazy val properties: util.Map[String, String] = spec.toJavaMap
@@ -170,7 +175,8 @@ case class ClickHouseTable(
      shardingKey = shardingKey,
      partitionKey = partitionKey,
      sortingKey = sortingKey,
-      writeOptions = new WriteOptions(info.options.asCaseSensitiveMap())
+      writeOptions = new WriteOptions(info.options.asCaseSensitiveMap()),
+      functionRegistry = functionRegistry
    )
 
    new ClickHouseWriteBuilder(writeJob)