Skip to content

Commit e2eefad

Browse files
Feature: handle null case class (#265)
* Feature: handle null case class
* Removes DISABLE_CASE_CLASS_ARITY_USAGE flag
* Removes special condition for case class with no arity: it's handled like any other case class
* Fix IllegalArgumentException: argument type mismatch
* Fix case class instantiation: call constructor with all deserialized fields and append default values to complete if necessary
* Fix constructor reflection lookup being done too early
* Switch from apply() to constructor() call to fix conflict with other overloaded apply()
1 parent f7a9068 commit e2eefad

File tree

8 files changed

+158
-154
lines changed

8 files changed

+158
-154
lines changed

README.md

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -413,26 +413,6 @@ env
413413

414414
## Feature Flags
415415

416-
### Disable Class Arity Usage
417-
418-
In the [1.1.5 release](https://github.com/flink-extended/flink-scala-api/releases/tag/v1.18.1_1.1.5) the Case Class serialization process also [stores case class arity](https://github.com/flink-extended/flink-scala-api/pull/98/files#diff-e896c210d6a754cb3afb462aea34cca08f090330f6f3c663a64dfb5584fc3727R106) number to
419-
a savepoint. This was introduced to support Case Class schema evolution and to allow adding new
420-
class fields with default values. Unfortunately, however, this is a breaking change to the Flink job state restore process. The Flink job will fail if
421-
a savepoint used for the job restore was created by 1.1.4 or earlier releases.
422-
423-
In order to migrate to the 1.1.5 release version, one can use a specially added environment variable:
424-
`DISABLE_CASE_CLASS_ARITY_USAGE`.
425-
426-
To disable new savepoint format and be able to restore a Flink job with a savepoint created before 1.1.5 release set the variable to `true`.
427-
428-
Example: `DISABLE_CASE_CLASS_ARITY_USAGE = true`
429-
430-
To enable new serialization logic set this variable to `false` or simply do not define this environment variable.
431-
432-
Example: `DISABLE_CASE_CLASS_ARITY_USAGE = false`
433-
434-
P.S. this flag can be deprecated in future when most of the users migrate to the latest library version.
435-
436416
### Disable fail-fast on Scala type resolution with Class
437417

438418
From [1.2.3 release](https://github.com/flink-extended/flink-scala-api/releases/tag/v1.20.0_1.2.3), a check is done to prevent misusage of Scala type resolution with `Class` which may lead to silently fallback to generic Kryo serializers.
-25 Bytes
Binary file not shown.

modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,56 @@
11
package org.apache.flinkx.api
22

3-
import org.apache.flinkx.api.SchemaEvolutionTest.{Click, ClickEvent, Event, NoArityTest}
4-
import org.apache.flinkx.api.serializers._
5-
import org.apache.flinkx.api.serializer.CaseClassSerializer
6-
import org.apache.flink.api.common.typeinfo.TypeInformation
7-
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
83
import org.apache.flink.api.common.ExecutionConfig
4+
import org.apache.flink.api.common.typeinfo.TypeInformation
5+
import org.apache.flink.core.memory._
6+
import org.apache.flinkx.api.SchemaEvolutionTest.{Click, ClickEvent, Event, View}
7+
import org.apache.flinkx.api.serializer.CaseClassSerializer
8+
import org.apache.flinkx.api.serializers._
99
import org.scalatest.flatspec.AnyFlatSpec
1010
import org.scalatest.matchers.should.Matchers
1111

1212
import java.io.ByteArrayOutputStream
1313
import java.nio.file.{Files, Path}
1414

1515
class SchemaEvolutionTest extends AnyFlatSpec with Matchers {
16-
private implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation[Event]
17-
private implicit val arityTestInfo: TypeInformation[NoArityTest] = deriveTypeInformation[NoArityTest]
18-
private val clicks =
16+
private implicit val clickEventTypeInfo: TypeInformation[ClickEvent] = deriveTypeInformation[ClickEvent]
17+
private implicit val viewInfo: TypeInformation[View] = deriveTypeInformation[View]
18+
private implicit val newClickTypeInfo: TypeInformation[Click] = deriveTypeInformation[Click]
19+
private implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation[Event]
20+
private val clicks =
1921
List(ClickEvent("a", "2021-01-01"), ClickEvent("b", "2021-01-01"), ClickEvent("c", "2021-01-01"))
2022

2123
def createSerializer[T: TypeInformation] =
2224
implicitly[TypeInformation[T]].createSerializer(new ExecutionConfig())
2325

26+
it should "serialize click with old serializer and deserialize it with new serializer" in {
27+
// Serializer before schema change: without serializers for the "new" default fields
28+
val oldClickSerializer = new CaseClassSerializer[Click](
29+
clazz = classOf[Click],
30+
scalaFieldSerializers = Array(stringSerializer, createSerializer[List[ClickEvent]]),
31+
isCaseClassImmutable = true
32+
)
33+
val newClickSerializer = createSerializer[Click] // Serializer derived from the "new" case class
34+
val expected = Click(null, clicks)
35+
36+
// serialize "old" Click with old serializer
37+
val out = new DataOutputSerializer(1024 * 1024)
38+
oldClickSerializer.serialize(expected, out)
39+
40+
// deserialize old Click with new serializer
41+
val in = new DataInputDeserializer(out.getSharedBuffer)
42+
val result = newClickSerializer.deserialize(in)
43+
result shouldBe expected
44+
45+
// serialize modified Click with new serializer
46+
val modifiedExpected = expected.copy(fieldInFile = "modified1", fieldNotInFile = "modified2")
47+
newClickSerializer.serialize(modifiedExpected, out)
48+
49+
// deserialize modified Click with new serializer
50+
val modifiedResult = newClickSerializer.deserialize(in)
51+
modifiedResult shouldBe modifiedExpected
52+
}
53+
2454
ignore should "generate blob for event=click+purchase" in {
2555
val buffer = new ByteArrayOutputStream()
2656
val eventSerializer = createSerializer[Event]
@@ -34,28 +64,9 @@ class SchemaEvolutionTest extends AnyFlatSpec with Matchers {
3464
click shouldBe Click("p1", clicks)
3565
}
3666

37-
ignore should "generate blob for no arity test" in {
38-
val buffer = new ByteArrayOutputStream()
39-
val eventSerializer = createSerializer[NoArityTest]
40-
eventSerializer.serialize(NoArityTest(4, 3, List("test")), new DataOutputViewStreamWrapper(buffer))
41-
Files.write(Path.of("src/test/resources/without-arity-test.dat"), buffer.toByteArray)
42-
}
43-
44-
it should "decode class without arity info" in {
45-
val buffer = this.getClass.getResourceAsStream("/without-arity-test.dat")
46-
val serializer = createSerializer[NoArityTest] match {
47-
case s: CaseClassSerializer[_] => s
48-
case s => fail(s"Derived serializer must be of CaseClassSerializer type, but was $s")
49-
}
50-
val decoded =
51-
serializer.deserializeFromSource(new DataInputViewStreamWrapper(buffer), classArityUsageDisabled = true)
52-
decoded shouldBe NoArityTest(4, 3, List("test"))
53-
}
5467
}
5568

5669
object SchemaEvolutionTest {
57-
case class NoArityTest(field1: Long, field2: Long, field3: List[String] = Nil)
58-
5970
sealed trait Event
6071
case class Click(
6172
id: String,

modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnap
66
import org.apache.flink.core.memory._
77
import org.apache.flink.util.ChildFirstClassLoader
88
import org.apache.flinkx.api.SerializerSnapshotTest._
9+
import org.apache.flinkx.api.serializer.CaseClassSerializer
910
import org.apache.flinkx.api.serializers._
1011
import org.scalatest.Assertion
1112
import org.scalatest.flatspec.AnyFlatSpec
@@ -61,7 +62,7 @@ class SerializerSnapshotTest extends AnyFlatSpec with Matchers {
6162
}
6263

6364
it should "do map ser snapshot adt " in {
64-
implicit val ti: Typeclass[OuterTrait] = deriveTypeInformation[OuterTrait]
65+
implicit val ti: TypeInformation[OuterTrait] = deriveTypeInformation[OuterTrait]
6566
drop(ti)
6667
assertRoundtripSerializer(createSerializer[TraitMap])
6768
}
@@ -133,6 +134,42 @@ class SerializerSnapshotTest extends AnyFlatSpec with Matchers {
133134
deserializedData should be(expectedData)
134135
}
135136

137+
it should "serialize and deserialize case class with null and default" in {
138+
val expectedData = WithDefault(null)
139+
// Serializer before schema change: without serializers for the second "new" default fields
140+
val oldSerializer = new CaseClassSerializer[WithDefault](
141+
clazz = classOf[WithDefault],
142+
scalaFieldSerializers = Array(implicitly[TypeSerializer[SimpleClass1]]),
143+
isCaseClassImmutable = false
144+
)
145+
val oldSnapshot = oldSerializer.snapshotConfiguration()
146+
147+
// Serialize the old snapshot
148+
val output = new DataOutputSerializer(1024 * 1024)
149+
TypeSerializerSnapshot.writeVersionedSnapshot(output, oldSnapshot) // Flink always calls this
150+
151+
// Serialize the "old" data with "old" serializer
152+
oldSerializer.serialize(expectedData, output)
153+
154+
// Deserialize the old snapshot
155+
val input = new DataInputDeserializer(output.getSharedBuffer)
156+
val deserializedOldSnapshot = TypeSerializerSnapshot
157+
.readVersionedSnapshot[WithDefault](input, getClass.getClassLoader) // Flink always calls this
158+
159+
// Deserialize the old data with the new serializer
160+
val newSerializer = implicitly[TypeSerializer[WithDefault]]
161+
val deserializedData = newSerializer.deserialize(input)
162+
deserializedData should be(expectedData)
163+
164+
// serialize modified data with new serializer
165+
expectedData.sc2 = SimpleClass2("c", 3)
166+
newSerializer.serialize(expectedData, output)
167+
168+
// deserialize modified data with new serializer
169+
val modifiedResult = newSerializer.deserialize(input)
170+
modifiedResult shouldBe expectedData
171+
}
172+
136173
def roundtripSerializer[T](ser: TypeSerializer[T], cl: ClassLoader = getClass.getClassLoader): TypeSerializer[T] = {
137174
val snap = ser.snapshotConfiguration()
138175
val buffer = new ByteArrayOutputStream()
@@ -174,4 +211,6 @@ object SerializerSnapshotTest {
174211

175212
case class OuterClass(map: Map[UUID, List[OuterTrait]])
176213

214+
case class WithDefault(var sc1: SimpleClass1 = SimpleClass1("a", 1), var sc2: SimpleClass2 = SimpleClass2("b", 2))
215+
177216
}

modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package org.apache.flinkx.api
22

33
import cats.data.NonEmptyList
4+
import org.apache.flink.api.common.ExecutionConfig
5+
import org.apache.flink.api.common.typeinfo.TypeInformation
46
import org.apache.flinkx.api.SerializerTest.DeeplyNested.ModeNested.SuperNested.{Egg, Food}
57
import org.apache.flinkx.api.SerializerTest.NestedRoot.NestedMiddle.NestedBottom
68
import org.apache.flinkx.api.SerializerTest._
7-
import org.apache.flink.api.common.typeinfo.TypeInformation
8-
import org.apache.flink.api.common.ExecutionConfig
9+
import org.apache.flinkx.api.serializers._
910
import org.scalatest.Inspectors
1011
import org.scalatest.flatspec.AnyFlatSpec
1112
import org.scalatest.matchers.should.Matchers
12-
import org.apache.flinkx.api.serializers._
1313

1414
import java.time.{Instant, LocalDate, LocalDateTime}
1515
import java.util.UUID
@@ -204,6 +204,16 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
204204
roundtrip(ser, ExtendingCaseClass("abc", "def"))
205205
}
206206

207+
it should "serialize a case class with nullable field" in {
208+
val ser = implicitly[TypeInformation[NullableField]].createSerializer(ec)
209+
roundtrip(ser, NullableField(null, Bar(1)))
210+
}
211+
212+
it should "serialize a case class with a nullable field of a case class with no arity" in {
213+
val ser = implicitly[TypeInformation[NullableFieldWithNoArity]].createSerializer(ec)
214+
roundtrip(ser, NullableFieldWithNoArity(null))
215+
}
216+
207217
}
208218

209219
object SerializerTest {
@@ -276,4 +286,14 @@ object SerializerTest {
276286

277287
final case class ExtendingCaseClass(override val a: String, b: String) extends AbstractClass(a)
278288

289+
final case class NullableField(var a: Foo = null, var b: Bar = null)
290+
291+
object NullableField {
292+
def apply(a: Foo): NullableField = new NullableField(a, null)
293+
}
294+
295+
final case class NoArity()
296+
297+
final case class NullableFieldWithNoArity(var a: NoArity)
298+
279299
}
Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package org.apache.flinkx.api.serializer
22

33
import scala.annotation.nowarn
4-
import scala.reflect.runtime.{currentMirror => cm}
5-
import scala.reflect.runtime.universe
64
import scala.reflect.runtime.universe._
5+
import scala.reflect.runtime.{universe, currentMirror => cm}
76

87
private[serializer] trait ConstructorCompat {
98

109
@nowarn("msg=(eliminated by erasure)|(explicit array)")
11-
final def lookupConstructor[T <: Product](cls: Class[T], numFields: Int): Array[AnyRef] => T = {
10+
final def lookupConstructor[T <: Product](cls: Class[T]): Array[AnyRef] => T = {
1211
val rootMirror = universe.runtimeMirror(cls.getClassLoader)
1312
val classSymbol = rootMirror.classSymbol(cls)
1413

@@ -32,41 +31,26 @@ private[serializer] trait ConstructorCompat {
3231
.head
3332
.asMethod
3433

35-
val classMirror = rootMirror.reflectClass(classSymbol)
36-
val constructorMethodMirror = classMirror.reflectConstructor(primaryConstructorSymbol)
37-
38-
lazy val claas = cm.classSymbol(cls)
39-
lazy val module = claas.companion.asModule
40-
lazy val im = cm.reflect(cm.reflectModule(module).instance)
41-
42-
def withDefault(im: InstanceMirror, name: String, givenArgs: Int): List[Any] = {
43-
val at = TermName(name)
44-
val ts = im.symbol.typeSignature
45-
val method = ts.member(at).asMethod
46-
47-
// either defarg or default val for type of p
48-
def valueFor(p: Symbol, i: Int): Any = {
49-
val defarg = ts member TermName(s"$name$$default$$${i + 1}")
34+
val classMirror = rootMirror.reflectClass(classSymbol)
35+
val constructor = classMirror.reflectConstructor(primaryConstructorSymbol)
36+
val claas = cm.classSymbol(cls)
37+
val module = claas.companion.asModule
38+
val im = cm.reflect(cm.reflectModule(module).instance)
39+
val ts = im.symbol.typeSignature
40+
val constructorSize = primaryConstructorSymbol.paramLists.flatten.size
41+
val defaultValues = (1 to constructorSize)
42+
.flatMap { i =>
43+
val defarg = ts.member(TermName(s"$$lessinit$$greater$$default$$$i"))
5044
if (defarg != NoSymbol)
51-
im.reflectMethod(defarg.asMethod)()
52-
else
53-
p.typeSignature match {
54-
case t if t =:= typeOf[String] => null
55-
case t if t =:= typeOf[Int] | t =:= typeOf[Long] | t =:= typeOf[Double] | t =:= typeOf[Float] => 0
56-
case x => throw new IllegalArgumentException(x.toString)
57-
}
45+
Some(im.reflectMethod(defarg.asMethod)())
46+
else None
5847
}
5948

60-
val defaultArgs = method.paramLists.flatten.splitAt(givenArgs)._2
61-
defaultArgs.zipWithIndex.map(p => valueFor(p._1, p._2 + givenArgs))
62-
}
63-
64-
{ (args: Array[AnyRef]) =>
65-
{
66-
lazy val defaultArgs = withDefault(im, "apply", args.length)
67-
val allArgs = args.toList ++ (if (args.length == numFields) Nil else defaultArgs)
68-
constructorMethodMirror.apply(allArgs: _*).asInstanceOf[T]
69-
}
49+
(args: Array[AnyRef]) => {
50+
// Append default values for missing arguments
51+
val allArgs = args ++ defaultValues.takeRight(constructorSize - args.length)
52+
constructor.apply(allArgs: _*).asInstanceOf[T]
7053
}
7154
}
55+
7256
}
Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
package org.apache.flinkx.api.serializer
22

3-
import java.lang.reflect.Modifier
4-
3+
import java.lang.reflect.Constructor
54
import scala.util.control.NonFatal
65

76
private[serializer] trait ConstructorCompat:
87
// As of Scala version 3.1.2, there is no direct support for runtime reflection.
98
// This is in contrast to Scala 2, which has its own APIs for reflecting on classes.
109
// Thus, fallback to Java reflection and look up the constructor matching the required signature.
11-
final def lookupConstructor[T](cls: Class[T], numFields: Int): Array[AnyRef] => T =
10+
final def lookupConstructor[T](cls: Class[T]): Array[AnyRef] => T =
1211
// Types of parameters can fail to match when (un)boxing is used.
1312
// Say you have a class `final case class Foo(a: String, b: Int)`.
1413
// The first parameter is an alias for `java.lang.String`, which the constructor uses.
1514
// The second parameter is an alias for `java.lang.Integer`, but the constructor actually takes an unboxed `int`.
1615
val constructor =
17-
try cls.getConstructors.collectFirst { case c if c.getParameterCount == numFields => c }.get
16+
try
17+
cls.getConstructors
18+
.foldLeft[Option[Constructor[?]]](None) {
19+
case (Some(longest), c) if longest.getParameterCount < c.getParameterCount => Some(c)
20+
case (_, c) => Some(c)
21+
}
22+
.get
1823
catch
1924
case NonFatal(e) =>
2025
throw new IllegalArgumentException(
@@ -28,19 +33,13 @@ private[serializer] trait ConstructorCompat:
2833
e
2934
)
3035

31-
{ (args: Array[AnyRef]) =>
32-
{
33-
lazy val defaultArgs = cls
34-
.getMethods()
35-
.filter(
36-
_.getName()
37-
.startsWith("$lessinit$greater$default")
38-
)
39-
.sortBy(_.getName())
40-
.map(_.invoke(null))
41-
.takeRight(numFields - args.length) // read default values for missing arguments
36+
lazy val defaultArgs = cls.getMethods
37+
.filter(_.getName.startsWith("$lessinit$greater$default"))
38+
.sortBy(_.getName())
39+
.map(_.invoke(null))
4240

43-
val allArgs = args.toList ++ (if (args.length == numFields) Nil else defaultArgs)
44-
constructor.newInstance(allArgs*).asInstanceOf[T]
45-
}
41+
(args: Array[AnyRef]) => {
42+
// Append default values for missing arguments
43+
val allArgs = args ++ defaultArgs.takeRight(constructor.getParameterCount - args.length)
44+
constructor.newInstance(allArgs*).asInstanceOf[T]
4645
}

0 commit comments

Comments
 (0)