Skip to content

Commit 40b8b6a

Browse files
Flink 2.0 support as separate module (#239)
* integrate flink2 as separate module * run tests on example using scala 3 always * use single version for last 3 stable version of Flink 1.x * add CI step for Flink 2.x * select sbt module using build matrix * set fixed version for kafka connetor * run test on examples for frlink 1.x only
1 parent 1bae103 commit 40b8b6a

File tree

183 files changed

+9018
-152
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

183 files changed

+9018
-152
lines changed

.github/workflows/ci.yml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,18 @@ jobs:
1717
java: [11]
1818
scala: [2.13.16, 3.3.5]
1919
flink: [1.18.1, 1.19.1]
20+
sbt-module: ['flink-1-api']
2021
include:
2122
- scala: 3.3.5
2223
java: 17
2324
flink: 1.20.0
25+
sbt-module: 'flink-1-api'
26+
- scala: 3.3.5
27+
java: 17
28+
flink: 2.0.0
29+
sbt-module: 'flink-2-api'
30+
env:
31+
JAVA_OPTIONS: '--add-opens java.base/java.lang=ALL-UNNAMED'
2432
steps:
2533
- uses: actions/checkout@v3
2634
with:
@@ -32,6 +40,10 @@ jobs:
3240
java-version: ${{ matrix.java }}
3341
cache: sbt
3442
- name: Compile Docs
35-
run: JAVA_OPTS="--add-opens java.base/java.lang=ALL-UNNAMED" sbt "++ ${{ matrix.scala }} docs/mdoc"
36-
- name: Run tests
37-
run: JAVA_OPTS="--add-opens java.base/java.lang=ALL-UNNAMED" sbt -DflinkVersion=${{ matrix.flink }} "++ ${{ matrix.scala }} test"
43+
run: JAVA_OPTS=$JAVA_OPTIONS sbt "++ ${{ matrix.scala }} docs/mdoc"
44+
- name: Run tests on examples
45+
# always running on Scala 3.x version by default
46+
if: ${{ !startsWith(matrix.flink, '1.18') && !startsWith(matrix.flink, '2.') }}
47+
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} "project examples; test"
48+
- name: Run tests on Flink API
49+
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} -DflinkVersion2=${{ matrix.flink }} "++ ${{ matrix.scala }}; project ${{ matrix.sbt-module }}; test"

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ metals.sbt
1414
.ammonite/*
1515
*.jar
1616
*checkpoint
17-
sink-table
17+
sink-table
18+
*.iml

build.sbt

Lines changed: 158 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,158 +1,194 @@
11
import sbtrelease.ReleaseStateTransformations.*
22

33
Global / onChangedBuildSource := ReloadOnSourceChanges
4-
Global / excludeLintKeys := Set(git.useGitDescribe)
4+
Global / excludeLintKeys := Set(git.useGitDescribe, crossScalaVersions)
55

66
lazy val rootScalaVersion = "3.3.5"
77
lazy val crossVersions = Seq("2.13.16", rootScalaVersion)
8-
lazy val flinkVersion = System.getProperty("flinkVersion", "1.18.1")
8+
lazy val flinkVersion1 = System.getProperty("flinkVersion1", "1.20.1")
9+
lazy val flinkVersion2 = System.getProperty("flinkVersion2", "2.0.0")
910

1011
lazy val root = (project in file("."))
11-
.aggregate(`scala-api`, `examples`)
12+
.aggregate(`scala-api-common`, `flink-1-api`, `flink-2-api`, `examples`)
13+
.settings(commonSettings)
1214
.settings(
13-
scalaVersion := rootScalaVersion,
14-
publish / skip := true
15+
scalaVersion := rootScalaVersion,
16+
crossScalaVersions := crossVersions,
17+
publish / skip := true
1518
)
1619

17-
lazy val `scala-api` = (project in file("modules/scala-api"))
18-
.settings(ReleaseProcess.releaseSettings(flinkVersion) *)
20+
lazy val commonSettings = Seq(
21+
scalaVersion := rootScalaVersion,
22+
crossScalaVersions := crossVersions,
23+
libraryDependencies ++= {
24+
if (scalaBinaryVersion.value.startsWith("2")) {
25+
Seq(
26+
"com.softwaremill.magnolia1_2" %% "magnolia" % "1.1.10",
27+
"org.scala-lang" % "scala-reflect" % scalaVersion.value % Provided
28+
)
29+
} else {
30+
Seq(
31+
"com.softwaremill.magnolia1_3" %% "magnolia" % "1.3.16",
32+
"org.scala-lang" %% "scala3-compiler" % scalaVersion.value % Provided
33+
)
34+
}
35+
},
36+
// some IT tests won't work without running in forked JVM
37+
Test / fork := true,
38+
// Need to isolate macro usage to version-specific folders.
39+
Compile / unmanagedSourceDirectories += {
40+
val dir = (Compile / scalaSource).value.getPath
41+
val Some((major, _)) = CrossVersion.partialVersion(scalaVersion.value)
42+
file(s"$dir-$major")
43+
},
44+
organization := "org.flinkextended",
45+
description := "Community-maintained fork of official Apache Flink Scala API",
46+
licenses := Seq("APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")),
47+
homepage := Some(url("https://github.com/flink-extended/flink-scala-api")),
48+
sonatypeCredentialHost := "s01.oss.sonatype.org",
49+
sonatypeRepository := "https://s01.oss.sonatype.org/service/local",
50+
publishMavenStyle := true,
51+
publishTo := sonatypePublishToBundle.value,
52+
pgpPassphrase := scala.util.Properties.propOrNone("gpg.passphrase").map(_.toCharArray),
53+
git.useGitDescribe := true,
54+
scalacOptions ++= Seq(
55+
"-deprecation",
56+
"-feature",
57+
"-language:higherKinds",
58+
"-language:implicitConversions"
59+
), // Need extra leniency on how much we can inline during typeinfo derivation.
60+
scalacOptions ++= {
61+
if (scalaVersion.value.startsWith("3")) {
62+
Seq("-Xmax-inlines", "128")
63+
} else {
64+
Nil
65+
}
66+
},
67+
scmInfo := Some(
68+
ScmInfo(
69+
url("https://github.com/flink-extended/flink-scala-api"),
70+
"scm:git@github.com:flink-extended/flink-scala-api.git"
71+
)
72+
),
73+
developers := List(
74+
Developer(
75+
id = "romangrebennikov",
76+
name = "Roman Grebennikov",
77+
email = "grv@dfdx.me",
78+
url = url("https://dfdx.me/")
79+
),
80+
Developer(
81+
id = "novakov-alexey",
82+
name = "Alexey Novakov",
83+
email = "novakov.alex@gmail.com",
84+
url = url("https://novakov-alexey.github.io/")
85+
)
86+
),
87+
releaseProcess := Seq.empty[ReleaseStep],
88+
releaseProcess ++= (if (sys.env.contains("RELEASE_VERSION_BUMP"))
89+
Seq[ReleaseStep](
90+
checkSnapshotDependencies,
91+
inquireVersions,
92+
setReleaseVersion,
93+
commitReleaseVersion,
94+
tagRelease,
95+
releaseStepCommandAndRemaining("+publishSigned"),
96+
releaseStepCommand("sonatypeBundleRelease")
97+
)
98+
else Seq.empty[ReleaseStep]),
99+
releaseProcess ++= (if (sys.env.contains("RELEASE_PUBLISH"))
100+
Seq[ReleaseStep](
101+
inquireVersions,
102+
setNextVersion,
103+
commitNextVersion,
104+
pushChanges
105+
)
106+
else Seq.empty[ReleaseStep])
107+
)
108+
109+
lazy val `scala-api-common` = (project in file("modules/flink-common-api"))
110+
.settings(commonSettings)
19111
.settings(
20-
name := "flink-scala-api",
112+
name := "flink-scala-api-common",
21113
scalaVersion := rootScalaVersion,
22114
crossScalaVersions := crossVersions,
23115
libraryDependencies ++= Seq(
24-
"org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided,
25-
"org.apache.flink" % "flink-java" % flinkVersion % Provided,
26-
"org.apache.flink" % "flink-table-api-java-bridge" % flinkVersion % Provided,
27-
"org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
28-
("org.apache.flink" % "flink-streaming-java" % flinkVersion % Test).classifier("tests"),
29-
"org.typelevel" %% "cats-core" % "2.13.0" % Test,
30-
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
31-
"ch.qos.logback" % "logback-classic" % "1.5.18" % Test
32-
),
33-
libraryDependencies ++= {
34-
if (scalaBinaryVersion.value.startsWith("2")) {
35-
Seq(
36-
"com.softwaremill.magnolia1_2" %% "magnolia" % "1.1.10",
37-
"org.scala-lang" % "scala-reflect" % scalaVersion.value % Provided
38-
)
39-
} else {
40-
Seq(
41-
"com.softwaremill.magnolia1_3" %% "magnolia" % "1.3.16",
42-
"org.scala-lang" %% "scala3-compiler" % scalaVersion.value % Provided
43-
)
44-
}
45-
},
46-
// some IT tests won't work without running in forked JVM
47-
Test / fork := true,
48-
// Need to isolate macro usage to version-specific folders.
49-
Compile / unmanagedSourceDirectories += {
50-
val dir = (Compile / scalaSource).value.getPath
51-
val Some((major, _)) = CrossVersion.partialVersion(scalaVersion.value)
52-
file(s"$dir-$major")
53-
},
54-
organization := "org.flinkextended",
55-
description := "Community-maintained fork of official Apache Flink Scala API",
56-
licenses := Seq("APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")),
57-
homepage := Some(url("https://github.com/flink-extended/flink-scala-api")),
58-
sonatypeCredentialHost := "s01.oss.sonatype.org",
59-
sonatypeRepository := "https://s01.oss.sonatype.org/service/local",
60-
publishMavenStyle := true,
61-
publishTo := sonatypePublishToBundle.value,
62-
pgpPassphrase := scala.util.Properties.propOrNone("gpg.passphrase").map(_.toCharArray),
63-
git.useGitDescribe := true,
64-
scalacOptions ++= Seq(
65-
"-deprecation",
66-
"-feature",
67-
"-language:higherKinds",
68-
"-language:implicitConversions"
69-
), // Need extra leniency on how much we can inline during typeinfo derivation.
70-
scalacOptions ++= {
71-
if (scalaVersion.value.startsWith("3")) {
72-
Seq("-Xmax-inlines", "128")
73-
} else {
74-
Nil
75-
}
76-
},
77-
scmInfo := Some(
78-
ScmInfo(
79-
url("https://github.com/flink-extended/flink-scala-api"),
80-
"scm:git@github.com:flink-extended/flink-scala-api.git"
81-
)
82-
),
83-
developers := List(
84-
Developer(
85-
id = "romangrebennikov",
86-
name = "Roman Grebennikov",
87-
email = "grv@dfdx.me",
88-
url = url("https://dfdx.me/")
89-
),
90-
Developer(
91-
id = "novakov-alexey",
92-
name = "Alexey Novakov",
93-
email = "novakov.alex@gmail.com",
94-
url = url("https://novakov-alexey.github.io/")
95-
)
96-
),
97-
releaseProcess := Seq.empty[ReleaseStep],
98-
releaseProcess ++= (if (sys.env.contains("RELEASE_VERSION_BUMP"))
99-
Seq[ReleaseStep](
100-
checkSnapshotDependencies,
101-
inquireVersions,
102-
setReleaseVersion,
103-
commitReleaseVersion,
104-
tagRelease,
105-
releaseStepCommandAndRemaining("+publishSigned"),
106-
releaseStepCommand("sonatypeBundleRelease")
107-
)
108-
else Seq.empty[ReleaseStep]),
109-
releaseProcess ++= (if (sys.env.contains("RELEASE_PUBLISH"))
110-
Seq[ReleaseStep](
111-
inquireVersions,
112-
setNextVersion,
113-
commitNextVersion,
114-
pushChanges
115-
)
116-
else Seq.empty[ReleaseStep])
116+
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Provided
117+
)
117118
)
118119

119-
lazy val docs = project // new documentation project
120+
def flinkDependencies(flinkVersion: String) =
121+
Seq(
122+
"org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided,
123+
"org.apache.flink" % "flink-table-api-java-bridge" % flinkVersion % Provided,
124+
"org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
125+
("org.apache.flink" % "flink-streaming-java" % flinkVersion % Test).classifier("tests"),
126+
"org.typelevel" %% "cats-core" % "2.13.0" % Test,
127+
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
128+
"ch.qos.logback" % "logback-classic" % "1.5.17" % Test
129+
)
130+
131+
lazy val `flink-1-api` = (project in file("modules/flink-1-api"))
132+
.dependsOn(`scala-api-common`)
133+
.settings(commonSettings)
134+
.settings(
135+
name := "flink-scala-api-1",
136+
scalaVersion := rootScalaVersion,
137+
crossScalaVersions := crossVersions,
138+
libraryDependencies ++= (flinkDependencies(
139+
flinkVersion1
140+
) :+ "org.apache.flink" % "flink-java" % flinkVersion1 % Provided)
141+
)
142+
143+
lazy val `flink-2-api` = (project in file("modules/flink-2-api"))
144+
.dependsOn(`scala-api-common`)
145+
.settings(commonSettings)
146+
.settings(
147+
name := "flink-scala-api-2",
148+
scalaVersion := rootScalaVersion,
149+
crossScalaVersions := crossVersions,
150+
libraryDependencies ++= flinkDependencies(flinkVersion2)
151+
)
152+
153+
lazy val docs = project
120154
.in(file("modules/docs")) // important: it must not be docs/
121155
.settings(
122156
scalaVersion := rootScalaVersion,
123157
crossScalaVersions := crossVersions,
124158
mdocIn := new File("README.md"),
125159
publish / skip := true,
126160
libraryDependencies ++= Seq(
127-
"org.apache.flink" % "flink-streaming-java" % flinkVersion
161+
"org.apache.flink" % "flink-streaming-java" % flinkVersion1
128162
)
129163
)
130-
.dependsOn(`scala-api`)
164+
.dependsOn(`flink-1-api`)
131165
.enablePlugins(MdocPlugin)
132166

133167
val flinkMajorAndMinorVersion =
134-
flinkVersion.split("\\.").toList.take(2).mkString(".")
168+
flinkVersion1.split("\\.").toList.take(2).mkString(".")
135169

136170
lazy val `examples` = (project in file("modules/examples"))
171+
.dependsOn(`flink-1-api`, `scala-api-common`)
137172
.settings(
138-
scalaVersion := rootScalaVersion,
139-
Test / fork := true,
140-
publish / skip := true,
141-
releaseProcess := Seq.empty[ReleaseStep], // Release for example is not needed
173+
scalaVersion := rootScalaVersion,
174+
crossScalaVersions := Seq(rootScalaVersion),
175+
Test / fork := true,
176+
publish / skip := true,
177+
// Release process for the `examples` is not needed
178+
releaseProcess := Seq.empty[ReleaseStep],
142179
libraryDependencies ++= Seq(
143-
"org.flinkextended" %% "flink-scala-api" % "1.20.0_1.2.4",
144-
"org.apache.flink" % "flink-runtime-web" % "1.20.0" % Provided,
145-
"org.apache.flink" % "flink-clients" % "1.20.0" % Provided,
146-
"org.apache.flink" % "flink-state-processor-api" % "1.20.0" % Provided,
147-
"org.apache.flink" % "flink-connector-kafka" % "3.0.2-1.18" % Provided,
148-
"org.apache.flink" % "flink-connector-files" % "1.20.0" % Provided,
149-
"org.apache.flink" % "flink-table-runtime" % "1.20.0" % Provided,
150-
"org.apache.flink" % "flink-table-planner-loader" % "1.20.0" % Provided,
151-
"io.bullet" %% "borer-core" % "1.16.0" % Provided,
152-
"ch.qos.logback" % "logback-classic" % "1.4.14" % Provided,
153-
"org.apache.flink" % "flink-test-utils" % "1.20.0" % Test,
154-
"org.apache.flink" % "flink-streaming-java" % "1.20.0" % Test classifier "tests",
155-
"org.scalatest" %% "scalatest" % "3.2.15" % Test
180+
"org.apache.flink" % "flink-runtime-web" % flinkVersion1 % Provided,
181+
"org.apache.flink" % "flink-clients" % flinkVersion1 % Provided,
182+
"org.apache.flink" % "flink-state-processor-api" % flinkVersion1 % Provided,
183+
"org.apache.flink" % "flink-connector-kafka" % s"3.4.0-1.20" % Provided,
184+
"org.apache.flink" % "flink-connector-files" % flinkVersion1 % Provided,
185+
"org.apache.flink" % "flink-table-runtime" % flinkVersion1 % Provided,
186+
"org.apache.flink" % "flink-table-planner-loader" % flinkVersion1 % Provided,
187+
"io.bullet" %% "borer-core" % "1.16.0" % Provided,
188+
"ch.qos.logback" % "logback-classic" % "1.4.14" % Provided,
189+
"org.apache.flink" % "flink-test-utils" % flinkVersion1 % Test,
190+
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Test classifier "tests",
191+
"org.scalatest" %% "scalatest" % "3.2.15" % Test
156192
),
157193
Compile / run := Defaults
158194
.runTask(

modules/examples/src/main/scala/org/example/SocketTextStreamWordCount.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package org.example
1818
* limitations under the License.
1919
*/
2020

