Commit 93a2b9d

Yxang authored and pan3793 committed
Spark 3.4: Support distribute by any predefined transform
1 parent e165698 commit 93a2b9d

8 files changed: +264 -39 lines changed
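
The commit, in short: a Spark write to a ClickHouse Distributed table can now be sharded by any transform the connector's function registry knows (for example toYYYYMM or xxHash64), not only by plain identity columns. A minimal usage sketch, assuming a Spark session with a ClickHouse catalog registered under the name "clickhouse" and the toYYYYMM-sharded tables that the new test suite below creates:

  // Sketch only: the catalog name "clickhouse" is an assumption; database and table names come from the test suite below.
  spark.conf.set("spark.clickhouse.write.distributed.convertLocal", "true")
  spark.sql(
    """INSERT INTO clickhouse.db_toYYYYMM_shard.tbl_toYYYYMM_shard
      |VALUES (timestamp'2021-01-01 10:10:10', '1')
      |""".stripMargin)
  // With convertLocal enabled, the connector resolves the sharding expression toYYYYMM(create_time)
  // to the catalog function "months", evaluates it per row, and writes each row to the local table
  // on the shard that ClickHouse's native sharding would have chosen.
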
spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.clickhouse.cluster
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class ClusterShardByTransformSuite extends SparkClickHouseClusterTest {
+  override protected def sparkConf: SparkConf = {
+    val _conf = super.sparkConf
+      .set("spark.clickhouse.write.distributed.convertLocal", "true")
+    _conf
+  }
+
+  def runTest(func_name: String, func_args: Array[String]): Unit = {
+    val func_expr = s"$func_name(${func_args.mkString(",")})"
+    val cluster = "single_replica"
+    val db = s"db_${func_name}_shard"
+    val tbl_dist = s"tbl_${func_name}_shard"
+    val tbl_local = s"${tbl_dist}_local"
+
+    try {
+      runClickHouseSQL(s"CREATE DATABASE IF NOT EXISTS $db ON CLUSTER $cluster")
+
+      spark.sql(
+        s"""CREATE TABLE $db.$tbl_local (
+           | create_time TIMESTAMP NOT NULL,
+           | value STRING NOT NULL
+           |) USING ClickHouse
+           |TBLPROPERTIES (
+           | cluster = '$cluster',
+           | engine = 'MergeTree()',
+           | order_by = 'create_time'
+           |)
+           |""".stripMargin
+      )
+
+      runClickHouseSQL(
+        s"""CREATE TABLE $db.$tbl_dist ON CLUSTER $cluster
+           |AS $db.$tbl_local
+           |ENGINE = Distributed($cluster, '$db', '$tbl_local', $func_expr)
+           |""".stripMargin
+      )
+      spark.sql(
+        s"""INSERT INTO `$db`.`$tbl_dist`
+           |VALUES
+           | (timestamp'2021-01-01 10:10:10', '1'),
+           | (timestamp'2022-02-02 10:10:10', '2'),
+           | (timestamp'2023-03-03 10:10:10', '3'),
+           | (timestamp'2024-04-04 10:10:10', '4') AS tab(create_time, value)
+           |""".stripMargin
+      )
+      // check that data is indeed written
+      checkAnswer(
+        spark.table(s"$db.$tbl_dist").select("value").orderBy("create_time"),
+        Seq(Row("1"), Row("2"), Row("3"), Row("4"))
+      )
+
+      // check same data is sharded in the same server comparing native sharding
+      runClickHouseSQL(
+        s"""INSERT INTO `$db`.`$tbl_dist`
+           |VALUES
+           | (timestamp'2021-01-01 10:10:10', '1'),
+           | (timestamp'2022-02-02 10:10:10', '2'),
+           | (timestamp'2023-03-03 10:10:10', '3'),
+           | (timestamp'2024-04-04 10:10:10', '4')
+           |""".stripMargin
+      )
+      checkAnswer(
+        spark.table(s"$db.$tbl_local")
+          .groupBy("value").count().filter("count != 2"),
+        Seq.empty
+      )
+
+    } finally {
+      runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_dist ON CLUSTER $cluster")
+      runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_local ON CLUSTER $cluster")
+      runClickHouseSQL(s"DROP DATABASE IF EXISTS $db ON CLUSTER $cluster")
+    }
+  }
+
+  Seq(("xxHash64", Array("value")), ("toYYYYMM", Array("create_time"))).foreach { case (func_name, func_args) =>
+    test(s"shard by $func_name")(runTest(func_name, func_args))
+  }
+
+}

spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/WriteDistributionAndOrderingSuite.scala

Lines changed: 2 additions & 6 deletions
@@ -78,12 +78,8 @@ class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
      WRITE_REPARTITION_BY_PARTITION.key -> repartitionByPartition.toString,
      WRITE_LOCAL_SORT_BY_KEY.key -> localSortByKey.toString
    ) {
-      if (!ignoreUnsupportedTransform && repartitionByPartition) {
-        intercept[AnalysisException](write())
-      } else {
-        write()
-        check()
-      }
+      write()
+      check()
    }

  Seq(true, false).foreach { ignoreUnsupportedTransform =>

spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala

Lines changed: 51 additions & 9 deletions
@@ -16,18 +16,21 @@ package org.apache.spark.sql.clickhouse

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, TransformExpression}
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.IGNORE_UNSUPPORTED_TRANSFORM
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.connector.expressions.Expressions._
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, _}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
import xenon.clickhouse.exception.CHClientException
import xenon.clickhouse.expr._
+import xenon.clickhouse.func.FunctionRegistry

-import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

-object ExprUtils extends SQLConfHelper {
+class ExprUtils(functionRegistry: FunctionRegistry) extends SQLConfHelper with Serializable {

  def toSparkPartitions(partitionKey: Option[List[Expr]]): Array[Transform] =
    partitionKey.seq.flatten.flatten(toSparkTransformOpt).toArray

@@ -47,7 +50,28 @@
      toSparkTransformOpt(expr).map(trans => Expressions.sort(trans, direction, nullOrder))
    }.toArray

-  @tailrec
+  private def loadV2FunctionOpt(
+    name: String,
+    args: Seq[Expression]
+  ): Option[BoundFunction] = {
+    def loadFunction(ident: Identifier): UnboundFunction =
+      functionRegistry.load(ident.name).getOrElse(throw new NoSuchFunctionException(ident))
+    val inputType = StructType(args.zipWithIndex.map {
+      case (exp, pos) => StructField(s"_$pos", exp.dataType, exp.nullable)
+    })
+    try {
+      val unbound = loadFunction(Identifier.of(Array.empty, name))
+      Some(unbound.bind(inputType))
+    } catch {
+      case e: NoSuchFunctionException =>
+        throw e
+      case _: UnsupportedOperationException if conf.getConf(IGNORE_UNSUPPORTED_TRANSFORM) =>
+        None
+      case e: UnsupportedOperationException =>
+        throw new AnalysisException(e.getMessage, cause = Some(e))
+    }
+  }
+
  def toCatalyst(v2Expr: V2Expression, fields: Array[StructField]): Expression =
    v2Expr match {
      case IdentityTransform(ref) => toCatalyst(ref, fields)

@@ -57,8 +81,15 @@
          .find { case (field, _) => field.name == ref.fieldNames.head }
          .getOrElse(throw CHClientException(s"Invalid field reference: $ref"))
        BoundReference(ordinal, field.dataType, field.nullable)
+      case t: Transform =>
+        val catalystArgs = t.arguments().map(toCatalyst(_, fields))
+        loadV2FunctionOpt(t.name(), catalystArgs).map { bound =>
+          TransformExpression(bound, catalystArgs)
+        }.getOrElse {
+          throw CHClientException(s"Unsupported expression: $v2Expr")
+        }
      case _ => throw CHClientException(
-        s"Unsupported V2 expression: $v2Expr, SPARK-33779: Spark 3.3 only support IdentityTransform"
+        s"Unsupported expression: $v2Expr"
      )
    }

@@ -83,10 +114,10 @@
    case FuncExpr("toYYYYMMDD", List(FieldRef(col))) => days(col)
    case FuncExpr("toHour", List(FieldRef(col))) => hours(col)
    case FuncExpr("HOUR", List(FieldRef(col))) => hours(col)
-    // TODO support arbitrary functions
-    // case FuncExpr("xxHash64", List(FieldRef(col))) => apply("ck_xx_hash64", column(col))
    case FuncExpr("rand", Nil) => apply("rand")
    case FuncExpr("toYYYYMMDD", List(FuncExpr("toDate", List(FieldRef(col))))) => identity(col)
+    case FuncExpr(funName, List(FieldRef(col))) if functionRegistry.clickHouseToSparkFunc.contains(funName) =>
+      apply(functionRegistry.clickHouseToSparkFunc(funName), column(col))
    case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
  }

@@ -96,7 +127,8 @@
    case DaysTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMMDD", List(FieldRef(col)))
    case HoursTransform(FieldReference(Seq(col))) => FuncExpr("toHour", List(FieldRef(col)))
    case IdentityTransform(fieldRefs) => FieldRef(fieldRefs.describe)
-    case ApplyTransform(name, args) => FuncExpr(name, args.map(arg => SQLExpr(arg.describe())).toList)
+    case ApplyTransform(name, args) if functionRegistry.sparkToClickHouseFunc.contains(name) =>
+      FuncExpr(functionRegistry.sparkToClickHouseFunc(name), args.map(arg => SQLExpr(arg.describe())).toList)
    case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
    case other: Transform => throw CHClientException(s"Unsupported transform: $other")
  }

@@ -113,8 +145,18 @@
    case IdentityTransform(FieldReference(Seq(col))) => primarySchema.find(_.name == col)
      .orElse(secondarySchema.find(_.name == col))
      .getOrElse(throw CHClientException(s"Invalid partition column: $col"))
-    case ckXxhHash64 @ ApplyTransform("ck_xx_hash64", _) => StructField(ckXxhHash64.toString, LongType)
+    case t @ ApplyTransform(transformName, _) =>
+      val resType =
+        functionRegistry.load(transformName).getOrElse(throw new NoSuchFunctionException(transformName)) match {
+          case f: ScalarFunction[_] => f.resultType()
+          case other => throw CHClientException(s"Unsupported function: $other")
+        }
+      StructField(t.toString, resType)
    case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
    case other: Transform => throw CHClientException(s"Unsupported transform: $other")
  }
}
+
+object ExprUtils {
+  def apply(functionRegistry: FunctionRegistry): ExprUtils = new ExprUtils(functionRegistry)
+}
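
Net effect of the ExprUtils changes: transform resolution now goes through the injected FunctionRegistry in both directions, ClickHouse expression to Spark transform and back, instead of a hard-coded list. A rough sketch of the round trip for the toYYYYMM/months pair; the map literals below are illustrative stand-ins for what the registry derives from each function's ckFuncNames:

  // Illustrative only: the real maps are provided by FunctionRegistry, not hand-written literals.
  val clickHouseToSparkFunc = Map("toYYYYMM" -> "months")  // ClickHouse function -> Spark catalog function
  val sparkToClickHouseFunc = Map("months" -> "toYYYYMM")  // Spark catalog function -> ClickHouse function

  // toSparkTransformOpt: FuncExpr("toYYYYMM", List(FieldRef("create_time")))
  //   => apply("months", column("create_time"))
  // toClickHouse: ApplyTransform("months", args)
  //   => FuncExpr("toYYYYMM", args.map(arg => SQLExpr(arg.describe())).toList)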

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala

Lines changed: 7 additions & 3 deletions
@@ -85,12 +85,15 @@ class ClickHouseCatalog extends TableCatalog

    val dynamicFunctionRegistry = new DynamicFunctionRegistry
    val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs)
+    val monthsFunc = new Months()
    dynamicFunctionRegistry.register("ck_xx_hash64_shard", xxHash64ShardFunc) // for compatible
    dynamicFunctionRegistry.register("clickhouse_shard_xxHash64", xxHash64ShardFunc)
+    dynamicFunctionRegistry.register("months", monthsFunc)
    this.functionRegistry = new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry))

    log.info(s"Detect ${clusterSpecs.size} ClickHouse clusters: ${clusterSpecs.map(_.name).mkString(",")}")
    log.info(s"ClickHouse clusters' detail: $clusterSpecs")
+    log.info(s"functionRegistry: ${this.functionRegistry.list.mkString(",")}")
  }

  override def name(): String = catalogName

@@ -141,7 +144,8 @@ class ClickHouseCatalog extends TableCatalog
      tableClusterSpec,
      _tz,
      tableSpec,
-      tableEngineSpec
+      tableEngineSpec,
+      functionRegistry
    )
  }

@@ -206,7 +210,7 @@ class ClickHouseCatalog extends TableCatalog

    val partitionsClause = partitions match {
      case transforms if transforms.nonEmpty =>
-        transforms.map(ExprUtils.toClickHouse(_).sql).mkString("PARTITION BY (", ", ", ")")
+        transforms.map(ExprUtils(functionRegistry).toClickHouse(_).sql).mkString("PARTITION BY (", ", ", ")")
      case _ => ""
    }

@@ -297,7 +301,7 @@ class ClickHouseCatalog extends TableCatalog
    }
    tableOpt match {
      case None => false
-      case Some(ClickHouseTable(_, cluster, _, tableSpec, _)) =>
+      case Some(ClickHouseTable(_, cluster, _, tableSpec, _, _)) =>
        val (db, tbl) = (tableSpec.database, tableSpec.name)
        val isAtomic = loadNamespaceMetadata(Array(db)).get("engine").equalsIgnoreCase("atomic")
        val syncClause = if (isAtomic) "SYNC" else ""

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseTable.scala

Lines changed: 17 additions & 11 deletions
@@ -14,16 +14,12 @@

package xenon.clickhouse

-import java.lang.{Integer => JInt, Long => JLong}
-import java.time.{LocalDate, ZoneId}
-import java.util
-import scala.collection.JavaConverters._
-import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.clickhouse.{ExprUtils, ReadOptions, WriteOptions}
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.{READ_DISTRIBUTED_CONVERT_LOCAL, USE_NULLABLE_QUERY_SCHEMA}
-import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.clickhouse.{ExprUtils, ReadOptions, WriteOptions}
import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.LogicalWriteInfo

@@ -34,16 +30,23 @@ import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.Utils._
import xenon.clickhouse.client.NodeClient
import xenon.clickhouse.expr.{Expr, OrderExpr}
+import xenon.clickhouse.func.FunctionRegistry
import xenon.clickhouse.read.{ClickHouseMetadataColumn, ClickHouseScanBuilder, ScanJobDescription}
import xenon.clickhouse.spec._
import xenon.clickhouse.write.{ClickHouseWriteBuilder, WriteJobDescription}

+import java.lang.{Integer => JInt, Long => JLong}
+import java.time.{LocalDate, ZoneId}
+import java.util
+import scala.collection.JavaConverters._
+
case class ClickHouseTable(
  node: NodeSpec,
  cluster: Option[ClusterSpec],
  implicit val tz: ZoneId,
  spec: TableSpec,
-  engineSpec: TableEngineSpec
+  engineSpec: TableEngineSpec,
+  functionRegistry: FunctionRegistry
) extends Table
    with SupportsRead
    with SupportsWrite

@@ -130,10 +133,12 @@ case class ClickHouseTable(
  private lazy val metadataSchema: StructType =
    StructType(metadataColumns.map(_.asInstanceOf[ClickHouseMetadataColumn].toStructField))

-  override lazy val partitioning: Array[Transform] = ExprUtils.toSparkPartitions(partitionKey)
+  override lazy val partitioning: Array[Transform] = ExprUtils(functionRegistry).toSparkPartitions(partitionKey)

  override lazy val partitionSchema: StructType = StructType(
-    partitioning.map(partTransform => ExprUtils.inferTransformSchema(schema, metadataSchema, partTransform))
+    partitioning.map(partTransform =>
+      ExprUtils(functionRegistry).inferTransformSchema(schema, metadataSchema, partTransform)
+    )
  )

  override lazy val properties: util.Map[String, String] = spec.toJavaMap

@@ -170,7 +175,8 @@ case class ClickHouseTable(
      shardingKey = shardingKey,
      partitionKey = partitionKey,
      sortingKey = sortingKey,
-      writeOptions = new WriteOptions(info.options.asCaseSensitiveMap())
+      writeOptions = new WriteOptions(info.options.asCaseSensitiveMap()),
+      functionRegistry = functionRegistry
    )

    new ClickHouseWriteBuilder(writeJob)
spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.sql.Timestamp
+
+class Months extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "months"
+
+  override def canonicalName: String = s"months"
+
+  override val ckFuncNames: Array[String] = Array("toYYYYMM")
+
+  override def description: String = s"$name: (time: timestamp) => shard_num: int"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, TimestampType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 TIMESTAMP argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(TimestampType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(time: Long): Int = {
+    val ts = new Timestamp(time / 1000).toLocalDateTime
+    ts.getYear * 100 + ts.getMonthValue
+  }
+}
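
A quick sanity check of the new function's semantics: Spark passes TimestampType values to ScalarFunction.invoke as microseconds since the epoch, which is why invoke divides by 1000 before building a java.sql.Timestamp. An illustrative snippet, not part of the commit:

  val months = new xenon.clickhouse.func.Months()
  // 2024-04-04 10:10:10 in the JVM default time zone, as epoch microseconds
  // (Spark's internal representation of a TimestampType value)
  val micros = java.sql.Timestamp.valueOf("2024-04-04 10:10:10").getTime * 1000L
  assert(months.invoke(micros) == 202404) // the bucket toYYYYMM yields for this local time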
