Commit 2a5f538

Revert "Spark 3.4: Support distribute by any predefined transform"
This reverts commit 93a2b9d.
1 parent 93a2b9d commit 2a5f538

8 files changed: +39 -264 lines changed

spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala

Lines changed: 0 additions & 98 deletions
This file was deleted.

spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/WriteDistributionAndOrderingSuite.scala

Lines changed: 6 additions & 2 deletions
@@ -78,8 +78,12 @@ class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
       WRITE_REPARTITION_BY_PARTITION.key -> repartitionByPartition.toString,
       WRITE_LOCAL_SORT_BY_KEY.key -> localSortByKey.toString
     ) {
-      write()
-      check()
+      if (!ignoreUnsupportedTransform && repartitionByPartition) {
+        intercept[AnalysisException](write())
+      } else {
+        write()
+        check()
+      }
     }

   Seq(true, false).foreach { ignoreUnsupportedTransform =>
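
The restored branch above ties two settings together: with IGNORE_UNSUPPORTED_TRANSFORM disabled and the write repartitioned by the table's partition transforms, an untranslatable transform must surface as an AnalysisException. A minimal caller-side sketch of that interaction, assuming the config constants live in ClickHouseSQLConf and using a placeholder session, DataFrame, and table name:

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.clickhouse.ClickHouseSQLConf._ // assumed home of the keys below

    // Disable the escape hatch and repartition the write by the table's partition transforms.
    spark.conf.set(IGNORE_UNSUPPORTED_TRANSFORM.key, "false")
    spark.conf.set(WRITE_REPARTITION_BY_PARTITION.key, "true")

    // If the table is partitioned by a transform the connector cannot translate,
    // the write is now expected to fail analysis rather than be planned.
    try df.writeTo("clickhouse.db.tbl").append()
    catch { case _: AnalysisException => () } // the case the restored test intercepts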

spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala

Lines changed: 9 additions & 51 deletions
@@ -16,21 +16,18 @@ package org.apache.spark.sql.clickhouse

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, TransformExpression}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
 import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.IGNORE_UNSUPPORTED_TRANSFORM
-import org.apache.spark.sql.connector.catalog.Identifier
-import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
 import org.apache.spark.sql.connector.expressions.Expressions._
 import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, _}
 import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
 import xenon.clickhouse.exception.CHClientException
 import xenon.clickhouse.expr._
-import xenon.clickhouse.func.FunctionRegistry

+import scala.annotation.tailrec
 import scala.util.{Failure, Success, Try}

-class ExprUtils(functionRegistry: FunctionRegistry) extends SQLConfHelper with Serializable {
+object ExprUtils extends SQLConfHelper {

   def toSparkPartitions(partitionKey: Option[List[Expr]]): Array[Transform] =
     partitionKey.seq.flatten.flatten(toSparkTransformOpt).toArray
@@ -50,28 +47,7 @@ class ExprUtils(functionRegistry: FunctionRegistry) extends SQLConfHelper with S
       toSparkTransformOpt(expr).map(trans => Expressions.sort(trans, direction, nullOrder))
     }.toArray

-  private def loadV2FunctionOpt(
-    name: String,
-    args: Seq[Expression]
-  ): Option[BoundFunction] = {
-    def loadFunction(ident: Identifier): UnboundFunction =
-      functionRegistry.load(ident.name).getOrElse(throw new NoSuchFunctionException(ident))
-    val inputType = StructType(args.zipWithIndex.map {
-      case (exp, pos) => StructField(s"_$pos", exp.dataType, exp.nullable)
-    })
-    try {
-      val unbound = loadFunction(Identifier.of(Array.empty, name))
-      Some(unbound.bind(inputType))
-    } catch {
-      case e: NoSuchFunctionException =>
-        throw e
-      case _: UnsupportedOperationException if conf.getConf(IGNORE_UNSUPPORTED_TRANSFORM) =>
-        None
-      case e: UnsupportedOperationException =>
-        throw new AnalysisException(e.getMessage, cause = Some(e))
-    }
-  }
-
+  @tailrec
   def toCatalyst(v2Expr: V2Expression, fields: Array[StructField]): Expression =
     v2Expr match {
       case IdentityTransform(ref) => toCatalyst(ref, fields)
@@ -81,15 +57,8 @@ class ExprUtils(functionRegistry: FunctionRegistry) extends SQLConfHelper with S
           .find { case (field, _) => field.name == ref.fieldNames.head }
           .getOrElse(throw CHClientException(s"Invalid field reference: $ref"))
         BoundReference(ordinal, field.dataType, field.nullable)
-      case t: Transform =>
-        val catalystArgs = t.arguments().map(toCatalyst(_, fields))
-        loadV2FunctionOpt(t.name(), catalystArgs).map { bound =>
-          TransformExpression(bound, catalystArgs)
-        }.getOrElse {
-          throw CHClientException(s"Unsupported expression: $v2Expr")
-        }
       case _ => throw CHClientException(
-        s"Unsupported expression: $v2Expr"
+        s"Unsupported V2 expression: $v2Expr, SPARK-33779: Spark 3.3 only support IdentityTransform"
       )
     }

@@ -114,10 +83,10 @@ class ExprUtils(functionRegistry: FunctionRegistry) extends SQLConfHelper with S
     case FuncExpr("toYYYYMMDD", List(FieldRef(col))) => days(col)
     case FuncExpr("toHour", List(FieldRef(col))) => hours(col)
     case FuncExpr("HOUR", List(FieldRef(col))) => hours(col)
+    // TODO support arbitrary functions
+    // case FuncExpr("xxHash64", List(FieldRef(col))) => apply("ck_xx_hash64", column(col))
     case FuncExpr("rand", Nil) => apply("rand")
     case FuncExpr("toYYYYMMDD", List(FuncExpr("toDate", List(FieldRef(col))))) => identity(col)
-    case FuncExpr(funName, List(FieldRef(col))) if functionRegistry.clickHouseToSparkFunc.contains(funName) =>
-      apply(functionRegistry.clickHouseToSparkFunc(funName), column(col))
     case unsupported => throw CHClientException(s"Unsupported ClickHouse expression: $unsupported")
   }

@@ -127,8 +96,7 @@ class ExprUtils(functionRegistry: FunctionRegistry) extends SQLConfHelper with S
     case DaysTransform(FieldReference(Seq(col))) => FuncExpr("toYYYYMMDD", List(FieldRef(col)))
     case HoursTransform(FieldReference(Seq(col))) => FuncExpr("toHour", List(FieldRef(col)))
     case IdentityTransform(fieldRefs) => FieldRef(fieldRefs.describe)
-    case ApplyTransform(name, args) if functionRegistry.sparkToClickHouseFunc.contains(name) =>
-      FuncExpr(functionRegistry.sparkToClickHouseFunc(name), args.map(arg => SQLExpr(arg.describe())).toList)
+    case ApplyTransform(name, args) => FuncExpr(name, args.map(arg => SQLExpr(arg.describe())).toList)
     case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
     case other: Transform => throw CHClientException(s"Unsupported transform: $other")
   }
@@ -145,18 +113,8 @@ class ExprUtils(functionRegistry: FunctionRegistry) extends SQLConfHelper with S
     case IdentityTransform(FieldReference(Seq(col))) => primarySchema.find(_.name == col)
       .orElse(secondarySchema.find(_.name == col))
       .getOrElse(throw CHClientException(s"Invalid partition column: $col"))
-    case t @ ApplyTransform(transformName, _) =>
-      val resType =
-        functionRegistry.load(transformName).getOrElse(throw new NoSuchFunctionException(transformName)) match {
-          case f: ScalarFunction[_] => f.resultType()
-          case other => throw CHClientException(s"Unsupported function: $other")
-        }
-      StructField(t.toString, resType)
+    case ckXxhHash64 @ ApplyTransform("ck_xx_hash64", _) => StructField(ckXxhHash64.toString, LongType)
     case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
     case other: Transform => throw CHClientException(s"Unsupported transform: $other")
   }
 }
-
-object ExprUtils {
-  def apply(functionRegistry: FunctionRegistry): ExprUtils = new ExprUtils(functionRegistry)
-}
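
For orientation, a small illustrative sketch of the ClickHouse-to-Spark direction that survives this revert, assuming toSparkTransformOpt remains a public member of the ExprUtils object; the column names are made up:

    import org.apache.spark.sql.clickhouse.ExprUtils
    import xenon.clickhouse.expr.{FieldRef, FuncExpr}

    // A predefined partition expression still maps to a built-in Spark transform:
    // ClickHouse toYYYYMMDD(created_at)  =>  Some(days(created_at))
    val dayTransform = ExprUtils.toSparkTransformOpt(FuncExpr("toYYYYMMDD", List(FieldRef("created_at"))))

    // Arbitrary functions such as xxHash64(id) are no longer resolved through the
    // function registry here; per IGNORE_UNSUPPORTED_TRANSFORM they are either
    // skipped or rejected during analysis.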

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala

Lines changed: 3 additions & 7 deletions
@@ -85,15 +85,12 @@ class ClickHouseCatalog extends TableCatalog

     val dynamicFunctionRegistry = new DynamicFunctionRegistry
     val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs)
-    val monthsFunc = new Months()
     dynamicFunctionRegistry.register("ck_xx_hash64_shard", xxHash64ShardFunc) // for compatible
     dynamicFunctionRegistry.register("clickhouse_shard_xxHash64", xxHash64ShardFunc)
-    dynamicFunctionRegistry.register("months", monthsFunc)
     this.functionRegistry = new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry))

     log.info(s"Detect ${clusterSpecs.size} ClickHouse clusters: ${clusterSpecs.map(_.name).mkString(",")}")
     log.info(s"ClickHouse clusters' detail: $clusterSpecs")
-    log.info(s"functionRegistry: ${this.functionRegistry.list.mkString(",")}")
   }

   override def name(): String = catalogName
@@ -144,8 +141,7 @@ class ClickHouseCatalog extends TableCatalog
       tableClusterSpec,
       _tz,
       tableSpec,
-      tableEngineSpec,
-      functionRegistry
+      tableEngineSpec
     )
   }

@@ -210,7 +206,7 @@ class ClickHouseCatalog extends TableCatalog

     val partitionsClause = partitions match {
       case transforms if transforms.nonEmpty =>
-        transforms.map(ExprUtils(functionRegistry).toClickHouse(_).sql).mkString("PARTITION BY (", ", ", ")")
+        transforms.map(ExprUtils.toClickHouse(_).sql).mkString("PARTITION BY (", ", ", ")")
       case _ => ""
     }

@@ -301,7 +297,7 @@ class ClickHouseCatalog extends TableCatalog
     }
     tableOpt match {
       case None => false
-      case Some(ClickHouseTable(_, cluster, _, tableSpec, _, _)) =>
+      case Some(ClickHouseTable(_, cluster, _, tableSpec, _)) =>
         val (db, tbl) = (tableSpec.database, tableSpec.name)
         val isAtomic = loadNamespaceMetadata(Array(db)).get("engine").equalsIgnoreCase("atomic")
         val syncClause = if (isAtomic) "SYNC" else ""
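
As a usage note for the partitionsClause hunk above, a hedged sketch of the DDL rendering path after the revert; the transform and column are illustrative, and the exact SQL text depends on how FuncExpr renders itself:

    import org.apache.spark.sql.clickhouse.ExprUtils
    import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

    val partitions: Array[Transform] = Array(Expressions.days("created_at"))
    val partitionsClause = partitions match {
      case transforms if transforms.nonEmpty =>
        transforms.map(ExprUtils.toClickHouse(_).sql).mkString("PARTITION BY (", ", ", ")")
      case _ => ""
    }
    // expected to be roughly: PARTITION BY (toYYYYMMDD(created_at))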

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseTable.scala

Lines changed: 11 additions & 17 deletions
@@ -14,12 +14,16 @@

 package xenon.clickhouse

-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import java.lang.{Integer => JInt, Long => JLong}
+import java.time.{LocalDate, ZoneId}
+import java.util
+import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
-import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.{READ_DISTRIBUTED_CONVERT_LOCAL, USE_NULLABLE_QUERY_SCHEMA}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.clickhouse.{ExprUtils, ReadOptions, WriteOptions}
-import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.{READ_DISTRIBUTED_CONVERT_LOCAL, USE_NULLABLE_QUERY_SCHEMA}
 import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
@@ -30,23 +34,16 @@ import org.apache.spark.unsafe.types.UTF8String
 import xenon.clickhouse.Utils._
 import xenon.clickhouse.client.NodeClient
 import xenon.clickhouse.expr.{Expr, OrderExpr}
-import xenon.clickhouse.func.FunctionRegistry
 import xenon.clickhouse.read.{ClickHouseMetadataColumn, ClickHouseScanBuilder, ScanJobDescription}
 import xenon.clickhouse.spec._
 import xenon.clickhouse.write.{ClickHouseWriteBuilder, WriteJobDescription}

-import java.lang.{Integer => JInt, Long => JLong}
-import java.time.{LocalDate, ZoneId}
-import java.util
-import scala.collection.JavaConverters._
-
 case class ClickHouseTable(
   node: NodeSpec,
   cluster: Option[ClusterSpec],
   implicit val tz: ZoneId,
   spec: TableSpec,
-  engineSpec: TableEngineSpec,
-  functionRegistry: FunctionRegistry
+  engineSpec: TableEngineSpec
 ) extends Table
   with SupportsRead
   with SupportsWrite
@@ -133,12 +130,10 @@ case class ClickHouseTable(
   private lazy val metadataSchema: StructType =
     StructType(metadataColumns.map(_.asInstanceOf[ClickHouseMetadataColumn].toStructField))

-  override lazy val partitioning: Array[Transform] = ExprUtils(functionRegistry).toSparkPartitions(partitionKey)
+  override lazy val partitioning: Array[Transform] = ExprUtils.toSparkPartitions(partitionKey)

   override lazy val partitionSchema: StructType = StructType(
-    partitioning.map(partTransform =>
-      ExprUtils(functionRegistry).inferTransformSchema(schema, metadataSchema, partTransform)
-    )
+    partitioning.map(partTransform => ExprUtils.inferTransformSchema(schema, metadataSchema, partTransform))
   )

   override lazy val properties: util.Map[String, String] = spec.toJavaMap
@@ -175,8 +170,7 @@ case class ClickHouseTable(
       shardingKey = shardingKey,
       partitionKey = partitionKey,
       sortingKey = sortingKey,
-      writeOptions = new WriteOptions(info.options.asCaseSensitiveMap()),
-      functionRegistry = functionRegistry
+      writeOptions = new WriteOptions(info.options.asCaseSensitiveMap())
     )

     new ClickHouseWriteBuilder(writeJob)

spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala

Lines changed: 0 additions & 47 deletions
This file was deleted.
