
Commit cd1ff2a

vinodkc authored and dongjoon-hyun committed
[SPARK-54206][CONNECT] Support BINARY type data in SparkConnectResultSet
### What changes were proposed in this pull request?
Add BINARY type support to the Spark Connect JDBC client.

### Why are the changes needed?
BINARY is a fundamental SQL type required for JDBC compliance and interoperability.

### Does this PR introduce _any_ user-facing change?
Yes, it's part of a new feature under Spark Connect JDBC support.

### How was this patch tested?
Added new UTs in `SparkConnectJdbcDataTypeSuite`.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #53018 from vinodkc/br_SPARK-54206.

Authored-by: vinodkc <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit ecaec3d)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 09aadf7 commit cd1ff2a
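For orientation, here is a minimal sketch of reading a BINARY column through the Spark Connect JDBC client once this change is in place. The `jdbc:sc://localhost:15002` URL, the host/port, and the assumption that the driver is registered with `DriverManager` are illustrative only and are not details taken from this PR:

```scala
// Hypothetical usage sketch: reading a BINARY column via the Spark Connect JDBC
// client. The connection URL and driver registration are assumptions for
// illustration; only getBytes(Int) and getBytes(String) come from this PR.
import java.sql.DriverManager

object BinaryReadExample {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:sc://localhost:15002")
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery("SELECT CAST(X'DEADBEEF' AS BINARY) AS payload")
      while (rs.next()) {
        // Both accessors are supported after this change.
        val byIndex: Array[Byte] = rs.getBytes(1)
        val byLabel: Array[Byte] = rs.getBytes("payload")
        println(byIndex.map("%02X".format(_)).mkString) // DEADBEEF
        println(byLabel.sameElements(byIndex))          // true
      }
      rs.close()
      stmt.close()
    } finally {
      conn.close()
    }
  }
}
```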

File tree: 3 files changed (+87, -8 lines)

sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala

Lines changed: 6 additions & 3 deletions
@@ -137,8 +137,11 @@ class SparkConnectResultSet(
   override def getBigDecimal(columnIndex: Int, scale: Int): java.math.BigDecimal =
     throw new SQLFeatureNotSupportedException
 
-  override def getBytes(columnIndex: Int): Array[Byte] =
-    throw new SQLFeatureNotSupportedException
+  override def getBytes(columnIndex: Int): Array[Byte] = {
+    getColumnValue(columnIndex, null: Array[Byte]) { idx =>
+      currentRow.get(idx).asInstanceOf[Array[Byte]]
+    }
+  }
 
   override def getDate(columnIndex: Int): Date = {
     getColumnValue(columnIndex, null: Date) { idx => currentRow.getDate(idx) }
@@ -187,7 +190,7 @@ class SparkConnectResultSet(
     throw new SQLFeatureNotSupportedException
 
   override def getBytes(columnLabel: String): Array[Byte] =
-    throw new SQLFeatureNotSupportedException
+    getBytes(findColumn(columnLabel))
 
   override def getDate(columnLabel: String): Date =
     getDate(findColumn(columnLabel))
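The index-based getBytes now follows the same shape as getDate: it delegates to the private getColumnValue helper, which, judging only from its call sites in this diff, resolves the column, returns the supplied default for SQL NULL, and keeps wasNull in sync. A plausible sketch of that pattern follows; it is inferred from usage, and the real helper in SparkConnectResultSet is not part of this diff and may differ:

```scala
import org.apache.spark.sql.Row

// Hypothetical sketch only: the getColumnValue shape is inferred from its call
// sites in this diff; field and method names here are illustrative.
class ResultSetNullHandlingSketch(var currentRow: Row) {
  private var _wasNull: Boolean = false      // backs a wasNull()-style accessor
  def wasNull: Boolean = _wasNull

  private def getColumnValue[T](columnIndex: Int, defaultValue: T)(getter: Int => T): T = {
    val idx = columnIndex - 1                // JDBC column indexes are 1-based
    if (currentRow.isNullAt(idx)) {
      _wasNull = true                        // SQL NULL: arm wasNull, return the default
      defaultValue
    } else {
      _wasNull = false
      getter(idx)
    }
  }

  // Mirrors the new getBytes(columnIndex) added in this PR.
  def getBytes(columnIndex: Int): Array[Byte] =
    getColumnValue(columnIndex, null: Array[Byte]) { idx =>
      currentRow.get(idx).asInstanceOf[Array[Byte]]
    }
}
```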

sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcTypeUtils.scala

Lines changed: 6 additions & 2 deletions
@@ -37,6 +37,7 @@ private[jdbc] object JdbcTypeUtils {
     case StringType => Types.VARCHAR
     case _: DecimalType => Types.DECIMAL
     case DateType => Types.DATE
+    case BinaryType => Types.BINARY
     case other =>
       throw new SQLFeatureNotSupportedException(s"DataType $other is not supported yet.")
   }
@@ -53,14 +54,15 @@ private[jdbc] object JdbcTypeUtils {
     case StringType => classOf[String].getName
     case _: DecimalType => classOf[JBigDecimal].getName
     case DateType => classOf[Date].getName
+    case BinaryType => classOf[Array[Byte]].getName
     case other =>
       throw new SQLFeatureNotSupportedException(s"DataType $other is not supported yet.")
   }
 
   def isSigned(field: StructField): Boolean = field.dataType match {
     case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
         _: DecimalType => true
-    case NullType | BooleanType | StringType | DateType => false
+    case NullType | BooleanType | StringType | DateType | BinaryType => false
     case other =>
       throw new SQLFeatureNotSupportedException(s"DataType $other is not supported yet.")
   }
@@ -77,6 +79,7 @@ private[jdbc] object JdbcTypeUtils {
     case StringType => 255
     case DecimalType.Fixed(p, _) => p
     case DateType => 10
+    case BinaryType => Int.MaxValue
     case other =>
       throw new SQLFeatureNotSupportedException(s"DataType $other is not supported yet.")
   }
@@ -85,7 +88,7 @@ private[jdbc] object JdbcTypeUtils {
     case FloatType => 7
     case DoubleType => 15
     case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | StringType |
-        DateType => 0
+        DateType | BinaryType => 0
     case DecimalType.Fixed(_, s) => s
     case other =>
       throw new SQLFeatureNotSupportedException(s"DataType $other is not supported yet.")
@@ -101,6 +104,7 @@ private[jdbc] object JdbcTypeUtils {
     case StringType =>
       getPrecision(field)
     case DateType => 10 // length of `YYYY-MM-DD`
+    case BinaryType => Int.MaxValue
     // precision + negative sign + leading zero + decimal point, like DECIMAL(5,5) = -0.12345
     case DecimalType.Fixed(p, s) if p == s => p + 3
     // precision + negative sign, like DECIMAL(5,0) = -12345
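Taken together, these mappings determine what a BINARY column reports through java.sql.ResultSetMetaData. A small sketch of the expected values follows, assuming these JdbcTypeUtils helpers back the corresponding metadata methods (that wiring is outside this diff):

```scala
import java.sql.{ResultSet, Types}

object BinaryMetadataSketch {
  // Sketch: metadata a BINARY column should report under the mappings above,
  // assuming the helpers back the corresponding ResultSetMetaData methods.
  def describeBinaryColumn(rs: ResultSet): Unit = {
    val md = rs.getMetaData
    assert(md.getColumnType(1) == Types.BINARY)    // BinaryType -> Types.BINARY
    assert(md.getColumnClassName(1) == "[B")       // classOf[Array[Byte]].getName
    assert(!md.isSigned(1))                        // binary is not a signed numeric type
    assert(md.getPrecision(1) == Int.MaxValue)     // variable-length: no fixed precision
    assert(md.getScale(1) == 0)                    // scale does not apply to binary data
  }
}
```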

sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectJdbcDataTypeSuite.scala

Lines changed: 75 additions & 3 deletions
@@ -259,7 +259,8 @@ class SparkConnectJdbcDataTypeSuite extends ConnectFunSuite with RemoteSparkSess
       ("cast(1 AS BIGINT)", (rs: ResultSet) => rs.getLong(999)),
       ("cast(1 AS FLOAT)", (rs: ResultSet) => rs.getFloat(999)),
       ("cast(1 AS DOUBLE)", (rs: ResultSet) => rs.getDouble(999)),
-      ("cast(1 AS DECIMAL(10,5))", (rs: ResultSet) => rs.getBigDecimal(999))
+      ("cast(1 AS DECIMAL(10,5))", (rs: ResultSet) => rs.getBigDecimal(999)),
+      ("CAST(X'0A0B0C' AS BINARY)", (rs: ResultSet) => rs.getBytes(999))
     ).foreach {
       case (query, getter) =>
         withExecuteQuery(s"SELECT $query") { rs =>
@@ -284,13 +285,18 @@ class SparkConnectJdbcDataTypeSuite extends ConnectFunSuite with RemoteSparkSess
       ("cast(1 AS FLOAT)", (rs: ResultSet) => rs.getFloat(1), 1.toFloat),
       ("cast(1 AS DOUBLE)", (rs: ResultSet) => rs.getDouble(1), 1.toDouble),
       ("cast(1 AS DECIMAL(10,5))", (rs: ResultSet) => rs.getBigDecimal(1),
-        new java.math.BigDecimal("1.00000"))
+        new java.math.BigDecimal("1.00000")),
+      ("CAST(X'0A0B0C' AS BINARY)", (rs: ResultSet) => rs.getBytes(1),
+        Array[Byte](0x0A, 0x0B, 0x0C))
     ).foreach {
       case (query, getter, expectedValue) =>
         var resultSet: Option[ResultSet] = None
         withExecuteQuery(s"SELECT $query") { rs =>
           assert(rs.next())
-          assert(getter(rs) === expectedValue)
+          expectedValue match {
+            case arr: Array[Byte] => assert(getter(rs).asInstanceOf[Array[Byte]].sameElements(arr))
+            case other => assert(getter(rs) === other)
+          }
           assert(!rs.wasNull)
           resultSet = Some(rs)
         }
@@ -410,4 +416,70 @@ class SparkConnectJdbcDataTypeSuite extends ConnectFunSuite with RemoteSparkSess
       assert(!rs.next())
     }
   }
+
+  test("get binary type") {
+    val testBytes = Array[Byte](0x01, 0x02, 0x03, 0x04, 0x05)
+    val hexString = testBytes.map(b => "%02X".format(b)).mkString
+    withExecuteQuery(s"SELECT CAST(X'$hexString' AS BINARY)") { rs =>
+      assert(rs.next())
+      val bytes = rs.getBytes(1)
+      assert(bytes !== null)
+      assert(bytes.length === testBytes.length)
+      assert(bytes.sameElements(testBytes))
+      assert(!rs.wasNull)
+      assert(!rs.next())
+
+      val metaData = rs.getMetaData
+      assert(metaData.getColumnCount === 1)
+      assert(metaData.getColumnType(1) === Types.BINARY)
+      assert(metaData.getColumnTypeName(1) === "BINARY")
+      assert(metaData.getColumnClassName(1) === "[B")
+      assert(metaData.isSigned(1) === false)
+    }
+  }
+
+  test("get binary type with null") {
+    withExecuteQuery("SELECT cast(null as binary)") { rs =>
+      assert(rs.next())
+      assert(rs.getBytes(1) === null)
+      assert(rs.wasNull)
+      assert(!rs.next())
+
+      val metaData = rs.getMetaData
+      assert(metaData.getColumnCount === 1)
+      assert(metaData.getColumnType(1) === Types.BINARY)
+      assert(metaData.getColumnTypeName(1) === "BINARY")
+      assert(metaData.getColumnClassName(1) === "[B")
+    }
+  }
+
+  test("get binary type by column label") {
+    val testBytes = Array[Byte](0x0A, 0x0B, 0x0C)
+    val hexString = testBytes.map(b => "%02X".format(b)).mkString
+    withExecuteQuery(s"SELECT CAST(X'$hexString' AS BINARY) as test_binary") { rs =>
+      assert(rs.next())
+      val bytes = rs.getBytes("test_binary")
+      assert(bytes !== null)
+      assert(bytes.length === testBytes.length)
+      assert(bytes.sameElements(testBytes))
+      assert(!rs.wasNull)
+      assert(!rs.next())
+
+      val metaData = rs.getMetaData
+      assert(metaData.getColumnCount === 1)
+      assert(metaData.getColumnName(1) === "test_binary")
+      assert(metaData.getColumnLabel(1) === "test_binary")
+    }
+  }
+
+  test("get empty binary") {
+    withExecuteQuery("SELECT CAST(X'' AS BINARY)") { rs =>
+      assert(rs.next())
+      val bytes = rs.getBytes(1)
+      assert(bytes !== null)
+      assert(bytes.length === 0)
+      assert(!rs.wasNull)
+      assert(!rs.next())
+    }
+  }
 }
