Skip to content

Commit 50358db

Browse files
Call duplicate when type is not immutable (#114)
call duplicate when serializer is not immutable for some of the type infos --------- Co-authored-by: Suno <suno@hpcnt.com>
1 parent ae420de commit 50358db

File tree

5 files changed

+34
-22
lines changed

5 files changed

+34
-22
lines changed

src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ abstract class CaseClassSerializer[T <: Product](
3939

4040
@transient lazy val log: Logger = LoggerFactory.getLogger(this.getClass)
4141

42-
override def duplicate: CaseClassSerializer[T] = {
42+
override def isImmutableType: Boolean =
43+
scalaFieldSerializers.forall(_.isImmutableType)
44+
45+
override def duplicate: CaseClassSerializer[T] =
4346
clone().asInstanceOf[CaseClassSerializer[T]]
44-
}
4547

4648
@throws[CloneNotSupportedException]
4749
override protected def clone(): Object = {

src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import scala.reflect.{ClassTag, classTag}
88

99
case class CollectionTypeInformation[T: ClassTag](serializer: TypeSerializer[T]) extends TypeInformation[T] {
1010
val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]]
11-
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
12-
override def isBasicType: Boolean = false
13-
override def isTupleType: Boolean = false
14-
override def isKeyType: Boolean = false
15-
override def getTotalFields: Int = 1
16-
override def getTypeClass: Class[T] = clazz
17-
override def getArity: Int = 1
11+
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] =
12+
if (serializer.isImmutableType) serializer
13+
else serializer.duplicate()
14+
override def isBasicType: Boolean = false
15+
override def isTupleType: Boolean = false
16+
override def isKeyType: Boolean = false
17+
override def getTotalFields: Int = 1
18+
override def getTypeClass: Class[T] = clazz
19+
override def getArity: Int = 1
1820
}

src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
55
import org.apache.flink.api.common.typeutils.TypeSerializer
66

77
case class CoproductTypeInformation[T](c: Class[T], ser: TypeSerializer[T]) extends TypeInformation[T] {
8-
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = ser
9-
override def isBasicType: Boolean = false
10-
override def isTupleType: Boolean = false
11-
override def isKeyType: Boolean = false
12-
override def getTotalFields: Int = 1
13-
override def getTypeClass: Class[T] = c
14-
override def getArity: Int = 1
8+
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] =
9+
if (ser.isImmutableType) ser
10+
else ser.duplicate()
11+
override def isBasicType: Boolean = false
12+
override def isTupleType: Boolean = false
13+
override def isKeyType: Boolean = false
14+
override def getTotalFields: Int = 1
15+
override def getTypeClass: Class[T] = c
16+
override def getArity: Int = 1
1517
}

src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@ class ProductTypeInformation[T <: Product](
1515
fieldTypes,
1616
fieldNames
1717
) {
18-
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = ser
18+
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] =
19+
if (ser.isImmutableType) ser
20+
else ser.duplicate()
1921
}

src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
77
import scala.reflect.{classTag, ClassTag}
88

99
abstract class SimpleTypeInformation[T: ClassTag: TypeSerializer] extends TypeInformation[T] {
10-
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = implicitly[TypeSerializer[T]]
11-
override def isBasicType: Boolean = false
12-
override def isTupleType: Boolean = false
13-
override def isKeyType: Boolean = false
14-
override def getTotalFields: Int = 1
10+
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = {
11+
val ser = implicitly[TypeSerializer[T]]
12+
if (ser.isImmutableType) ser
13+
else ser.duplicate()
14+
}
15+
override def isBasicType: Boolean = false
16+
override def isTupleType: Boolean = false
17+
override def isKeyType: Boolean = false
18+
override def getTotalFields: Int = 1
1519
override def getTypeClass: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
1620
override def getArity: Int = 1
1721
}

0 commit comments

Comments
 (0)