diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala new file mode 100644 index 00000000..d6bab146 --- /dev/null +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.clickhouse.cluster + +import org.apache.spark.sql.clickhouse.TestUtils.om +import xenon.clickhouse.func._ +import java.lang.{Long => JLong} + +class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { + // only for query function names + val dummyRegistry: CompositeFunctionRegistry = { + val dynamicFunctionRegistry = new DynamicFunctionRegistry + val xxHash64ShardFunc = new ClickHouseXxHash64Shard(Seq.empty) + dynamicFunctionRegistry.register("ck_xx_hash64_shard", xxHash64ShardFunc) // for compatible + dynamicFunctionRegistry.register("clickhouse_shard_xxHash64", xxHash64ShardFunc) + new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry)) + } + + def runTest(sparkFuncName: String, ckFuncName: String, stringVal: String): Unit = { + val sparkResult = spark.sql( + s"SELECT $sparkFuncName($stringVal) AS hash_value" + ).collect + assert(sparkResult.length == 1) + val sparkHashVal = sparkResult.head.getAs[Long]("hash_value") + + val clickhouseResultJsonStr = runClickHouseSQL( + s"SELECT $ckFuncName($stringVal) AS hash_value " + ).head.getString(0) + val clickhouseResultJson = om.readTree(clickhouseResultJsonStr) + val clickhouseHashVal = JLong.parseUnsignedLong(clickhouseResultJson.get("hash_value").asText) + assert( + sparkHashVal == clickhouseHashVal, + s"ck_function: $ckFuncName, spark_function: $sparkFuncName, args: ($stringVal)" + ) + } + + Seq( + "clickhouse_xxHash64", + "clickhouse_murmurHash3_64", + "clickhouse_murmurHash3_32", + "clickhouse_murmurHash2_64", + "clickhouse_murmurHash2_32", + "clickhouse_cityHash64" + ).foreach { sparkFuncName => + val ckFuncName = dummyRegistry.sparkToClickHouseFunc(sparkFuncName) + test(s"UDF $sparkFuncName") { + Seq( + "spark-clickhouse-connector", + "Apache Spark", + "ClickHouse", + "Yandex", + "热爱", + "在传统的行式数据库系统中,数据按如下顺序存储:", + "🇨🇳" + ).map("'" + _ + "'").foreach { stringVal => + runTest(sparkFuncName, ckFuncName, stringVal) + } + } + } + + Seq( + "clickhouse_murmurHash3_64", + "clickhouse_murmurHash3_32", + "clickhouse_murmurHash2_64", + "clickhouse_murmurHash2_32", + "clickhouse_cityHash64" + ).foreach { sparkFuncName => + val ckFuncName = dummyRegistry.sparkToClickHouseFunc(sparkFuncName) + test(s"UDF $sparkFuncName multiple args") { + Seq( + "spark-clickhouse-connector", + "Apache Spark", + "ClickHouse", + "Yandex", + "热爱", + "在传统的行式数据库系统中,数据按如下顺序存储:", + "🇨🇳" + ).map("'" + _ + "'").combinations(5).foreach { seq => + val stringVal = seq.mkString(", ") + runTest(sparkFuncName, ckFuncName, stringVal) + } + } + } +} diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala deleted file mode 100644 index 3d97cc25..00000000 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterUDFSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.clickhouse.cluster - -import org.apache.spark.sql.clickhouse.TestUtils.om - -import java.lang.{Long => JLong} - -class ClickHouseClusterUDFSuite extends SparkClickHouseClusterTest { - - test("UDF ck_xx_hash64") { - Seq("spark-clickhouse-connector", "Apache Spark", "ClickHouse", "Yandex", "热爱", "🇨🇳").foreach { stringVal => - val sparkResult = spark.sql( - s"""SELECT - | ck_xx_hash64('$stringVal') AS hash_value_legacy, - | clickhouse_xxHash64('$stringVal') AS hash_value, - | ck_xx_hash64_shard('single_replica', '$stringVal') AS shard_num_legacy, -- one based ordinal defined in `remote_servers.xml` - | clickhouse_shard_xxHash64('single_replica', '$stringVal') AS shard_num -- one based ordinal defined in `remote_servers.xml` - |""".stripMargin - ).collect - assert(sparkResult.length == 1) - val sparkHashValLegacy = sparkResult.head.getAs[Long]("hash_value_legacy") - val sparkHashVal = sparkResult.head.getAs[Long]("hash_value") - assert(sparkHashValLegacy === sparkHashVal) - val sparkShardNumLegacy = sparkResult.head.getAs[Int]("shard_num_legacy") - val sparkShardNum = sparkResult.head.getAs[Int]("shard_num") - assert(sparkShardNumLegacy === sparkShardNum) - - val clickhouseResultJsonStr = runClickHouseSQL( - s"""SELECT - | xxHash64('$stringVal') AS hash_value, - | xxHash64('$stringVal') % 4 AS shard_num -- zero based ordinal - |""".stripMargin - ).head.getString(0) - val clickhouseResultJson = om.readTree(clickhouseResultJsonStr) - val clickhouseHashVal = JLong.parseUnsignedLong(clickhouseResultJson.get("hash_value").asText) - val clickhouseShardNum = JLong.parseUnsignedLong(clickhouseResultJson.get("shard_num").asText) - - assert(sparkHashVal == clickhouseHashVal) - assert(sparkShardNum == clickhouseShardNum + 1) - } - } -} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala new file mode 100644 index 00000000..d9efa705 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import xenon.clickhouse.hash + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L694 +object CityHash64 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_cityHash64" + + override val ckFuncNames: Array[String] = Array("cityHash64") + + override def applyHash(input: Array[Any]): Long = hash.CityHash64(input) +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala index b41a7d1a..8faf6564 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala @@ -18,11 +18,19 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import scala.collection.mutable -trait FunctionRegistry { +trait FunctionRegistry extends Serializable { def list: Array[String] def load(name: String): Option[UnboundFunction] + + def sparkToClickHouseFunc: Map[String, String] + + def clickHouseToSparkFunc: Map[String, String] +} + +trait ClickhouseEquivFunction { + val ckFuncNames: Array[String] } class CompositeFunctionRegistry(registries: Array[FunctionRegistry]) extends FunctionRegistry { @@ -30,18 +38,37 @@ class CompositeFunctionRegistry(registries: Array[FunctionRegistry]) extends Fun override def list: Array[String] = registries.flatMap(_.list) override def load(name: String): Option[UnboundFunction] = registries.flatMap(_.load(name)).headOption + + override def sparkToClickHouseFunc: Map[String, String] = registries.flatMap(_.sparkToClickHouseFunc).toMap + + override def clickHouseToSparkFunc: Map[String, String] = registries.flatMap(_.clickHouseToSparkFunc).toMap } object StaticFunctionRegistry extends FunctionRegistry { private val functions = Map[String, UnboundFunction]( "ck_xx_hash64" -> ClickHouseXxHash64, // for compatible - "clickhouse_xxHash64" -> ClickHouseXxHash64 + "clickhouse_xxHash64" -> ClickHouseXxHash64, + "clickhouse_murmurHash2_32" -> MurmurHash2_32, + "clickhouse_murmurHash2_64" -> MurmurHash2_64, + "clickhouse_murmurHash3_32" -> MurmurHash3_32, + "clickhouse_murmurHash3_64" -> MurmurHash3_64, + "clickhouse_cityHash64" -> CityHash64 ) override def list: Array[String] = functions.keys.toArray override def load(name: String): Option[UnboundFunction] = functions.get(name) + + override val sparkToClickHouseFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((k, _)) + } + + override val clickHouseToSparkFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((_, k)) + } } class DynamicFunctionRegistry extends FunctionRegistry { @@ -56,4 +83,14 @@ class DynamicFunctionRegistry extends FunctionRegistry { override def list: Array[String] = functions.keys.toArray override def load(name: String): Option[UnboundFunction] = functions.get(name) + + override def sparkToClickHouseFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).toMap.flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((k, _)) + } + + override def clickHouseToSparkFunc: Map[String, String] = + functions.filter(_._2.isInstanceOf[ClickhouseEquivFunction]).toMap.flatMap { case (k, v) => + v.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.map((_, k)) + } } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala new file mode 100644 index 00000000..c99bf247 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivFunction { + + def applyHash(input: Array[Any]): Long + + protected def funcName: String + + override val ckFuncNames: Array[String] + + override def description: String = s"$name: (value: string, ...) => hash_value: long" + + private def isExceptedType(dt: DataType): Boolean = + dt.isInstanceOf[StringType] + + final override def name: String = funcName + + final override def bind(inputType: StructType): BoundFunction = { + val inputDataTypes = inputType.fields.map(_.dataType) + if (inputDataTypes.forall(isExceptedType)) { + // need to new a ScalarFunction instance for each bind, + // because we do not know the number of arguments in advance + new ScalarFunction[Long] { + override def inputTypes(): Array[DataType] = inputDataTypes + override def name: String = funcName + override def canonicalName: String = s"clickhouse.$name" + override def resultType: DataType = LongType + override def toString: String = name + override def produceResult(input: InternalRow): Long = { + val inputStrings = new Array[Any](input.numFields) + var i = 0 + do { + inputStrings(i) = input.getUTF8String(i).getBytes + i += 1 + } while (i < input.numFields) + applyHash(inputStrings) + } + } + } else { + throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description") + } + } + +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala new file mode 100644 index 00000000..82332e32 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import xenon.clickhouse.hash +import xenon.clickhouse.hash.HashUtils + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L460 +object MurmurHash2_64 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_murmurHash2_64" + + override val ckFuncNames: Array[String] = Array("murmurHash2_64") + + override def applyHash(input: Array[Any]): Long = hash.Murmurhash2_64(input) +} + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 +object MurmurHash2_32 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_murmurHash2_32" + + override val ckFuncNames: Array[String] = Array("murmurHash2_32") + + override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash2_32(input)) +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala new file mode 100644 index 00000000..3a53c952 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xenon.clickhouse.func + +import xenon.clickhouse.hash +import xenon.clickhouse.hash.HashUtils + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L543 +object MurmurHash3_64 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_murmurHash3_64" + + override val ckFuncNames: Array[String] = Array("murmurHash3_64") + + override def applyHash(input: Array[Any]): Long = hash.Murmurhash3_64(input) +} + +// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 +object MurmurHash3_32 extends MultiStringArgsHash { + + override protected def funcName: String = "clickhouse_murmurHash3_32" + + override val ckFuncNames: Array[String] = Array("murmurHash3_32") + + override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash3_32(input)) +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/ClickHouseXxHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala similarity index 94% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/ClickHouseXxHash64.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala index e7f223b0..3c4a5b1a 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/ClickHouseXxHash64.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala @@ -26,12 +26,16 @@ import xenon.clickhouse.spec.{ClusterSpec, ShardUtils} * select xxHash64(concat(project_id, toString(seq)) * }}} */ -object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] { +object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction { override def name: String = "clickhouse_xxHash64" override def canonicalName: String = s"clickhouse.$name" + override def toString: String = name + + override val ckFuncNames: Array[String] = Array("xxHash64") + override def description: String = s"$name: (value: string) => hash_value: long" override def bind(inputType: StructType): BoundFunction = inputType.fields match { @@ -45,6 +49,7 @@ object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] { override def isResultNullable: Boolean = false + // ignore UInt64 vs Int64 def invoke(value: UTF8String): Long = XxHash64Function.hash(value, StringType, 0L) } diff --git a/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala b/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala new file mode 100644 index 00000000..0f3422fd --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.clickhouse + +import org.scalatest.funsuite.AnyFunSuite +import xenon.clickhouse.func._ + +class FunctionRegistrySuite extends AnyFunSuite { + + val staticFunctionRegistry: StaticFunctionRegistry.type = StaticFunctionRegistry + val dynamicFunctionRegistry = new DynamicFunctionRegistry + dynamicFunctionRegistry.register("ck_xx_hash64", ClickHouseXxHash64) + dynamicFunctionRegistry.register("clickhouse_xxHash64", ClickHouseXxHash64) + + test("check StaticFunctionRegistry mappings") { + assert(staticFunctionRegistry.sparkToClickHouseFunc.forall { case (k, v) => + staticFunctionRegistry.load(k).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(v) + }) + assert(staticFunctionRegistry.clickHouseToSparkFunc.forall { case (k, v) => + staticFunctionRegistry.load(v).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(k) + }) + } + + test("check DynamicFunctionRegistry mappings") { + assert(dynamicFunctionRegistry.sparkToClickHouseFunc.forall { case (k, v) => + dynamicFunctionRegistry.load(k).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(v) + }) + assert(dynamicFunctionRegistry.clickHouseToSparkFunc.forall { case (k, v) => + dynamicFunctionRegistry.load(v).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(k) + }) + } + + test("check CompositeFunctionRegistry mappings") { + val compositeFunctionRegistry = + new CompositeFunctionRegistry(Array(staticFunctionRegistry, dynamicFunctionRegistry)) + assert(compositeFunctionRegistry.sparkToClickHouseFunc.forall { case (k, v) => + compositeFunctionRegistry.load(k).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(v) + }) + assert(compositeFunctionRegistry.clickHouseToSparkFunc.forall { case (k, v) => + compositeFunctionRegistry.load(v).get.asInstanceOf[ClickhouseEquivFunction].ckFuncNames.contains(k) + }) + } +}