[ZEPPELIN-6406] Remove deprecated Flink 1.15/1.16/1.17 shims and add Flink 1.19/1.20 support #5205
Conversation
…1.19/1.20 support
- Remove deprecated flink1.15-shims, flink1.16-shims, flink1.17-shims modules
- Add new flink1.19-shims and flink1.20-shims modules (Flink 1.19.3 / 1.20.3)
- Fix CatalogStoreHolder requirement in Flink 1.19+ CatalogManager.Builder
- Fix Scala reflection error with ImplicitExpressionConversions by:
- Using TableEnvironment.create() / StreamTableEnvironment.create() directly
instead of custom TableEnvFactory (referenced from PR apache#5032)
- Adding bindWithRetry() for Scala REPL bind operations
- Using explicit imports instead of wildcard org.apache.flink.table.api._
- Update flink.scala.version from 2.12.7 to 2.12.18
- Update DownloadUtils to remove flink-scala jar from FLINK_HOME/lib for 1.19+
- Update CI workflow matrix from [115,116,117] to [119,120]
- Update integration tests and conda env files for Flink 1.19/1.20
- Update documentation to reflect new supported versions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…job in JobManager Stream SQL jobs submitted via Table API's executeInsert() were not registered in JobManager, breaking cancel (with/without savepoint). Register the job from AbstractStreamSqlJob and use CountDownLatch-based await to support cancellation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
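The CountDownLatch-based await mentioned above can be sketched in isolation. This is a minimal illustration of the pattern, not Zeppelin's actual code: the `StreamJobAwaiter` class and its method names are hypothetical stand-ins for the logic in `AbstractStreamSqlJob`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a cancellation-aware await built on CountDownLatch.
public class StreamJobAwaiter {
    private final CountDownLatch jobDone = new CountDownLatch(1);
    private volatile boolean cancelled = false;

    // Called by the result-retrieval side when the job finishes normally.
    public void markDone() {
        jobDone.countDown();
    }

    // Called from the cancel path: record the cancellation, then release the waiter.
    public void cancel() {
        cancelled = true;
        jobDone.countDown();
    }

    // Blocks until the job completes, is cancelled, or the timeout expires.
    // Returns true only on normal completion.
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        boolean finished = jobDone.await(timeout, unit);
        return finished && !cancelled;
    }
}
```

The key property is that `cancel()` both flips the flag and counts down the latch, so a thread blocked in `await` wakes up immediately instead of waiting out the timeout.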
Pull request overview
Updates Zeppelin’s Flink integration by dropping deprecated Flink 1.15–1.17 shim implementations and introducing a unified Flink 1.19/1.20 shim layer, along with related test/CI and documentation updates.
Changes:
- Replace version-specific 1.15/1.16/1.17 shim modules with a new `flink1.19-shims` module used for both Flink 1.19 and 1.20.
- Update integration/unit tests and the CI matrix to run against Flink 1.19/1.20, and adjust download/setup utilities accordingly.
- Improve stream SQL cancellation plumbing and address Scala REPL binding/reflection issues for Flink 1.19+.
Reviewed changes
Copilot reviewed 38 out of 40 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java | Adjusts Flink download layout for 1.19+ (moves flink-scala jar to avoid Scala stdlib conflicts). |
| zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java | Bumps integration test Flink runtime to 1.19.3. |
| zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest114.java | Removes old Flink 1.14 cluster integration test wrapper. |
| zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest113.java | Removes old Flink 1.13 cluster integration test wrapper. |
| zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest120.java | Updates Flink integration test to target Flink 1.20.3 (Scala 2.12). |
| zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest119.java | Updates/renames integration test to target Flink 1.19.3 (Scala 2.12). |
| zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest113.java | Removes old Flink 1.13 integration tests. |
| testing/env_python_3_with_flink_120.yml | Updates PyFlink dependency to 1.20.3 for CI env. |
| testing/env_python_3_with_flink_119.yml | Updates PyFlink dependency to 1.19.3 for CI env. |
| testing/env_python_3_with_flink_117.yml | Removes old Flink 1.17 Python env used by CI. |
| flink/README.md | Updates module list to reflect new flink1.19-shims shared for 1.19/1.20. |
| flink/pom.xml | Replaces old shim modules with flink1.19-shims; adds Flink 1.19/1.20 version properties and bumps Scala version property. |
| flink/flink1.19-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java | Adds Flink 1.19 shim copy of timestamp formatting helpers. |
| flink/flink1.19-shims/src/main/java/org/apache/zeppelin/flink/shims119/CollectStreamTableSink.java | Fixes package name for the new shim namespace. |
| flink/flink1.19-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java | Adds Flink 1.19 shim copy of row printing/formatting utilities. |
| flink/flink1.19-shims/src/main/java/org/apache/zeppelin/flink/Flink119SqlInterpreter.java | Introduces Flink 1.19 SQL interpreter shim (renamed from older version). |
| flink/flink1.19-shims/src/main/java/org/apache/zeppelin/flink/Flink119Shims.java | Implements unified shims for Flink 1.19/1.20 and updates CatalogManager construction for Flink 1.19+. |
| flink/flink1.19-shims/pom.xml | Creates the new shim module POM and ties it to the Flink 1.19 dependency set. |
| flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java | Removes Flink 1.17-specific shim implementation. |
| flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java | Removes Flink 1.17-specific shim implementation. |
| flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java | Removes Flink 1.17 shim implementation. |
| flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java | Removes Flink 1.16-specific shim implementation. |
| flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/shims116/CollectStreamTableSink.java | Removes Flink 1.16 collect sink shim. |
| flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java | Removes Flink 1.16-specific shim implementation. |
| flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java | Removes Flink 1.16 SQL interpreter shim. |
| flink/flink1.16-shims/pom.xml | Removes Flink 1.16 shim module POM. |
| flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/shims115/CollectStreamTableSink.java | Removes Flink 1.15 collect sink shim. |
| flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java | Removes Flink 1.15 SQL interpreter shim. |
| flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java | Removes Flink 1.15 shim implementation. |
| flink/flink1.15-shims/pom.xml | Removes Flink 1.15 shim module POM. |
| flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java | Updates shim loader to only support Flink 1.19/1.20 via Flink119Shims. |
| flink/flink-scala-2.12/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java | Relaxes output assertions and improves failure reporting for stream SQL cancel tests. |
| flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala | Replaces wildcard Flink Table API imports with explicit imports to avoid reflection/unpickling issues. |
| flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala | Tracks and cancels the currently running stream job from the Zeppelin context. |
| flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala | Adds bind retry workaround and integrates stream-job cancellation hook. |
| flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java | Registers stream insert jobs with JobManager and adds cancellation-aware awaiting logic. |
| flink/flink-scala-2.12/pom.xml | Switches dependencies to the new shim module and updates Flink build profiles to 1.19/1.20. |
| docs/setup/deployment/flink_and_spark_cluster.md | Updates build/run instructions and example downloads to Flink 1.19.3 / -Pflink-119. |
| docs/interpreter/flink.md | Updates supported Flink version statement and version-specific setup notes for 1.19+. |
| .github/workflows/core.yml | Updates CI Flink matrix and build profile from 1.16/1.17 to 1.19/1.20. |
Comments suppressed due to low confidence (2)
flink/flink-scala-2.12/pom.xml:40
- The module-level default `flink.scala.version` is still `2.12.7`, while this PR updates the Flink line to 1.19/1.20 and the parent `flink.scala.version` to 2.12.18. If this module is built without activating `-Pflink-119`/`-Pflink-120`, it will compile against an older Scala version than intended. Consider aligning the default here to 2.12.18 (or referencing the parent property) to avoid mismatched builds.
<properties>
<!--library versions-->
<flink.version>${flink1.19.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
<flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
<hive.version>2.3.7</hive.version>
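One minimal way to apply the suggested alignment, sketched against the properties block above (this assumes the parent POM defines `flink.scala.version` as 2.12.18; it is an illustration, not the merged fix):

```xml
<properties>
  <flink.version>${flink1.19.version}</flink.version>
  <!-- Option A: align the default with the parent's value -->
  <flink.scala.version>2.12.18</flink.scala.version>
  <flink.scala.binary.version>2.12</flink.scala.binary.version>
  <flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
</properties>
<!-- Option B: delete the <flink.scala.version> override entirely, so the
     module inherits the property from the parent POM and cannot drift. -->
```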
flink/flink1.19-shims/pom.xml:33
- This child POM sets `<version>0.13.0-SNAPSHOT</version>` explicitly even though it has a parent POM providing the version. Hard-coding the version here risks divergence when the project version changes; prefer inheriting from the parent (remove the `<version>` element) or using `${project.version}` consistently.
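A minimal sketch of the inheritance fix (the parent `artifactId` below is illustrative, not necessarily the actual coordinate in the Zeppelin build):

```xml
<parent>
  <groupId>org.apache.zeppelin</groupId>
  <artifactId>flink-parent</artifactId>
  <version>0.13.0-SNAPSHOT</version>
</parent>
<!-- No <version> element on the child: Maven resolves the module version
     from <parent>, so a project-wide version bump needs no edit here. -->
<artifactId>flink1.19-shims</artifactId>
```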
  } catch (Exception e) {
    LOGGER.error("Fail to run stream sql job", e);
-   throw new IOException("Fail to run stream sql job", e);
+   throw new IOException("Job was cancelled", e);
The catch-all exception handler always rethrows new IOException("Job was cancelled", e) even when the failure is unrelated to cancellation (e.g., job submission failure, runtime exception in result retrieval). This will mask real errors and make debugging harder. Consider only using the "Job was cancelled" message when cancelled is true, and otherwise propagate a more accurate message (or rethrow the original IOException).
Suggested change:
-   throw new IOException("Job was cancelled", e);
+   if (cancelled) {
+     throw new IOException("Job was cancelled", e);
+   } else if (e instanceof IOException) {
+     throw (IOException) e;
+   } else {
+     throw new IOException("Fail to run stream sql job", e);
+   }
  this.insertResult = table.executeInsert(tableName);
  // Register the job with JobManager so that cancel (with savepoint) works properly
  if (insertResult.getJobClient().isPresent()) {
    jobManager.addJob(context, insertResult.getJobClient().get());
  }
jobManager.addJob(...) is called for the insert job, but this method stores a JobClient/poller keyed by paragraphId and expects removeJob(...) to be called to avoid stale entries. Since run(...) never removes the job on completion/cancellation, subsequent stream runs in the same paragraph can leave orphaned pollers and can skip starting a new poller due to the existing mapping. Add a jobManager.removeJob(context.getParagraphId()) in a finally block (guarded by JobClient presence) once the job is done.
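The register-then-always-remove pattern the comment asks for can be shown with a self-contained sketch. `JobRegistry` below is a hypothetical stand-in for Zeppelin's `JobManager`; only the try/finally shape is the point.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Zeppelin's JobManager, keyed by paragraphId.
public class JobRegistry {
    private final Map<String, String> jobsByParagraph = new HashMap<>();

    public void addJob(String paragraphId, String jobId) {
        jobsByParagraph.put(paragraphId, jobId);
    }

    public void removeJob(String paragraphId) {
        jobsByParagraph.remove(paragraphId);
    }

    public boolean hasJob(String paragraphId) {
        return jobsByParagraph.containsKey(paragraphId);
    }

    // Mirrors the suggested fix: register the job when a client is present,
    // then guarantee cleanup in finally so reruns never see a stale mapping,
    // even if the body throws or the job is cancelled.
    public void runParagraph(String paragraphId, Optional<String> jobId, Runnable body) {
        boolean registered = jobId.isPresent();
        if (registered) {
            addJob(paragraphId, jobId.get());
        }
        try {
            body.run();
        } finally {
            if (registered) {
                removeJob(paragraphId);
            }
        }
    }
}
```

Because the removal sits in `finally`, the mapping is dropped on every exit path, which is exactly what prevents the orphaned-poller scenario described above.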
  private def bindWithRetry(name: String, tpe: String, value: AnyRef, modifiers: List[String]): Unit = {
    // Workaround for Scala reflection issue with ImplicitExpressionConversions in Flink 1.19+.
    // First bind attempt may fail due to unpickling error, but subsequent attempts succeed
    // because the Scala reflection cache resolves the error state.
    var success = false
    for (attempt <- 1 to 2 if !success) {
      try {
        flinkILoop.bind(name, tpe, value, modifiers: List[String])
        success = true
      } catch {
        case e: Throwable =>
          if (attempt == 1) {
            LOGGER.warn("Retrying bind for " + name + " due to Scala reflection issue: " + e.getMessage)
          } else {
            throw new InterpreterException(s"Failed to bind $name after retry", e)
          }
bindWithRetry catches Throwable, which will also catch fatal errors (e.g., VirtualMachineError, ThreadDeath, LinkageError) and retry/hide them. Narrow this to scala.util.control.NonFatal (or at least Exception) so only the intended reflection/unpickling failures are retried.
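The narrowing the comment asks for can be illustrated with a small Java sketch (the Scala equivalent would match on `scala.util.control.NonFatal`). `BindRetry` and `BindOp` are hypothetical names; the point is that only `Exception` is retried, while `Error` subclasses propagate immediately.

```java
// Hypothetical retry helper: retries non-fatal failures only.
public class BindRetry {

    @FunctionalInterface
    public interface BindOp {
        void bind() throws Exception;
    }

    public static void bindWithRetry(String name, BindOp op, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Errors (OutOfMemoryError, LinkageError, ...) are NOT caught
                // here, so fatal conditions fail fast instead of being retried.
                op.bind();
                return;
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    throw new RuntimeException(
                        "Failed to bind " + name + " after retry", e);
                }
                // First failure (e.g. the unpickling error): fall through and retry.
            }
        }
    }
}
```

Catching `Exception` instead of `Throwable` keeps the retry behavior for the reflection/unpickling case while letting JVM-level errors surface unchanged.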
  Flink 1.19+ is Scala-free and has changed its binary distribution, so the following extra steps are required.
  * Move FLINK_HOME/opt/flink-table-planner_2.12-1.19.3.jar to FLINK_HOME/lib
  * Move FLINK_HOME/lib/flink-table-planner-loader-1.19.3.jar to FLINK_HOME/opt
  * Download flink-table-api-scala-bridge_2.12-1.19.3.jar and flink-table-api-scala_2.12-1.19.3.jar to FLINK_HOME/lib
  * Move FLINK_HOME/opt/flink-sql-client-1.19.3.jar to FLINK_HOME/lib
These "version-specific notes" hard-code patch-level jar names (1.19.3) even though the page states Flink 1.19+ is supported. This will be incorrect for 1.20.x (and even other 1.19.x patch levels). Consider using a ${FLINK_VERSION} placeholder and/or wording that instructs users to substitute their installed Flink version.
Suggested change:
-  Flink 1.19+ is Scala-free and has changed its binary distribution, so the following extra steps are required.
-  * Move FLINK_HOME/opt/flink-table-planner_2.12-1.19.3.jar to FLINK_HOME/lib
-  * Move FLINK_HOME/lib/flink-table-planner-loader-1.19.3.jar to FLINK_HOME/opt
-  * Download flink-table-api-scala-bridge_2.12-1.19.3.jar and flink-table-api-scala_2.12-1.19.3.jar to FLINK_HOME/lib
-  * Move FLINK_HOME/opt/flink-sql-client-1.19.3.jar to FLINK_HOME/lib
+  Flink 1.19+ is Scala-free and has changed its binary distribution, so the following extra steps are required. Replace `${FLINK_VERSION}` below with the version of Flink you installed.
+  * Move FLINK_HOME/opt/flink-table-planner_2.12-${FLINK_VERSION}.jar to FLINK_HOME/lib
+  * Move FLINK_HOME/lib/flink-table-planner-loader-${FLINK_VERSION}.jar to FLINK_HOME/opt
+  * Download flink-table-api-scala-bridge_2.12-${FLINK_VERSION}.jar and flink-table-api-scala_2.12-${FLINK_VERSION}.jar to FLINK_HOME/lib
+  * Move FLINK_HOME/opt/flink-sql-client-${FLINK_VERSION}.jar to FLINK_HOME/lib
  if (cancelled) {
    // Wait briefly for the job to finish (e.g. stopped with savepoint)
    jobDone.await(10, java.util.concurrent.TimeUnit.SECONDS);
    if (cancelledWithSavepoint) {
      LOGGER.info("Stream sql job stopped with savepoint, jobName: {}", jobName);
      return buildResult();
    }
    throw new InterruptedException("Job was cancelled");
  }
On the cancellation path (if (cancelled) { ... }), run(...) exits (either returning or throwing) without stopping/joining retrievalThread. Since the thread loops on iterator.hasNext() it may keep running after the paragraph is cancelled/stopped, leading to leaked threads/sockets and continued background work. Consider cancelling the retrieval thread (and closing the SocketStreamIterator if possible) and joining it in the cancellation branch and/or in finally.
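The suggested cleanup can be sketched as a self-contained pattern: on cancel, signal the retrieval loop to stop, interrupt it to break out of any blocking read, and join it so the thread never outlives the paragraph. `RetrievalLoop` below is a hypothetical stand-in; the sleep simulates blocking on the `SocketStreamIterator`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a stoppable retrieval thread with cancel-and-join cleanup.
public class RetrievalLoop {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread retrievalThread;

    public RetrievalLoop() {
        retrievalThread = new Thread(() -> {
            // Stand-in for `while (iterator.hasNext()) { ... }`.
            while (running.get()) {
                try {
                    Thread.sleep(50); // simulate blocking on the socket iterator
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // exit promptly when interrupted by cancel()
                }
            }
        });
        retrievalThread.start();
    }

    public void cancel() throws InterruptedException {
        running.set(false);          // ask the loop to stop cooperatively
        retrievalThread.interrupt(); // break out of any blocking call
        // Join with a bound so cancellation itself cannot hang forever.
        retrievalThread.join(TimeUnit.SECONDS.toMillis(10));
    }

    public boolean isAlive() {
        return retrievalThread.isAlive();
    }
}
```

In the real code the same three steps (stop flag, interrupt, bounded join) would sit in the cancellation branch or a `finally` block, alongside closing the iterator's socket if its API allows it.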
…criptor error Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Scala reflection issue is already handled by bindWithRetry and explicit imports in FlinkILoop. Removing flink-scala from lib/ breaks integration tests by removing the Scala standard library from the classpath. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
- Replace the deprecated 1.15/1.16/1.17 shims with a unified `Flink119Shims` class (1.20 is the last 1.x LTS, no 1.21 planned)
- Fix the `CatalogStoreHolder` requirement for the Flink 1.19+ `CatalogManager.Builder`
- Work around the Scala reflection error (`ImplicitExpressionConversions`) with `bindWithRetry` + explicit imports in `FlinkILoop`
- Register `executeInsert()` jobs in `JobManager` with `CountDownLatch`-based await
- Update the CI matrix from `[115,116,117]` to `[119,120]`
- Update `flink.scala.version` from 2.12.7 to 2.12.18

Test plan
- `FlinkInterpreterTest` 7/7 passing with `-Pflink-119`
- `FlinkInterpreterTest` 7/7 passing with `-Pflink-120`