
Commit 4eb66ab

Spark: Allow setting arbitrary options for ClickHouseClient (#203)
* Spark: Allow setting arbitrary options for ClickHouseClient
* option
* docs
1 parent 553b8b5 commit 4eb66ab
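
In short, any clickhouse-client option can now reach the underlying client by prefixing it with `option.` in the catalog configuration. A minimal sketch of wiring this up programmatically (the session setup and option values below are illustrative placeholders; only the configuration keys come from this commit):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: host, credentials, and option values are placeholders.
// Keys under `spark.sql.catalog.<name>.option.` are forwarded to clickhouse-client.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "10.0.0.1")
  .config("spark.sql.catalog.clickhouse.protocol", "http")
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  .config("spark.sql.catalog.clickhouse.user", "default")
  .config("spark.sql.catalog.clickhouse.password", "")
  .config("spark.sql.catalog.clickhouse.database", "default")
  // arbitrary clickhouse-client options, passed through to the client
  .config("spark.sql.catalog.clickhouse.option.async", "false")
  .config("spark.sql.catalog.clickhouse.option.client_name", "spark")
  .getOrCreate()
```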

File tree

8 files changed: +84 -43 lines


clickhouse-core/src/main/scala/xenon/clickhouse/client/NodeClient.scala

Lines changed: 1 addition & 1 deletion
@@ -34,14 +34,14 @@ object NodeClient {
 class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
 
   private val node: ClickHouseNode = ClickHouseNode.builder()
+    .options(nodeSpec.options)
     .host(nodeSpec.host)
     .port(nodeSpec.protocol, nodeSpec.port)
     .database(nodeSpec.database)
     .credentials(ClickHouseCredentials.fromUserAndPassword(nodeSpec.username, nodeSpec.password))
     .build()
 
   private val client: ClickHouseClient = ClickHouseClient.builder()
-    .option(ClickHouseClientOption.ASYNC, false)
     .option(ClickHouseClientOption.FORMAT, ClickHouseFormat.RowBinary)
     .nodeSelector(ClickHouseNodeSelector.of(node.getProtocol))
     .build()
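
The effect of this hunk is that node-level options now come from the `NodeSpec` instead of being hard-coded, so settings like the previously forced `ASYNC=false` must be supplied by the user. A rough sketch of the resulting builder usage (host and port values are placeholders, and the minimal builder chain is an assumption, not taken from the commit):

```scala
import com.clickhouse.client.{ClickHouseNode, ClickHouseProtocol}
import scala.collection.JavaConverters._

// Sketch: user-supplied options (e.g. async=false) are applied node-wide,
// replacing the removed hard-coded `.option(ClickHouseClientOption.ASYNC, false)`
// on the client builder.
val node: ClickHouseNode = ClickHouseNode.builder()
  .options(Map("async" -> "false").asJava)
  .host("10.0.0.1")
  .port(ClickHouseProtocol.HTTP, 8123)
  .build()
```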

clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala

Lines changed: 5 additions & 1 deletion
@@ -20,6 +20,9 @@ import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty}
 import xenon.clickhouse.ToJson
 import xenon.clickhouse.Utils._
 
+import java.util
+import scala.collection.JavaConverters._
+
 trait Nodes {
   def nodes: Array[NodeSpec]
 }
@@ -32,7 +35,8 @@ case class NodeSpec(
   @JsonProperty("protocol") protocol: ClickHouseProtocol = GRPC,
   @JsonProperty("username") username: String = "default",
   @JsonProperty("password") password: String = "",
-  @JsonProperty("database") database: String = "default"
+  @JsonProperty("database") database: String = "default",
+  @JsonProperty("options") options: util.Map[String, String] = Map.empty[String, String].asJava
 ) extends Nodes with ToJson with Serializable {
   @JsonProperty("host") def host: String = findHost(_host)
   @JsonProperty("http_port") def http_port: Option[Int] = findPort(_http_port)

docs/configurations/01_catalog_configurations.md

Lines changed: 27 additions & 18 deletions
@@ -21,13 +21,20 @@ Suppose you have one ClickHouse instance which installed on `10.0.0.1` and expos
 Edit `$SPARK_HOME/conf/spark-defaults.conf`.
 
 ```
-spark.sql.catalog.clickhouse              xenon.clickhouse.ClickHouseCatalog
-spark.sql.catalog.clickhouse.host         10.0.0.1
-spark.sql.catalog.clickhouse.protocol     http
-spark.sql.catalog.clickhouse.http_port    8123
-spark.sql.catalog.clickhouse.user         default
+# register a catalog named "clickhouse"
+spark.sql.catalog.clickhouse                    xenon.clickhouse.ClickHouseCatalog
+
+# basic configurations for "clickhouse" catalog
+spark.sql.catalog.clickhouse.host               10.0.0.1
+spark.sql.catalog.clickhouse.protocol           http
+spark.sql.catalog.clickhouse.http_port          8123
+spark.sql.catalog.clickhouse.user               default
 spark.sql.catalog.clickhouse.password
-spark.sql.catalog.clickhouse.database     default
+spark.sql.catalog.clickhouse.database           default
+
+# custom options of clickhouse-client for "clickhouse" catalog
+spark.sql.catalog.clickhouse.option.async       false
+spark.sql.catalog.clickhouse.option.client_name spark
 ```
 
 Then you can access ClickHouse table `<ck_db>.<ck_table>` from Spark SQL by using `clickhouse.<ck_db>.<ck_table>`.
@@ -42,21 +49,23 @@ clickhouse1, and another installed on `10.0.0.2` and exposes gRPC on port `9100`
 Edit `$SPARK_HOME/conf/spark-defaults.conf`.
 
 ```
-spark.sql.catalog.clickhouse1             xenon.clickhouse.ClickHouseCatalog
-spark.sql.catalog.clickhouse1.host        10.0.0.1
-spark.sql.catalog.clickhouse1.protocol    grpc
-spark.sql.catalog.clickhouse1.grpc_port   9100
-spark.sql.catalog.clickhouse1.user        default
+spark.sql.catalog.clickhouse1               xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.clickhouse1.host          10.0.0.1
+spark.sql.catalog.clickhouse1.protocol      grpc
+spark.sql.catalog.clickhouse1.grpc_port     9100
+spark.sql.catalog.clickhouse1.user          default
 spark.sql.catalog.clickhouse1.password
-spark.sql.catalog.clickhouse1.database    default
+spark.sql.catalog.clickhouse1.database      default
+spark.sql.catalog.clickhouse1.option.async  false
 
-spark.sql.catalog.clickhouse2             xenon.clickhouse.ClickHouseCatalog
-spark.sql.catalog.clickhouse2.host        10.0.0.2
-spark.sql.catalog.clickhouse2.protocol    grpc
-spark.sql.catalog.clickhouse2.grpc_port   9100
-spark.sql.catalog.clickhouse2.user        default
+spark.sql.catalog.clickhouse2               xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.clickhouse2.host          10.0.0.2
+spark.sql.catalog.clickhouse2.protocol      grpc
+spark.sql.catalog.clickhouse2.grpc_port     9100
+spark.sql.catalog.clickhouse2.user          default
 spark.sql.catalog.clickhouse2.password
-spark.sql.catalog.clickhouse2.database    default
+spark.sql.catalog.clickhouse2.database      default
+spark.sql.catalog.clickhouse2.option.async  false
 ```
 
 Then you can access clickhouse1 table `<ck_db>.<ck_table>` from Spark SQL by `clickhouse1.<ck_db>.<ck_table>`,
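
Once both catalogs are registered, tables in the two instances can be addressed side by side in plain Spark SQL. A short sketch (database and table names are hypothetical; the catalog names come from the configuration above):

```scala
// Hypothetical database/table names, just to show cross-catalog addressing.
spark.sql("SELECT * FROM clickhouse1.wiki.pageviews LIMIT 10").show()
spark.sql("INSERT INTO clickhouse2.wiki.pageviews SELECT * FROM clickhouse1.wiki.pageviews")
```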

spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala

Lines changed: 4 additions & 0 deletions
@@ -37,27 +37,31 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn {
     .set("spark.sql.catalog.clickhouse_s1r1.user", "default")
     .set("spark.sql.catalog.clickhouse_s1r1.password", "")
     .set("spark.sql.catalog.clickhouse_s1r1.database", "default")
+    .set("spark.sql.catalog.clickhouse_s1r1.option.async", "false")
     .set("spark.sql.catalog.clickhouse_s1r2", "xenon.clickhouse.ClickHouseCatalog")
     .set("spark.sql.catalog.clickhouse_s1r2.host", clickhouse_s1r2_host)
     .set("spark.sql.catalog.clickhouse_s1r2.http_port", clickhouse_s1r2_http_port.toString)
     .set("spark.sql.catalog.clickhouse_s1r2.protocol", "http")
     .set("spark.sql.catalog.clickhouse_s1r2.user", "default")
     .set("spark.sql.catalog.clickhouse_s1r2.password", "")
     .set("spark.sql.catalog.clickhouse_s1r2.database", "default")
+    .set("spark.sql.catalog.clickhouse_s1r2.option.async", "false")
     .set("spark.sql.catalog.clickhouse_s2r1", "xenon.clickhouse.ClickHouseCatalog")
     .set("spark.sql.catalog.clickhouse_s2r1.host", clickhouse_s2r1_host)
     .set("spark.sql.catalog.clickhouse_s2r1.http_port", clickhouse_s2r1_http_port.toString)
     .set("spark.sql.catalog.clickhouse_s2r1.protocol", "http")
     .set("spark.sql.catalog.clickhouse_s2r1.user", "default")
     .set("spark.sql.catalog.clickhouse_s2r1.password", "")
     .set("spark.sql.catalog.clickhouse_s2r1.database", "default")
+    .set("spark.sql.catalog.clickhouse_s2r1.option.async", "false")
     .set("spark.sql.catalog.clickhouse_s2r2", "xenon.clickhouse.ClickHouseCatalog")
     .set("spark.sql.catalog.clickhouse_s2r2.host", clickhouse_s2r2_host)
     .set("spark.sql.catalog.clickhouse_s2r2.http_port", clickhouse_s2r2_http_port.toString)
     .set("spark.sql.catalog.clickhouse_s2r2.protocol", "http")
     .set("spark.sql.catalog.clickhouse_s2r2.user", "default")
     .set("spark.sql.catalog.clickhouse_s2r2.password", "")
     .set("spark.sql.catalog.clickhouse_s2r2.database", "default")
+    .set("spark.sql.catalog.clickhouse_s2r2.option.async", "false")
     // extended configurations
     .set("spark.clickhouse.write.batchSize", "2")
     .set("spark.clickhouse.write.maxRetry", "2")

spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn {
     .set("spark.sql.catalog.clickhouse.user", CLICKHOUSE_USER)
     .set("spark.sql.catalog.clickhouse.password", CLICKHOUSE_PASSWORD)
     .set("spark.sql.catalog.clickhouse.database", CLICKHOUSE_DB)
+    .set("spark.sql.catalog.clickhouse.option.async", "false")
     // extended configurations
     .set("spark.clickhouse.write.batchSize", "2")
     .set("spark.clickhouse.write.maxRetry", "2")

spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala

Lines changed: 4 additions & 0 deletions
@@ -20,6 +20,10 @@ import xenon.clickhouse.exception.ClickHouseErrCode._
 
 import java.util.concurrent.TimeUnit
 
+/**
+ * Run the following command to update the configuration docs.
+ * UPDATE=1 ./gradlew test --tests=ConfigurationSuite
+ */
 object ClickHouseSQLConf {
 
   val WRITE_BATCH_SIZE: ConfigEntry[Int] =

spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala

Lines changed: 28 additions & 15 deletions
@@ -14,11 +14,8 @@
 
 package xenon.clickhouse
 
-import java.time.{LocalDateTime, ZoneId}
-
-import scala.collection.JavaConverters._
-
 import com.clickhouse.client.ClickHouseProtocol
+import com.clickhouse.client.config.ClickHouseClientOption
 import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node.NullNode
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
@@ -32,28 +29,44 @@ import xenon.clickhouse.client.NodeClient
 import xenon.clickhouse.exception.CHException
 import xenon.clickhouse.spec._
 
+import java.time.{LocalDateTime, ZoneId}
+import scala.collection.JavaConverters._
+
 trait ClickHouseHelper extends Logging {
 
   @volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_DATABASE: String => Unit =
-    (db: String) => throw new NoSuchNamespaceException(db)
+    (db: String) => throw NoSuchNamespaceException(db)
 
   @volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_TABLE: (String, String) => Unit =
-    (database, table) => throw new NoSuchTableException(s"$database.$table")
+    (database, table) => throw NoSuchTableException(s"$database.$table")
 
   def unwrap(ident: Identifier): Option[(String, String)] = ident.namespace() match {
     case Array(database) => Some((database, ident.name()))
     case _ => None
   }
 
-  def buildNodeSpec(options: CaseInsensitiveStringMap): NodeSpec = NodeSpec(
-    _host = options.getOrDefault(CATALOG_PROP_HOST, "localhost"),
-    _grpc_port = Some(options.getInt(CATALOG_PROP_GRPC_PORT, 9100)),
-    _http_port = Some(options.getInt(CATALOG_PROP_HTTP_PORT, 8123)),
-    protocol = ClickHouseProtocol.fromUriScheme(options.getOrDefault(CATALOG_PROP_PROTOCOL, "http")),
-    username = options.getOrDefault(CATALOG_PROP_USER, "default"),
-    password = options.getOrDefault(CATALOG_PROP_PASSWORD, ""),
-    database = options.getOrDefault(CATALOG_PROP_DATABASE, "default")
-  )
+  def buildNodeSpec(options: CaseInsensitiveStringMap): NodeSpec = {
+    val clientOpts = options.asScala
+      .filterKeys(_.startsWith(CATALOG_PROP_OPTION_PREFIX))
+      .filterKeys { key =>
+        val clientOpt = key.substring(CATALOG_PROP_OPTION_PREFIX.length)
+        val ignore = CATALOG_PROP_IGNORE_OPTIONS.contains(clientOpt)
+        if (ignore) {
+          log.warn(s"Ignore configuration $key.")
+        }
+        !ignore
+      }.toMap
+    NodeSpec(
+      _host = options.getOrDefault(CATALOG_PROP_HOST, "localhost"),
+      _grpc_port = Some(options.getInt(CATALOG_PROP_GRPC_PORT, 9100)),
+      _http_port = Some(options.getInt(CATALOG_PROP_HTTP_PORT, 8123)),
+      protocol = ClickHouseProtocol.fromUriScheme(options.getOrDefault(CATALOG_PROP_PROTOCOL, "http")),
+      username = options.getOrDefault(CATALOG_PROP_USER, "default"),
+      password = options.getOrDefault(CATALOG_PROP_PASSWORD, ""),
+      database = options.getOrDefault(CATALOG_PROP_DATABASE, "default"),
+      options = clientOpts.asJava
+    )
+  }
 
   def queryClusterSpecs(nodeSpec: NodeSpec)(implicit nodeClient: NodeClient): Seq[ClusterSpec] = {
     val clustersOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow(
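
To see the new filtering rule in isolation from Spark, here is a self-contained sketch over a plain Scala map (the prefix string and ignore set are abbreviated stand-ins for `CATALOG_PROP_OPTION_PREFIX` and `CATALOG_PROP_IGNORE_OPTIONS`, and the key/value pairs are illustrative):

```scala
// Self-contained sketch of the option-filtering rule in buildNodeSpec:
// "option."-prefixed keys are candidate client options; keys whose suffix
// appears in the ignore list are dropped (with a warning in the real code)
// because the connector manages those settings itself.
val prefix = "option."
val ignored = Set("database", "format") // abbreviated stand-in
val catalogProps = Map(
  "host" -> "10.0.0.1",            // not prefixed -> not a client option
  "option.async" -> "false",       // forwarded
  "option.format" -> "JSONEachRow" // connector-managed -> dropped
)
val clientOpts = catalogProps
  .filterKeys(_.startsWith(prefix))
  .filterKeys(key => !ignored.contains(key.substring(prefix.length)))
  .toMap
assert(clientOpts == Map("option.async" -> "false"))
```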

spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala

Lines changed: 14 additions & 8 deletions
@@ -14,19 +14,25 @@
 
 package xenon.clickhouse
 
+import com.clickhouse.client.config.ClickHouseClientOption._
+
 object Constants {
   // format: off
   //////////////////////////////////////////////////////////
   //////// clickhouse datasource catalog properties ////////
   //////////////////////////////////////////////////////////
-  final val CATALOG_PROP_HOST      = "host"
-  final val CATALOG_PROP_GRPC_PORT = "grpc_port"
-  final val CATALOG_PROP_HTTP_PORT = "http_port"
-  final val CATALOG_PROP_PROTOCOL  = "protocol"
-  final val CATALOG_PROP_USER      = "user"
-  final val CATALOG_PROP_PASSWORD  = "password"
-  final val CATALOG_PROP_DATABASE  = "database"
-  final val CATALOG_PROP_TZ        = "timezone" // server(default), client, UTC+3, Asia/Shanghai, etc.
+  final val CATALOG_PROP_HOST           = "host"
+  final val CATALOG_PROP_GRPC_PORT      = "grpc_port"
+  final val CATALOG_PROP_HTTP_PORT      = "http_port"
+  final val CATALOG_PROP_PROTOCOL      = "protocol"
+  final val CATALOG_PROP_USER           = "user"
+  final val CATALOG_PROP_PASSWORD       = "password"
+  final val CATALOG_PROP_DATABASE       = "database"
+  final val CATALOG_PROP_TZ             = "timezone" // server(default), client, UTC+3, Asia/Shanghai, etc.
+  final val CATALOG_PROP_OPTION_PREFIX  = "option."
+  final val CATALOG_PROP_IGNORE_OPTIONS = Seq(
+    DATABASE.getKey, COMPRESS.getKey, DECOMPRESS.getKey, FORMAT.getKey, RETRY.getKey,
+    USE_SERVER_TIME_ZONE.getKey, USE_SERVER_TIME_ZONE_FOR_DATES.getKey, SERVER_TIME_ZONE.getKey, USE_TIME_ZONE.getKey)
 
   //////////////////////////////////////////////////////////
   ////////// clickhouse datasource read properties /////////
