
Conversation

Contributor

@suhwan-cheon suhwan-cheon commented Sep 14, 2025

issue: https://issues.apache.org/jira/browse/FLINK-38247

Issue

An infinite loop occurred when using MySqlChunkSplitter to split a table whose primary key is a MySQL BIGINT UNSIGNED column. (The problem appears when primary key values exceed Long.MAX_VALUE.)


Solution

I added StatementUtils.setSafeObject, which detects the overflow and converts the value to a BigDecimal before setting it in the PreparedStatement.


Verification

  • I added StatementUtilsTest to verify the correctness of StatementUtils.setSafeObject.
  • I used a Java dynamic proxy (createPreparedStatementProxy) instead of a verbose mock class, which keeps the test code simple.
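The proxy-based test harness mentioned above can be sketched roughly as follows. This is a minimal sketch, not the PR's actual helper: it assumes the test only needs to record which setter was invoked on the PreparedStatement, and `createPreparedStatementProxy` here is a simplified stand-in.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

public class PreparedStatementProxyDemo {

    // Returns a PreparedStatement whose every method call records its name
    // into the given list. Only void setter methods are expected to be called,
    // so returning null from the handler is safe.
    public static PreparedStatement createPreparedStatementProxy(List<String> calls) {
        InvocationHandler handler =
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    return null;
                };
        return (PreparedStatement)
                Proxy.newProxyInstance(
                        PreparedStatement.class.getClassLoader(),
                        new Class<?>[] {PreparedStatement.class},
                        handler);
    }

    public static void main(String[] args) throws Exception {
        List<String> calls = new ArrayList<>();
        PreparedStatement ps = createPreparedStatementProxy(calls);
        // The code under test would bind values here; we can then assert
        // which setter it chose (setObject vs. setBigDecimal).
        ps.setObject(1, 42L);
        ps.setBigDecimal(2, new BigDecimal("18446744073709551615"));
        System.out.println(calls); // [setObject, setBigDecimal]
    }
}
```

This avoids hand-writing a mock class with dozens of unused PreparedStatement method stubs.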

@lvyanquan
Contributor

Please add an itcase in MySqlSourceITCase.

// Therefore, we need to handle the overflow issue by converting the long value to BigDecimal.
public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
        throws SQLException {
    if (value instanceof Long && (Long) value < 0L) {
Contributor

It should be the BigInteger type; you can verify this in the itcase.

Contributor
@lvyanquan commented Sep 15, 2025

Example SQL for a possible test case:



CREATE TABLE `unsigned_bigint_pk`
(
`order_id`  BIGINT UNSIGNED NOT NULL,
`desc`  varchar(512) NOT NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO unsigned_bigint_pk
VALUES (1,  'flink'),
(2,  'flink'),
(3,  'flink'),
(4,  'flink'),
(5,  'flink'),
(6,  'flink'),
(7,  'flink'),
(8,  'flink'),
(9,  'flink'),
(10,  'flink'),
(11,  'flink'),
(12,  'flink'),
(18446744073709551604,  'flink'),
(18446744073709551605,  'flink'),
(18446744073709551606,  'flink'),
(18446744073709551607,  'flink'),
(18446744073709551608,  'flink'),
(18446744073709551609,  'flink'),
(18446744073709551610,  'flink'),
(18446744073709551611,  'flink'),
(18446744073709551612,  'flink'),
(18446744073709551613,  'flink'),
(18446744073709551614,  'flink'),
(18446744073709551615,  'flink');


Contributor Author

@lvyanquan
Hello! I added itcase in MySqlSourceITCase.

I think JDBC may surface BIGINT UNSIGNED as Long (values > Long.MAX_VALUE appear negative due to two's complement).
So in this code I 1) detect negative Longs and 2) bind them as BigInteger, ensuring values near 2^64−1 are handled correctly.
I added an IT case in MySqlSourceITCase that creates unsigned_bigint_pk and verifies the boundary values, and I saw that the test passed.
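The two's-complement wrap described above can be illustrated in isolation: a BIGINT UNSIGNED value above Long.MAX_VALUE arrives as a negative long, and adding 2^64 recovers the original unsigned value. `toUnsigned` below is a hypothetical helper for illustration, not code from the PR.

```java
import java.math.BigInteger;

public class UnsignedBigintDemo {

    // Maps a raw long (as a JDBC driver might surface BIGINT UNSIGNED) back to
    // the unsigned value it represents: negative longs gain 2^64.
    static BigInteger toUnsigned(long raw) {
        BigInteger v = BigInteger.valueOf(raw);
        return raw < 0 ? v.add(BigInteger.ONE.shiftLeft(64)) : v;
    }

    public static void main(String[] args) {
        // -1L is the two's-complement wire form of 2^64 - 1.
        System.out.println(toUnsigned(-1L)); // 18446744073709551615
        System.out.println(toUnsigned(5L));  // 5 (non-negative values pass through)
    }
}
```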

@lvyanquan
Contributor

Run 'mvn spotless:apply' to fix these violations.

@suhwan-cheon
Contributor Author

Run 'mvn spotless:apply' to fix these violations.

I've applied it. Can you run the test again?

@suhwan-cheon
Contributor Author

There's a Checkstyle error; I'm not used to this environment, so I made a mistake.
I've fixed the Checkstyle error and ran the MySQL connector unit tests with mvn -pl flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc -DskipITs=true test.
Please review.

.tableList(db + "." + table)
.deserializer(deserializer)
.startupOptions(StartupOptions.initial())
.chunkKeyColumn(new ObjectPath(db, table), "order_id")
Contributor

You can add .splitSize(2) here and set rootLogger.level to INFO in log4j2-test.properties to see the actual split information, like:


gners.MySqlChunkSplitter - ChunkSplitter has split 2820 chunks for table customer_kgqlle.unsigned_bigint_pk
72864 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 2
72869 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 3
72871 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 4
72875 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 5
72878 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 6
72879 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 7
72881 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 8
72882 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 9
72884 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 10
72885 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 11
72990 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - ChunkSplitter has split 2830 chunks for table customer_kgqlle.unsigned_bigint_pk
72990 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 12
72992 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 18446744073709551604

This means the problem was not resolved.

Contributor Author

Sorry for the late reply.
I've checked that there's a problem, and I'll try to fix it as soon as possible!

Contributor Author

@lvyanquan
I added .splitSize(2) and found my logic was wrong.
(I realized that even if I passed a BigInteger to setObject(), it would still be converted to a long...)

So I changed it to use setBigDecimal() when the value is a BigInteger.
Spotless and the tests have been run; please check.

@lvyanquan lvyanquan added this to the V3.6.0 milestone Sep 24, 2025
Copilot AI left a comment

Pull request overview

Fixes MySQL table chunk-splitting for BIGINT UNSIGNED primary keys when values exceed Long.MAX_VALUE by avoiding PreparedStatement#setObject overflow behavior during boundary queries.

Changes:

  • Introduce StatementUtils.setSafeObject to bind BigInteger values safely (via BigDecimal) in PreparedStatements.
  • Apply setSafeObject to chunk-boundary and split-scan prepared statements in StatementUtils.
  • Add unit and integration tests covering unsigned BIGINT chunking and the new binding behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File: .../src/main/java/.../utils/StatementUtils.java — Adds setSafeObject and uses it when binding split/chunk boundary parameters to prevent overflow-driven infinite loops.
File: .../src/test/java/.../utils/StatementUtilsTest.java — Adds unit tests validating that BigInteger binding is routed through setBigDecimal and that other values still use setObject.
File: .../src/test/java/.../source/MySqlSourceITCase.java — Adds an IT case that creates a BIGINT UNSIGNED PK table with near-2^64-1 values and verifies the initial snapshot chunking reads all rows.


Comment on lines +86 to +90
        throws SQLException {
    if (value instanceof BigInteger) {
        ps.setBigDecimal(parameterIndex, new BigDecimal((BigInteger) value));
    } else {
        ps.setObject(parameterIndex, value);
Copilot AI commented Jan 27, 2026

The PR description says setSafeObject “detects the overflow”, but the implementation converts all BigInteger values to BigDecimal (even when the value would fit in a signed long). If the JDBC driver returns BigInteger for all BIGINT UNSIGNED values (including small ones), this changes parameter typing from BIGINT to DECIMAL for every bound, which can affect query planning/index usage. Consider only converting when value is outside Long.MIN_VALUE..Long.MAX_VALUE (or when explicitly handling unsigned overflow), and otherwise keep setObject/setLong for the common case.
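The conditional conversion suggested here could look roughly like the sketch below. This is a hedged illustration of the reviewer's idea, not the PR's actual code: it binds values inside the signed-long range with setLong and falls back to a DECIMAL binding only on true overflow.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class RangeCheckedBinding {

    private static final BigInteger LONG_MIN = BigInteger.valueOf(Long.MIN_VALUE);
    private static final BigInteger LONG_MAX = BigInteger.valueOf(Long.MAX_VALUE);

    // Bind a BigInteger as a plain long when it fits the signed-long range,
    // preserving BIGINT parameter typing for the common case; convert to
    // BigDecimal only for values that would overflow a long.
    public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
            throws SQLException {
        if (value instanceof BigInteger) {
            BigInteger bi = (BigInteger) value;
            if (bi.compareTo(LONG_MIN) >= 0 && bi.compareTo(LONG_MAX) <= 0) {
                ps.setLong(parameterIndex, bi.longValueExact());
            } else {
                ps.setBigDecimal(parameterIndex, new BigDecimal(bi));
            }
        } else {
            ps.setObject(parameterIndex, value);
        }
    }
}
```

Whether the typing change actually affects index usage would still need to be verified against the MySQL driver and planner.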
