From fc4f33b6c3d609f18b820b25cc1435a2c4c5ead8 Mon Sep 17 00:00:00 2001 From: Nikolay Voskresenskiy Date: Tue, 19 Aug 2025 15:30:36 -0400 Subject: [PATCH] Spark 3.5 support --- .../hadoop/gradle/BuildPlugin.groovy | 2 +- .../gradle/scala/SparkVariantPlugin.java | 19 +++ gradle.properties | 1 + spark/core/build.gradle | 27 ++++ spark/sql-30/build.gradle | 20 ++- .../StreamingQueryLifecycleListener.scala | 127 +++++++++++++++++ .../streaming/StreamingQueryTestHarness.scala | 132 ++---------------- .../StreamingQueryIdleLifecycleListener.scala | 31 ++++ .../StreamingQueryIdleLifecycleListener.scala | 31 ++++ .../sql/streaming/EsStreamQueryWriter.scala | 2 +- 10 files changed, 267 insertions(+), 125 deletions(-) create mode 100644 spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryLifecycleListener.scala create mode 100644 spark/sql-30/src/itest/spark35scala212/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala create mode 100644 spark/sql-30/src/itest/spark35scala213/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/BuildPlugin.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/BuildPlugin.groovy index a77a4adae..dd4d2ef05 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/BuildPlugin.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/BuildPlugin.groovy @@ -594,7 +594,6 @@ class BuildPlugin implements Plugin { // Main variant needs the least configuration on its own, since it is the default publication created above. sparkVariants.defaultVariant { SparkVariant variant -> - project.publishing.publications.main.setAlias(true) updateVariantArtifactId(project, project.publishing.publications.main, variant) } @@ -651,6 +650,7 @@ class BuildPlugin implements Plugin { } configurePom(project, variantPublication) updateVariantArtifactId(project, variantPublication, variant) + variantPublication.setAlias(true) } } if (signingKey.isPresent()) { diff --git a/buildSrc/src/main/java/org/elasticsearch/hadoop/gradle/scala/SparkVariantPlugin.java b/buildSrc/src/main/java/org/elasticsearch/hadoop/gradle/scala/SparkVariantPlugin.java index f5060ae11..37fe517b5 100644 --- a/buildSrc/src/main/java/org/elasticsearch/hadoop/gradle/scala/SparkVariantPlugin.java +++ b/buildSrc/src/main/java/org/elasticsearch/hadoop/gradle/scala/SparkVariantPlugin.java @@ -75,6 +75,8 @@ public class SparkVariantPlugin implements Plugin { + public static final String ITEST_SOURCE_SET_NAME = "itest"; + public static class SparkVariant { private final CharSequence name; @@ -314,6 +316,23 @@ private static void configureDefaultVariant(Project project, SparkVariant sparkV runtimeElements.getOutgoing().capability(capability); configureScalaJarClassifiers(project, sparkVariant); + // Extend main and test source set for the main variant - this enables the possibility of having diverging code between variants + SourceSetContainer sourceSets = javaPluginExtension.getSourceSets(); + ScalaSourceDirectorySet scalaSourceSet = getScalaSourceSet(sourceSets.getByName(MAIN_SOURCE_SET_NAME)); + scalaSourceSet.setSrcDirs(Arrays.asList( + "src/" + MAIN_SOURCE_SET_NAME + "/scala", + "src/" + MAIN_SOURCE_SET_NAME + "/" + sparkVariant.getName() + )); + ScalaSourceDirectorySet scalaTestSourceSet = getScalaSourceSet(sourceSets.getByName(TEST_SOURCE_SET_NAME)); + scalaTestSourceSet.setSrcDirs(Arrays.asList( + "src/" + TEST_SOURCE_SET_NAME + "/scala", + 
"src/" + TEST_SOURCE_SET_NAME + "/" + sparkVariant.getName() + )); + ScalaSourceDirectorySet scalaITestSourceSet = getScalaSourceSet(sourceSets.getByName(ITEST_SOURCE_SET_NAME)); + scalaITestSourceSet.setSrcDirs(Arrays.asList( + "src/" + ITEST_SOURCE_SET_NAME + "/scala", + "src/" + ITEST_SOURCE_SET_NAME + "/" + sparkVariant.getName() + )); } private static void configureVariant(Project project, SparkVariant sparkVariant, JavaPluginExtension javaPluginExtension) { diff --git a/gradle.properties b/gradle.properties index 4c6e45428..ed048513e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,6 +30,7 @@ spark20Version = 2.3.0 spark22Version = 2.2.3 spark24Version = 2.4.4 spark30Version = 3.4.3 +spark35Version = 3.5.6 # same as Spark's scala210Version = 2.10.7 diff --git a/spark/core/build.gradle b/spark/core/build.gradle index bef096f7b..6bbc74556 100644 --- a/spark/core/build.gradle +++ b/spark/core/build.gradle @@ -14,6 +14,8 @@ sparkVariants { // We should maybe move these to a separate config file that can be read from both this file and the pipeline script in the future if it creates issues setCoreDefaultVariant "spark30scala213", spark30Version, scala213Version addCoreFeatureVariant "spark30scala212", spark30Version, scala212Version + addCoreFeatureVariant "spark35scala212", spark35Version, scala212Version + addCoreFeatureVariant "spark35scala213", spark35Version, scala213Version all { SparkVariantPlugin.SparkVariant variant -> @@ -44,6 +46,11 @@ sparkVariants { add(variant.configuration('api'), "org.apache.spark:spark-core_${variant.scalaMajorVersion}:${variant.sparkVersion}") { exclude group: 'org.apache.hadoop' } + if (variant.sparkVersion >= "3.5.0") { + add(variant.configuration('implementation'), "org.apache.spark:spark-common-utils_${variant.scalaMajorVersion}:$variant.sparkVersion") { + exclude group: 'org.apache.hadoop' + } + } add(variant.configuration('implementation'), project(":elasticsearch-hadoop-mr")) add(variant.configuration('implementation'), "commons-logging:commons-logging:1.1.1") @@ -126,3 +133,23 @@ tasks.withType(ScalaCompile) { ScalaCompile task -> task.targetCompatibility = project.ext.minimumRuntimeVersion task.options.forkOptions.executable = new File(project.ext.runtimeJavaHome, 'bin/java').absolutePath } + +tasks.register('copyPoms', Copy) { + from(tasks.named('generatePomFileForMainPublication')) { + rename 'pom-default.xml', "elasticsearch-spark-30_2.13-${project.getVersion()}.pom" + } + from(tasks.named('generatePomFileForSpark30scala212Publication')) { + rename 'pom-default.xml', "elasticsearch-spark-30_2.12-${project.getVersion()}.pom" + } + from(tasks.named('generatePomFileForSpark35scala212Publication')) { + rename 'pom-default.xml', "elasticsearch-spark-35_2.12-${project.getVersion()}.pom" + } + from(tasks.named('generatePomFileForSpark35scala213Publication')) { + rename 'pom-default.xml', "elasticsearch-spark-35_2.13-${project.getVersion()}.pom" + } + into(new File(project.buildDir, 'distributions')) +} + +tasks.named('distribution').configure { + dependsOn 'copyPoms' +} diff --git a/spark/sql-30/build.gradle b/spark/sql-30/build.gradle index 86afc779c..467def9de 100644 --- a/spark/sql-30/build.gradle +++ b/spark/sql-30/build.gradle @@ -9,8 +9,10 @@ apply plugin: 'spark.variants' sparkVariants { capabilityGroup 'org.elasticsearch.spark.sql.variant' - setDefaultVariant "spark30scala213", spark30Version, scala213Version - addFeatureVariant "spark30scala212", spark30Version, scala212Version + setCoreDefaultVariant "spark30scala213", 
spark30Version, scala213Version + addCoreFeatureVariant "spark30scala212", spark30Version, scala212Version + addCoreFeatureVariant "spark35scala213", spark35Version, scala213Version + addCoreFeatureVariant "spark35scala212", spark35Version, scala212Version all { SparkVariantPlugin.SparkVariant variant -> String scalaCompileTaskName = project.sourceSets @@ -58,6 +60,14 @@ sparkVariants { exclude group: 'javax.servlet' exclude group: 'org.apache.hadoop' } + if (variant.sparkVersion >= "3.5.0") { + add(variant.configuration('implementation'), "org.apache.spark:spark-common-utils_${variant.scalaMajorVersion}:$variant.sparkVersion") { + exclude group: 'org.apache.hadoop' + } + add(variant.configuration('implementation'), "org.apache.spark:spark-sql-api_${variant.scalaMajorVersion}:$variant.sparkVersion") { + exclude group: 'org.apache.hadoop' + } + } add(variant.configuration('implementation'), "org.apache.spark:spark-sql_${variant.scalaMajorVersion}:$variant.sparkVersion") { exclude group: 'org.apache.hadoop' @@ -198,6 +208,12 @@ tasks.register('copyPoms', Copy) { from(tasks.named('generatePomFileForSpark30scala212Publication')) { rename 'pom-default.xml', "elasticsearch-spark-30_2.12-${project.getVersion()}.pom" } + from(tasks.named('generatePomFileForSpark35scala212Publication')) { + rename 'pom-default.xml', "elasticsearch-spark-35_2.12-${project.getVersion()}.pom" + } + from(tasks.named('generatePomFileForSpark35scala213Publication')) { + rename 'pom-default.xml', "elasticsearch-spark-35_2.13-${project.getVersion()}.pom" + } into(new File(project.buildDir, 'distributions')) } diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryLifecycleListener.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryLifecycleListener.scala new file mode 100644 index 000000000..05483077d --- /dev/null +++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryLifecycleListener.scala @@ -0,0 +1,127 @@ +package org.elasticsearch.spark.sql.streaming + +import org.apache.spark.sql.streaming.StreamingQueryListener +import org.elasticsearch.hadoop.util.unit.TimeValue +import org.junit.Assert + +import java.util.UUID +import java.util.concurrent.{CountDownLatch, TimeUnit} + +// Listener to 1) ensure no more than a single stream is running at a time, 2) know when we're done processing inputs +// and 3) to capture any Exceptions encountered during the execution of the stream. +class StreamingQueryLifecycleListener extends StreamingQueryListener { + + private var uuid: Option[UUID] = None + + private var inputsRequired = 0L + private var inputsSeen = 0L + + private var expectingToThrow: Option[Class[_]] = None + private var foundExpectedException: Boolean = false + private var encounteredException: Option[String] = None + + private var latch = new CountDownLatch(1) // expects just a single batch + + def incrementExpected(): Unit = inputsRequired = inputsRequired + 1 + + def setExpectedException(clazz: Class[_]): Unit = { + expectingToThrow match { + case Some(cls) => throw new IllegalArgumentException(s"Already expecting exception of type [$cls]!") + case None => expectingToThrow = Some(clazz) + } + } + + // Make sure we only ever watch one query at a time. 
+ private def captureQueryID(eventId: UUID): Unit = { + uuid match { + case Some(id) if eventId != id => throw new IllegalStateException("Multiple queries are not supported") + case None => uuid = Some(eventId) + case _ => // No problem for now + } + } + + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + captureQueryID(event.id) + } + + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { + captureQueryID(event.progress.id) + + // keep track of the number of input rows seen. When we reach the number of expected inputs, + // wait for two 0 values to pass before signaling to close + + val rows = event.progress.numInputRows + inputsSeen = inputsSeen + rows + + if (inputsSeen == inputsRequired) { + if (rows == 0) { + // Don't close after meeting the first input level. Wait to make sure we get + // one last pass with no new rows processed before signalling. + latch.countDown() + } + } else if (inputsSeen > inputsRequired) { + throw new IllegalStateException("Too many inputs encountered. Expected [" + inputsRequired + + "] but found [" + inputsSeen + "]") + } + } + + protected def onQueryIdle(eventId: UUID): Unit = { + captureQueryID(eventId) + if (inputsSeen == inputsRequired) { + latch.countDown() + } + } + + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + try { + captureQueryID(event.id) + + encounteredException = event.exception match { + case Some(value) => + // This is a whole trace, get everything after the enclosing SparkException (i.e. the original exception + trace) + val messageParts = value.split("\\): ") + if (messageParts.size > 1) { + val nonSparkMessage = messageParts(1) + // Ditch the full trace after the first newline + val removedNewLine = nonSparkMessage.substring(0, nonSparkMessage.indexOf("\n")) + // Split the class name from the exception message and take the class name + Some(removedNewLine.substring(0, removedNewLine.indexOf(":"))) + } else { + // Return the original framework error + Some(value.substring(0, value.indexOf(":"))) + } + case None => None + } + + val expectedExceptionName = expectingToThrow.map(_.getCanonicalName).getOrElse("None") + + foundExpectedException = encounteredException.exists(_.equals(expectedExceptionName)) + } finally { + // signal no matter what to avoid deadlock + latch.countDown() + } + } + + def waitOnComplete(timeValue: TimeValue): Boolean = latch.await(timeValue.millis, TimeUnit.MILLISECONDS) + + def expectAnotherBatch(): Unit = { + latch = new CountDownLatch(1) + } + + def assertExpectedExceptions(message: Option[String]): Unit = { + expectingToThrow match { + case Some(exceptionClass) => + if (!foundExpectedException) { + encounteredException match { + case Some(s) => Assert.fail(s"Expected ${exceptionClass.getCanonicalName} but got $s") + case None => Assert.fail(message.getOrElse(s"Expected ${exceptionClass.getCanonicalName} but no Exceptions were thrown")) + } + } + case None => + encounteredException match { + case Some(exception) => Assert.fail(s"Expected no exceptions but got $exception") + case None => () + } + } + } +} \ No newline at end of file diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryTestHarness.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryTestHarness.scala index 4466dd751..a7cbbd733 100644 --- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryTestHarness.scala +++ 
b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/sql/streaming/StreamingQueryTestHarness.scala @@ -19,19 +19,13 @@ package org.elasticsearch.spark.sql.streaming -import java.util.UUID -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.Encoder -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.streaming.StreamingQuery -import org.apache.spark.sql.streaming.StreamingQueryException -import org.apache.spark.sql.streaming.StreamingQueryListener +import org.apache.spark.sql.{Dataset, Encoder, SparkSession} +import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamingQueryListener} import org.elasticsearch.hadoop.util.TestUtils import org.elasticsearch.hadoop.util.unit.TimeValue -import org.junit.Assert + +import java.lang.reflect.Method +import java.util.concurrent.{TimeUnit, TimeoutException} /** * Manages the creation and graceful teardown of a Spark Streaming Query @@ -62,120 +56,16 @@ class StreamingQueryTestHarness[S <: java.io.Serializable : Encoder](val sparkSe private[this] object Running extends State private[this] object Closed extends State - // Listener to 1) ensure no more than a single stream is running at a time, 2) know when we're done processing inputs - // and 3) to capture any Exceptions encountered during the execution of the stream. - private [this] class StreamingQueryLifecycleListener extends StreamingQueryListener { - - private var uuid: Option[UUID] = None - - private var inputsRequired = 0L - private var inputsSeen = 0L - - private var expectingToThrow: Option[Class[_]] = None - private var foundExpectedException: Boolean = false - private var encounteredException: Option[String] = None - - private var latch = new CountDownLatch(1) // expects just a single batch - - def incrementExpected(): Unit = inputsRequired = inputsRequired + 1 - - def setExpectedException(clazz: Class[_]): Unit = { - expectingToThrow match { - case Some(cls) => throw new IllegalArgumentException(s"Already expecting exception of type [$cls]!") - case None => expectingToThrow = Some(clazz) - } - } - - // Make sure we only ever watch one query at a time. - private def captureQueryID(eventId: UUID): Unit = { - uuid match { - case Some(id) if eventId != id => throw new IllegalStateException("Multiple queries are not supported") - case None => uuid = Some(eventId) - case _ => // No problem for now - } - } - - override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { - captureQueryID(event.id) - } - override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { - captureQueryID(event.progress.id) - - // keep track of the number of input rows seen. When we reach the number of expected inputs, - // wait for two 0 values to pass before signaling to close - - val rows = event.progress.numInputRows - inputsSeen = inputsSeen + rows - - if (inputsSeen == inputsRequired) { - if (rows == 0) { - // Don't close after meeting the first input level. Wait to make sure we get - // one last pass with no new rows processed before signalling. - latch.countDown() - } - } else if (inputsSeen > inputsRequired) { - throw new IllegalStateException("Too many inputs encountered. 
Expected [" + inputsRequired + - "] but found [" + inputsSeen + "]") - } - } - - override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { - try { - captureQueryID(event.id) - - encounteredException = event.exception match { - case Some(value) => - // This is a whole trace, get everything after the enclosing SparkException (i.e. the original exception + trace) - val messageParts = value.split("\\): ") - if (messageParts.size > 1) { - val nonSparkMessage = messageParts(1) - // Ditch the full trace after the first newline - val removedNewLine = nonSparkMessage.substring(0, nonSparkMessage.indexOf("\n")) - // Split the class name from the exception message and take the class name - Some(removedNewLine.substring(0, removedNewLine.indexOf(":"))) - } else { - // Return the original framework error - Some(value.substring(0, value.indexOf(":"))) - } - case None => None - } - - val expectedExceptionName = expectingToThrow.map(_.getCanonicalName).getOrElse("None") - - foundExpectedException = encounteredException.exists(_.equals(expectedExceptionName)) - } finally { - // signal no matter what to avoid deadlock - latch.countDown() - } - } - - def waitOnComplete(timeValue: TimeValue): Boolean = latch.await(timeValue.millis, TimeUnit.MILLISECONDS) - - def expectAnotherBatch(): Unit = { - latch = new CountDownLatch(1) - } - - def assertExpectedExceptions(message: Option[String]): Unit = { - expectingToThrow match { - case Some(exceptionClass) => - if (!foundExpectedException) { - encounteredException match { - case Some(s) => Assert.fail(s"Expected ${exceptionClass.getCanonicalName} but got $s") - case None => Assert.fail(message.getOrElse(s"Expected ${exceptionClass.getCanonicalName} but no Exceptions were thrown")) - } - } - case None => - encounteredException match { - case Some(exception) => Assert.fail(s"Expected no exceptions but got $exception") - case None => () - } - } + @transient private val listener: StreamingQueryLifecycleListener = { + val supportsIdleEvent = classOf[StreamingQueryListener].getMethods.exists((method: Method) => method.getName.equals("onQueryIdle")) + if (supportsIdleEvent) { + Class.forName("org.elasticsearch.spark.sql.streaming.StreamingQueryIdleLifecycleListener").getConstructor().newInstance().asInstanceOf[StreamingQueryLifecycleListener] + } else { + new StreamingQueryLifecycleListener() } } - @transient private val listener = new StreamingQueryLifecycleListener - // Port number to use for the socket source @transient private lazy val port = { // Todo: Generate this port? diff --git a/spark/sql-30/src/itest/spark35scala212/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala b/spark/sql-30/src/itest/spark35scala212/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala new file mode 100644 index 000000000..8bcfdbea6 --- /dev/null +++ b/spark/sql-30/src/itest/spark35scala212/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.spark.sql.streaming + +import org.apache.spark.sql.streaming.StreamingQueryListener + +// Listener to 1) ensure no more than a single stream is running at a time, 2) know when we're done processing inputs +// and 3) to capture any Exceptions encountered during the execution of the stream. +private[streaming] class StreamingQueryIdleLifecycleListener extends StreamingQueryLifecycleListener { + + override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = { + super.onQueryIdle(event.id) + } +} \ No newline at end of file diff --git a/spark/sql-30/src/itest/spark35scala213/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala b/spark/sql-30/src/itest/spark35scala213/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala new file mode 100644 index 000000000..8bcfdbea6 --- /dev/null +++ b/spark/sql-30/src/itest/spark35scala213/org/elasticsearch/spark/sql/streaming/StreamingQueryIdleLifecycleListener.scala @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.spark.sql.streaming + +import org.apache.spark.sql.streaming.StreamingQueryListener + +// Listener to 1) ensure no more than a single stream is running at a time, 2) know when we're done processing inputs +// and 3) to capture any Exceptions encountered during the execution of the stream. 
+private[streaming] class StreamingQueryIdleLifecycleListener extends StreamingQueryLifecycleListener { + + override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = { + super.onQueryIdle(event.id) + } +} \ No newline at end of file diff --git a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/streaming/EsStreamQueryWriter.scala b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/streaming/EsStreamQueryWriter.scala index 1b2ea3d56..769244e7f 100644 --- a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/streaming/EsStreamQueryWriter.scala +++ b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/streaming/EsStreamQueryWriter.scala @@ -47,7 +47,7 @@ private [sql] class EsStreamQueryWriter(serializedSettings: String, override protected def bytesConverter: Class[_ <: BytesConverter] = classOf[JdkBytesConverter] override protected def fieldExtractor: Class[_ <: FieldExtractor] = classOf[DataFrameFieldExtractor] - private val encoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind() + private val encoder: ExpressionEncoder[Row] = ExpressionEncoder(RowEncoder.encoderFor(schema, lenient = false)).resolveAndBind() private val deserializer: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer() override def write(taskContext: TaskContext, data: Iterator[InternalRow]): Unit = {
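
Notes on the Spark 3.5 API differences the patch works around:

Spark 3.5 adds an onQueryIdle callback to StreamingQueryListener, which is why the idle-aware listener is compiled only from the spark35scala212/spark35scala213 itest source directories and why StreamingQueryTestHarness selects it reflectively by checking for an onQueryIdle method on the listener class.

In EsStreamQueryWriter, RowEncoder(schema) is replaced by ExpressionEncoder(RowEncoder.encoderFor(schema, lenient = false)), a form the shared sql-30 sources can compile against both the 3.4 and 3.5 lines: encoderFor returns an AgnosticEncoder[Row] that still needs the ExpressionEncoder wrapper to provide createDeserializer(). Below is a minimal standalone sketch of that construction, assuming a Spark 3.4+/3.5 classpath; the object name and sample schema are illustrative and not part of the patch:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object RowEncoderCompatSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative schema; EsStreamQueryWriter receives its schema from the streaming sink.
        val schema = StructType(Seq(StructField("name", StringType, nullable = true)))

        // encoderFor returns an AgnosticEncoder[Row]; wrapping it in ExpressionEncoder
        // restores the serializer/deserializer pair that RowEncoder.apply used to provide.
        val encoder: ExpressionEncoder[Row] =
          ExpressionEncoder(RowEncoder.encoderFor(schema, lenient = false)).resolveAndBind()

        // The writer applies this deserializer to each InternalRow it is asked to write.
        val deserializer: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer()
        assert(deserializer != null)
      }
    }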