21-
import org.apache.flinkx.api._
22-
import org.apache.flinkx.api.serializers._
21+
import org.apache.flinkx.api.*
22+
import org.apache.flinkx.api.serializers.*
2323
import org.apache.flink.configuration.Configuration
2424
import org.apache.flink.configuration.ConfigConstants
2525
import org.apache.flink.configuration.RestOptions.BIND_PORT

modules/examples/src/test/scala/org/example/fraud/FakeRuntimeContext.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import org.apache.flink.api.common.state.{
1919
}
2020
import org.apache.flink.metrics.groups.OperatorMetricGroup
2121

22-
import java.util
2322
import org.apache.flink.api.common.typeutils.TypeSerializer
2423
import org.apache.flink.api.common.JobInfo
2524
import org.apache.flink.api.common.TaskInfo
@@ -82,11 +81,11 @@ class FakeRuntimeContext extends RuntimeContext:
8281

8382
override def getExternalResourceInfos(
8483
resourceName: String
85-
): util.Set[ExternalResourceInfo] = ???
84+
): ju.Set[ExternalResourceInfo] = ???
8685

8786
override def hasBroadcastVariable(name: String): Boolean = ???
8887

89-
override def getBroadcastVariable[RT](name: String): util.List[RT] = ???
88+
override def getBroadcastVariable[RT](name: String): ju.List[RT] = ???
9089

9190
override def getBroadcastVariableWithInitializer[T, C](
9291
name: String,

modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala renamed to modules/flink-1-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala

File renamed without changes.

modules/scala-api/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala renamed to modules/flink-1-api/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala

File renamed without changes.

modules/scala-api/src/main/scala/org/apache/flink/streaming/util/typeutils/DefaultScalaProductFieldAccessorFactory.scala renamed to modules/flink-1-api/src/main/scala/org/apache/flink/streaming/util/typeutils/DefaultScalaProductFieldAccessorFactory.scala

File renamed without changes.

modules/scala-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala renamed to modules/flink-1-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala

File renamed without changes.

modules/scala-api/src/main/scala/org/apache/flinkx/api/AsyncDataStream.scala renamed to modules/flink-1-api/src/main/scala/org/apache/flinkx/api/AsyncDataStream.scala

File renamed without changes.

0 commit comments

Comments
 (0)