Skip to content

Commit 3b199db

Browse files
authored
Make TypeMapper trait serializable (#82)
* Make TypeMapper trait serializable
1 parent eb74983 commit 3b199db

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[
3737
}
3838

3939
object MappedSerializer {
40-
trait TypeMapper[A, B] {
40+
trait TypeMapper[A, B] extends Serializable {
4141
def map(a: A): B
4242
def contramap(b: B): A
4343
}

src/test/scala/org/apache/flinkx/api/MappedTypeInfoTest.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,17 @@ import scala.reflect.ClassTag
1111

1212
class MappedTypeInfoTest extends AnyFlatSpec with Matchers with TestUtils {
1313
import MappedTypeInfoTest._
14-
it should "derive TI for non-serializeable classes" in {
14+
it should "derive TI for non-serializable classes" in {
1515
drop(implicitly[TypeInformation[WrappedString]])
1616
}
17+
18+
it should "serialize type mappers across tasks" in {
19+
val env = StreamExecutionEnvironment.getExecutionEnvironment
20+
val dataStream = env.fromElements(Purchase(1, 1.0))
21+
val purchase = dataStream.keyBy(_.id).map(_.copy(price = 5.1)).executeAndCollect(1)
22+
23+
purchase.last should be (Purchase(1, 5.1))
24+
}
1725
}
1826

1927
object MappedTypeInfoTest {
@@ -40,4 +48,10 @@ object MappedTypeInfoTest {
4048
internal = value
4149
}
4250
}
51+
52+
case class Purchase(id: Int, price: BigDecimal)
53+
54+
object Purchase {
55+
implicit val typeInfo: TypeInformation[Purchase] = deriveTypeInformation
56+
}
4357
}

0 commit comments

Comments
 (0)