From 2f46a1ccd8f4e3a69f24fd4a39f4cd8769de1308 Mon Sep 17 00:00:00 2001 From: Xinyuan Yang Date: Wed, 17 May 2023 14:54:38 +0800 Subject: [PATCH 1/5] Spark 3.4: Enhance FunctionRegistry to support some hash functions --- .../ClickHouseClusterHashUDFSuite.scala | 103 ++++++++++++++++++ .../cluster/ClickHouseClusterUDFSuite.scala | 55 ---------- .../xenon/clickhouse/func/CityHash64.scala | 27 +++++ .../clickhouse/func/FunctionRegistry.scala | 41 ++++++- .../clickhouse/func/MultiStringArgsHash.scala | 62 +++++++++++ .../xenon/clickhouse/func/MurmurHash2.scala | 37 +++++++ .../xenon/clickhouse/func/MurmurHash3.scala | 37 +++++++ ...lickHouseXxHash64.scala => XxHash64.scala} | 7 +- .../clickhouse/FunctionRegistrySuite.scala | 55 ++++++++++ 9 files changed, 366 insertions(+), 58 deletions(-) create mode 100644 spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala delete mode 100644 spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{ClickHouseXxHash64.scala => XxHash64.scala} (94%) create mode 100644 spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala new file mode 100644 index 00000000..c885a5d3 --- /dev/null +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.clickhouse.cluster + +import org.apache.spark.sql.clickhouse.TestUtils.om +import xenon.clickhouse.func._ +import java.lang.{Long => JLong} + +class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { + // only for query function names + val dummyRegistry: CompositeFunctionRegistry = { + val dynamicFunctionRegistry = new DynamicFunctionRegistry + val xxHash64ShardFunc = new ClickHouseXxHash64Shard(Seq.empty) + dynamicFunctionRegistry.register("ck_xx_hash64_shard", xxHash64ShardFunc) // for compatible + dynamicFunctionRegistry.register("clickhouse_shard_xxHash64", xxHash64ShardFunc) + new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry)) + } + + def runTest(funcSparkName: String, funcCkName: String, stringVal: String): Unit = { + val sparkResult = spark.sql( + s"""SELECT + | $funcSparkName($stringVal) AS hash_value + |""".stripMargin + ).collect + assert(sparkResult.length == 1) + val sparkHashVal = sparkResult.head.getAs[Long]("hash_value") + + val clickhouseResultJsonStr = runClickHouseSQL( + s"""SELECT + | $funcCkName($stringVal) AS hash_value + |""".stripMargin + ).head.getString(0) + val clickhouseResultJson = om.readTree(clickhouseResultJsonStr) + val clickhouseHashVal = JLong.parseUnsignedLong(clickhouseResultJson.get("hash_value").asText) + assert( + sparkHashVal == clickhouseHashVal, + s"ck_function: $funcCkName, spark_function: $funcSparkName, args: ($stringVal)" + ) + } + + Seq( + "clickhouse_xxHash64", + "clickhouse_murmurHash3_64", + "clickhouse_murmurHash3_32", + "clickhouse_murmurHash2_64", + "clickhouse_murmurHash2_32", + "clickhouse_cityHash64" + ).foreach { funcSparkName => + val funcCkName = dummyRegistry.sparkToClickHouseFunc(funcSparkName) + test(s"UDF $funcSparkName") { + Seq( + "spark-clickhouse-connector", + "Apache Spark", + "ClickHouse", + "Yandex", + "热爱", + "在传统的行式数据库系统中,数据按如下顺序存储:", + "🇨🇳" + ).foreach { rawStringVal => + val stringVal = s"\'$rawStringVal\'" + runTest(funcSparkName, funcCkName, stringVal) + } + } + } + + Seq( + "clickhouse_murmurHash3_64", + "clickhouse_murmurHash3_32", + "clickhouse_murmurHash2_64", + "clickhouse_murmurHash2_32", + "clickhouse_cityHash64" + ).foreach { funcSparkName => + val funcCkName = dummyRegistry.sparkToClickHouseFunc(funcSparkName) + test(s"UDF $funcSparkName multiple args") { + val strings = Seq( + "\'spark-clickhouse-connector\'", + "\'Apache Spark\'", + "\'ClickHouse\'", + "\'Yandex\'", + "\'热爱\'", + "\'在传统的行式数据库系统中,数据按如下顺序存储:\'", + "\'🇨🇳\'" + ) + val test_5 = strings.combinations(5) + test_5.foreach { seq => + val stringVal = seq.mkString(", ") + runTest(funcSparkName, funcCkName, stringVal) + } + } + } +} diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala deleted file mode 100644 index 3d97cc25..00000000 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.clickhouse.cluster - -import org.apache.spark.sql.clickhouse.TestUtils.om - -import java.lang.{Long => JLong} - -class ClickHouseClusterUDFSuite extends SparkClickHouseClusterTest { - - test("UDF ck_xx_hash64") { - Seq("spark-clickhouse-connector", "Apache Spark", "ClickHouse", "Yandex", "热爱", "🇨🇳").foreach { stringVal => - val sparkResult = spark.sql( - s"""SELECT - | ck_xx_hash64('$stringVal') AS hash_value_legacy, - | clickhouse_xxHash64('$stringVal') AS hash_value, - | ck_xx_hash64_shard('single_replica', '$stringVal') AS shard_num_legacy, -- one based ordinal defined in `remote_servers.xml` - | clickhouse_shard_xxHash64('single_replica', '$stringVal') AS shard_num -- one based ordinal defined in `remote_servers.xml` - |""".stripMargin - ).collect - assert(sparkResult.length == 1) - val sparkHashValLegacy = sparkResult.head.getAs[Long]("hash_value_legacy") - val sparkHashVal = sparkResult.head.getAs[Long]("hash_value") - assert(sparkHashValLegacy === sparkHashVal) - val sparkShardNumLegacy = sparkResult.head.getAs[Int]("shard_num_legacy") - val sparkShardNum = sparkResult.head.getAs[Int]("shard_num") - assert(sparkShardNumLegacy === sparkShardNum) - - val clickhouseResultJsonStr = runClickHouseSQL( - s"""SELECT - | xxHash64('$stringVal') AS hash_value, - | xxHash64('$stringVal') % 4 AS shard_num -- zero based ordinal - |""".stripMargin - ).head.getString(0) - val clickhouseResultJson = om.readTree(clickhouseResultJsonStr) - val clickhouseHashVal = JLong.parseUnsignedLong(clickhouseResultJson.get("hash_value").asText) - val clickhouseShardNum = JLong.parseUnsignedLong(clickhouseResultJson.get("shard_num").asText) - - assert(sparkHashVal == clickhouseHashVal) - assert(sparkShardNum == clickhouseShardNum + 1) - } - } -} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala new file mode 100644 index 00000000..d9efa705 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import xenon.clickhouse.hash + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L694 +object CityHash64 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_cityHash64" + + override val ckFuncNames: Array[String] = Array("cityHash64") + + override def applyHash(input: Array[Any]): Long = hash.CityHash64(input) +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala index b41a7d1a..8faf6564 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala @@ -18,11 +18,19 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import scala.collection.mutable -trait FunctionRegistry { +trait FunctionRegistry extends Serializable { def list: Array[String] def load(name: String): Option[UnboundFunction] + + def sparkToClickHouseFunc: Map[String, String] + + def clickHouseToSparkFunc: Map[String, String] +} + +trait ClickhouseEquivFunction { + val ckFuncNames: Array[String] } class CompositeFunctionRegistry(registries: Array[FunctionRegistry]) extends FunctionRegistry { @@ -30,18 +38,37 @@ class CompositeFunctionRegistry(registries: Array[FunctionRegistry]) extends Fun override def list: Array[String] = registries.flatMap(_.list) override def load(name: String): Option[UnboundFunction] = registries.flatMap(_.load(name)).headOption + + override def sparkToClickHouseFunc: Map[String, String] = registries.flatMap(_.sparkToClickHouseFunc).toMap + + override def clickHouseToSparkFunc: Map[String, String] = registries.flatMap(_.clickHouseToSparkFunc).toMap } object StaticFunctionRegistry extends FunctionRegistry { private val functions = Map[String, UnboundFunction]( "ck_xx_hash64" -> ClickHouseXxHash64, // for compatible - "clickhouse_xxHash64" -> ClickHouseXxHash64 + "clickhouse_xxHash64" -> ClickHouseXxHash64, + "clickhouse_murmurHash2_32" -> MurmurHash2_32, + "clickhouse_murmurHash2_64" -> MurmurHash2_64, + "clickhouse_murmurHash3_32" -> MurmurHash3_32, + "clickhouse_murmurHash3_64" -> MurmurHash3_64, + "clickhouse_cityHash64" -> CityHash64 ) override def list: Array[String] = functions.keys.toArray override def load(name: String): Option[UnboundFunction] = functions.get(name) + + override val sparkToClickHouseFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((k, _)) + } + + override val clickHouseToSparkFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((_, k)) + } } class DynamicFunctionRegistry extends FunctionRegistry { @@ -56,4 +83,14 @@ class DynamicFunctionRegistry extends FunctionRegistry { override def list: Array[String] = functions.keys.toArray override def load(name: String): Option[UnboundFunction] = functions.get(name) + + override def sparkToClickHouseFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).toMap.flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((k, _)) + } + + override def clickHouseToSparkFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).toMap.flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((_, k)) + } } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala new file mode 100644 index 00000000..8987f919 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivFunction { + + def applyHash(input: Array[Any]): Long + + protected def funcName: String + + override val ckFuncNames: Array[String] + + override def description: String = s"$name: (value: string, ...) => hash_value: long" + + private def isExceptedType(dt: DataType): Boolean = + dt.isInstanceOf[StringType] + + final override def name: String = funcName + + final override def bind(inputType: StructType): BoundFunction = { + val inputDataTypes = inputType.fields.map(_.dataType) + if (inputDataTypes.forall(isExceptedType)) { + // need to new a ScalarFunction instance for each bind, + // because we do not know the number of arguments in advance + new ScalarFunction[Long] { + override def inputTypes(): Array[DataType] = inputDataTypes + override def name: String = funcName + override def canonicalName: String = s"clickhouse.$name" + override def resultType: DataType = LongType + override def toString: String = name + override def produceResult(input: InternalRow): Long = { + val inputStrings = new Array[Any](input.numFields) + var i = 0 + do { + inputStrings(i) = input.getUTF8String(i).getBytes + i += 1 + } while (i < input.numFields) + applyHash(inputStrings) + } + } + } else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description") + + } + +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala new file mode 100644 index 00000000..02b07a5f --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import xenon.clickhouse.hash +import xenon.clickhouse.hash.HashUtils + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L460 +object MurmurHash2_64 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_murmurHash2_64" + + override val ckFuncNames: Array[String] = Array("murmurHash2_64") + + override def applyHash(input: Array[Any]): Long = hash.Murmurhash2_64(input) +} + +object MurmurHash2_32 extends MultiStringArgsHash { + // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 + + override protected def funcName: String = "clickhouse_murmurHash2_32" + override val ckFuncNames: Array[String] = Array("murmurHash2_32") + + override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash2_32(input)) +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala new file mode 100644 index 00000000..1c952e62 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import xenon.clickhouse.hash +import xenon.clickhouse.hash.HashUtils + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L543 +object MurmurHash3_64 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_murmurHash3_64" + + override val ckFuncNames: Array[String] = Array("murmurHash3_64") + + override def applyHash(input: Array[Any]): Long = hash.Murmurhash3_64(input) +} + +object MurmurHash3_32 extends MultiStringArgsHash { + // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 + + override protected def funcName: String = "clickhouse_murmurHash3_32" + override val ckFuncNames: Array[String] = Array("murmurHash3_32") + + override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash3_32(input)) +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/ClickHouseXxHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala similarity index 94% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/ClickHouseXxHash64.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala index e7f223b0..3c4a5b1a 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/ClickHouseXxHash64.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala @@ -26,12 +26,16 @@ import xenon.clickhouse.spec.{ClusterSpec, ShardUtils} * select xxHash64(concat(project_id, toString(seq)) * }}} */ -object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] { +object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction { override def name: String = "clickhouse_xxHash64" override def canonicalName: String = s"clickhouse.$name" + override def toString: String = name + + override val ckFuncNames: Array[String] = Array("xxHash64") + override def description: String = s"$name: (value: string) => hash_value: long" override def bind(inputType: StructType): BoundFunction = inputType.fields match { @@ -45,6 +49,7 @@ object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] { override def isResultNullable: Boolean = false + // ignore UInt64 vs Int64 def invoke(value: UTF8String): Long = XxHash64Function.hash(value, StringType, 0L) } diff --git a/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala b/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala new file mode 100644 index 00000000..0f3422fd --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.clickhouse + +import org.scalatest.funsuite.AnyFunSuite +import xenon.clickhouse.func._ + +class FunctionRegistrySuite extends AnyFunSuite { + + val staticFunctionRegistry: StaticFunctionRegistry.type = StaticFunctionRegistry + val dynamicFunctionRegistry = new DynamicFunctionRegistry + dynamicFunctionRegistry.register("ck_xx_hash64", ClickHouseXxHash64) + dynamicFunctionRegistry.register("clickhouse_xxHash64", ClickHouseXxHash64) + + test("check StaticFunctionRegistry mappings") { + assert(staticFunctionRegistry.sparkToClickHouseFunc.forall { case (k, v) => + staticFunctionRegistry.load(k).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(v) + }) + assert(staticFunctionRegistry.clickHouseToSparkFunc.forall { case (k, v) => + staticFunctionRegistry.load(v).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(k) + }) + } + + test("check DynamicFunctionRegistry mappings") { + assert(dynamicFunctionRegistry.sparkToClickHouseFunc.forall { case (k, v) => + dynamicFunctionRegistry.load(k).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(v) + }) + assert(dynamicFunctionRegistry.clickHouseToSparkFunc.forall { case (k, v) => + dynamicFunctionRegistry.load(v).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(k) + }) + } + + test("check CompositeFunctionRegistry mappings") { + val compositeFunctionRegistry = + new CompositeFunctionRegistry(Array(staticFunctionRegistry, dynamicFunctionRegistry)) + assert(compositeFunctionRegistry.sparkToClickHouseFunc.forall { case (k, v) => + compositeFunctionRegistry.load(k).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(v) + }) + assert(compositeFunctionRegistry.clickHouseToSparkFunc.forall { case (k, v) => + compositeFunctionRegistry.load(v).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(k) + }) + } +} From 0ad84e46c58dde1e3f2d9b1c19816b15290bd24e Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sat, 26 Aug 2023 02:13:40 +0800 Subject: [PATCH 2/5] style --- .../ClickHouseClusterHashUDFSuite.scala | 23 ++++++++----------- .../clickhouse/func/MultiStringArgsHash.scala | 5 ++-- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala index c885a5d3..906738fa 100644 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala @@ -68,8 +68,7 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { "热爱", "在传统的行式数据库系统中,数据按如下顺序存储:", "🇨🇳" - ).foreach { rawStringVal => - val stringVal = s"\'$rawStringVal\'" + ).map("'" + _ + "'").foreach { stringVal => runTest(funcSparkName, funcCkName, stringVal) } } @@ -84,17 +83,15 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { ).foreach { funcSparkName => val funcCkName = dummyRegistry.sparkToClickHouseFunc(funcSparkName) test(s"UDF $funcSparkName multiple args") { - val strings = Seq( - "\'spark-clickhouse-connector\'", - "\'Apache Spark\'", - "\'ClickHouse\'", - "\'Yandex\'", - "\'热爱\'", - "\'在传统的行式数据库系统中,数据按如下顺序存储:\'", - "\'🇨🇳\'" - ) - val test_5 = strings.combinations(5) - test_5.foreach { seq => + Seq( + "spark-clickhouse-connector", + "Apache Spark", + "ClickHouse", + "Yandex", + "热爱", + "在传统的行式数据库系统中,数据按如下顺序存储:", + "🇨🇳" + ).map("'" + _ + "'").combinations(5).foreach { seq => val stringVal = seq.mkString(", ") runTest(funcSparkName, funcCkName, stringVal) } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala index 8987f919..c99bf247 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala @@ -55,8 +55,9 @@ abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivF applyHash(inputStrings) } } - } else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description") - + } else { + throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description") + } } } From 9b5f6fc7ba1240f74b7743a114c2d1924045f19b Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sat, 26 Aug 2023 02:17:29 +0800 Subject: [PATCH 3/5] nit --- .../ClickHouseClusterHashUDFSuite.scala | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala index 906738fa..d6bab146 100644 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala @@ -28,25 +28,21 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry)) } - def runTest(funcSparkName: String, funcCkName: String, stringVal: String): Unit = { + def runTest(sparkFuncName: String, ckFuncName: String, stringVal: String): Unit = { val sparkResult = spark.sql( - s"""SELECT - | $funcSparkName($stringVal) AS hash_value - |""".stripMargin + s"SELECT $sparkFuncName($stringVal) AS hash_value" ).collect assert(sparkResult.length == 1) val sparkHashVal = sparkResult.head.getAs[Long]("hash_value") val clickhouseResultJsonStr = runClickHouseSQL( - s"""SELECT - | $funcCkName($stringVal) AS hash_value - |""".stripMargin + s"SELECT $ckFuncName($stringVal) AS hash_value " ).head.getString(0) val clickhouseResultJson = om.readTree(clickhouseResultJsonStr) val clickhouseHashVal = JLong.parseUnsignedLong(clickhouseResultJson.get("hash_value").asText) assert( sparkHashVal == clickhouseHashVal, - s"ck_function: $funcCkName, spark_function: $funcSparkName, args: ($stringVal)" + s"ck_function: $ckFuncName, spark_function: $sparkFuncName, args: ($stringVal)" ) } @@ -57,9 +53,9 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { "clickhouse_murmurHash2_64", "clickhouse_murmurHash2_32", "clickhouse_cityHash64" - ).foreach { funcSparkName => - val funcCkName = dummyRegistry.sparkToClickHouseFunc(funcSparkName) - test(s"UDF $funcSparkName") { + ).foreach { sparkFuncName => + val ckFuncName = dummyRegistry.sparkToClickHouseFunc(sparkFuncName) + test(s"UDF $sparkFuncName") { Seq( "spark-clickhouse-connector", "Apache Spark", @@ -69,7 +65,7 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { "在传统的行式数据库系统中,数据按如下顺序存储:", "🇨🇳" ).map("'" + _ + "'").foreach { stringVal => - runTest(funcSparkName, funcCkName, stringVal) + runTest(sparkFuncName, ckFuncName, stringVal) } } } @@ -80,9 +76,9 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { "clickhouse_murmurHash2_64", "clickhouse_murmurHash2_32", "clickhouse_cityHash64" - ).foreach { funcSparkName => - val funcCkName = dummyRegistry.sparkToClickHouseFunc(funcSparkName) - test(s"UDF $funcSparkName multiple args") { + ).foreach { sparkFuncName => + val ckFuncName = dummyRegistry.sparkToClickHouseFunc(sparkFuncName) + test(s"UDF $sparkFuncName multiple args") { Seq( "spark-clickhouse-connector", "Apache Spark", @@ -93,7 +89,7 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { "🇨🇳" ).map("'" + _ + "'").combinations(5).foreach { seq => val stringVal = seq.mkString(", ") - runTest(funcSparkName, funcCkName, stringVal) + runTest(sparkFuncName, ckFuncName, stringVal) } } } From 808798dfdbbafc79fc30b63d7b62c9cf5ef55874 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sat, 26 Aug 2023 02:19:03 +0800 Subject: [PATCH 4/5] nit --- .../src/main/scala/xenon/clickhouse/func/MurmurHash3.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala index 1c952e62..3a53c952 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala @@ -27,10 +27,11 @@ object MurmurHash3_64 extends MultiStringArgsHash { override def applyHash(input: Array[Any]): Long = hash.Murmurhash3_64(input) } +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 object MurmurHash3_32 extends MultiStringArgsHash { - // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 override protected def funcName: String = "clickhouse_murmurHash3_32" + override val ckFuncNames: Array[String] = Array("murmurHash3_32") override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash3_32(input)) From 02ca075f8876044bf166b294f75c6bd861fec296 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sat, 26 Aug 2023 02:20:58 +0800 Subject: [PATCH 5/5] nit --- .../src/main/scala/xenon/clickhouse/func/MurmurHash2.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala index 02b07a5f..82332e32 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala @@ -27,10 +27,11 @@ object MurmurHash2_64 extends MultiStringArgsHash { override def applyHash(input: Array[Any]): Long = hash.Murmurhash2_64(input) } +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 object MurmurHash2_32 extends MultiStringArgsHash { - // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 override protected def funcName: String = "clickhouse_murmurHash2_32" + override val ckFuncNames: Array[String] = Array("murmurHash2_32") override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash2_32(input))