From b28908a28b92b70751f641844dfda4b8ffd77e31 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 17 Mar 2026 18:51:40 +0800 Subject: [PATCH 1/3] add data type macaddr8 and xm --- .../datasource/jdbc/client/JdbcPostgreSQLClient.java | 2 ++ .../apache/doris/cdcclient/utils/SchemaChangeHelper.java | 2 ++ .../doris/cdcclient/utils/SchemaChangeHelperTest.java | 6 ++++++ .../cdc/test_streaming_postgres_job_all_type.out | 8 +++++--- .../cdc/test_streaming_postgres_job_all_type.groovy | 8 +++++--- 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java index 8d449ad33f10ee..fb26a232990f12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java @@ -172,8 +172,10 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { case "cidr": case "inet": case "macaddr": + case "macaddr8": case "varbit": case "uuid": + case "xml": case "json": case "jsonb": return ScalarType.createStringType(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java index 5eea4f1f16f61a..3f8c525c0ed300 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java @@ -268,9 +268,11 @@ static String pgTypeNameToDorisType(String pgTypeName, int length, int scale) { case "cidr": case "inet": case "macaddr": + case "macaddr8": case "varbit": case "uuid": case "bytea": + case "xml": return DorisType.STRING; case "json": case "jsonb": diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java index b71fe609d4ace9..9a869a7924b59b 100644 --- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java @@ -155,6 +155,12 @@ void networkAndMiscTypes_isString() { assertEquals(DorisType.STRING, map("varbit", -1, -1)); } + @Test + void macaddr8AndXmlTypes_isString() { + assertEquals(DorisType.STRING, map("macaddr8", -1, -1)); + assertEquals(DorisType.STRING, map("xml", -1, -1)); + } + @Test void geometricTypes_isString() { assertEquals(DorisType.STRING, map("point", -1, -1)); diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out index 878a4a01f72cff..5edd2ef91248d2 100644 --- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out @@ -29,11 +29,13 @@ bit_varying_col text Yes false \N NONE int_array_col array Yes false \N NONE text_array_col array Yes false \N NONE point_col text Yes false \N NONE +macaddr8_col text Yes false \N NONE +xml_col text Yes false \N NONE -- !select_all_types_null -- -1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} +1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 -- !select_all_types_null2 -- -1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} -2 2 200 2000 7.89 0.12 99999.000001 char2 varchar2 another text false 2025-01-01 23:59:59 23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y": 20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw== [10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0} +1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 +2 2 200 2000 7.89 0.12 99999.000001 char2 varchar2 another text false 2025-01-01 23:59:59 23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y": 20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw== [10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0} 08:00:2b:aa:bb:cc:dd:ee 2 diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy index 57666ddf5db16a..bffbd70447b72b 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy @@ -74,12 +74,14 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex bit_varying_col bit varying(16), int_array_col integer[], text_array_col text[], - point_col point + point_col point, + macaddr8_col macaddr8, + xml_col xml ); """ // mock snapshot data sql """ - INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)'); + INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)','08:00:2b:01:02:03:04:05'::macaddr8,'1'::xml); """ } @@ -125,7 +127,7 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex // mock incremental into connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { - sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)','08:00:2b:aa:bb:cc:dd:ee'::macaddr8,'2'::xml);""" } sleep(60000); // wait for cdc incremental data From a4a7e71cc40453a977ac52123615e81256c1998a Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 18 Mar 2026 14:59:43 +0800 Subject: [PATCH 2/3] add hstore and array transform --- .../jdbc/client/JdbcPostgreSQLClient.java | 7 +- .../deserialize/DebeziumJsonDeserializer.java | 15 +- .../cdcclient/utils/SchemaChangeHelper.java | 6 + .../utils/SchemaChangeHelperTest.java | 46 +++++- .../test_streaming_postgres_job_all_type.out | 7 +- ...est_streaming_postgres_job_array_types.out | 21 +++ ...est_streaming_postgres_job_all_type.groovy | 8 +- ..._streaming_postgres_job_array_types.groovy | 147 ++++++++++++++++++ 8 files changed, 242 insertions(+), 15 deletions(-) create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java index fb26a232990f12..6ee651ad24e1ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java @@ -42,8 +42,10 @@ public class JdbcPostgreSQLClient extends JdbcClient { private static final String[] supportedInnerType = new String[] { "int2", "int4", "int8", "smallserial", "serial", - "bigserial", "float4", "float8", "timestamp", "timestamptz", - "date", "bool", "bpchar", "varchar", "text" + "bigserial", "float4", "float8", "numeric", + "timestamp", "timestamptz", "date", "bool", + "bpchar", "varchar", "text", + "json", "jsonb", "uuid" }; protected JdbcPostgreSQLClient(JdbcClientConfig jdbcClientConfig) { @@ -176,6 +178,7 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { case "varbit": case "uuid": case "xml": + case "hstore": case "json": case "jsonb": return ScalarType.createStringType(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java index 065a3da2c09b16..541b8b3e66fc35 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java @@ -181,10 +181,11 @@ private Object convert(Schema fieldSchema, Object dbzObj) { case BOOLEAN: return Boolean.parseBoolean(dbzObj.toString()); case STRING: - case ARRAY: case MAP: case STRUCT: return dbzObj.toString(); + case ARRAY: + return convertToArray(fieldSchema, dbzObj); case BYTES: return convertToBinary(dbzObj, fieldSchema); default: @@ -325,6 +326,18 @@ private Object convertDecimal(Object dbzObj, Schema schema) { return bigDecimal; } + private Object convertToArray(Schema fieldSchema, Object dbzObj) { + if (dbzObj instanceof List) { + Schema elementSchema = fieldSchema.valueSchema(); + List result = new ArrayList<>(); + for (Object element : (List) dbzObj) { + result.add(element == null ? null : convert(elementSchema, element)); + } + return result; + } + return dbzObj.toString(); + } + protected Object convertToTime(Object dbzObj, Schema schema) { try { if (dbzObj instanceof Long) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java index 3f8c525c0ed300..aed0bdf7ea6dd7 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java @@ -208,6 +208,11 @@ public static String columnToDorisType(Column column) { /** Map a PostgreSQL native type name to a Doris type string. */ static String pgTypeNameToDorisType(String pgTypeName, int length, int scale) { Preconditions.checkNotNull(pgTypeName); + // Debezium uses underscore prefix for PostgreSQL array types (_int4, _text, etc.) + if (pgTypeName.startsWith("_")) { + String innerDorisType = pgTypeNameToDorisType(pgTypeName.substring(1), length, scale); + return String.format("%s<%s>", DorisType.ARRAY, innerDorisType); + } switch (pgTypeName.toLowerCase()) { case "bool": return DorisType.BOOLEAN; @@ -273,6 +278,7 @@ static String pgTypeNameToDorisType(String pgTypeName, int length, int scale) { case "uuid": case "bytea": case "xml": + case "hstore": return DorisType.STRING; case "json": case "jsonb": diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java index 9a869a7924b59b..c9a63564c769d4 100644 --- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java @@ -147,18 +147,20 @@ void jsonTypes() { @Test void networkAndMiscTypes_isString() { - assertEquals(DorisType.STRING, map("inet", -1, -1)); - assertEquals(DorisType.STRING, map("cidr", -1, -1)); + assertEquals(DorisType.STRING, map("inet", -1, -1)); + assertEquals(DorisType.STRING, map("cidr", -1, -1)); assertEquals(DorisType.STRING, map("macaddr", -1, -1)); - assertEquals(DorisType.STRING, map("uuid", -1, -1)); - assertEquals(DorisType.STRING, map("bytea", -1, -1)); - assertEquals(DorisType.STRING, map("varbit", -1, -1)); + assertEquals(DorisType.STRING, map("uuid", -1, -1)); + assertEquals(DorisType.STRING, map("bytea", -1, -1)); + assertEquals(DorisType.STRING, map("varbit", -1, -1)); + assertEquals(DorisType.STRING, map("hstore", -1, -1)); } @Test - void macaddr8AndXmlTypes_isString() { + void macaddr8XmlHstoreTypes_isString() { assertEquals(DorisType.STRING, map("macaddr8", -1, -1)); assertEquals(DorisType.STRING, map("xml", -1, -1)); + assertEquals(DorisType.STRING, map("hstore", -1, -1)); } @Test @@ -172,6 +174,38 @@ void geometricTypes_isString() { assertEquals(DorisType.STRING, map("circle", -1, -1)); } + // ─── Array types ───────────────────────────────────────────────────────── + + @Test + void arrayTypes() { + // covers the 10 types required by test_streaming_postgres_job_array_types + assertEquals("ARRAY", map("_int2", -1, -1)); + assertEquals("ARRAY", map("_int4", -1, -1)); + assertEquals("ARRAY", map("_int8", -1, -1)); + assertEquals("ARRAY", map("_float4", -1, -1)); + assertEquals("ARRAY", map("_float8", -1, -1)); + assertEquals("ARRAY", map("_bool", -1, -1)); + assertEquals("ARRAY", map("_varchar", -1, -1)); + assertEquals("ARRAY", map("_text", -1, -1)); + assertEquals("ARRAY", map("_timestamp", -1, -1)); + assertEquals("ARRAY", map("_timestamptz", -1, -1)); + // additional types + assertEquals("ARRAY", map("_date", -1, -1)); + assertEquals("ARRAY", map("_json", -1, -1)); + assertEquals("ARRAY", map("_jsonb", -1, -1)); + } + + @Test + void arrayType_numeric_defaultPrecisionScale() { + assertEquals("ARRAY", map("_numeric", 0, -1)); + } + + @Test + void arrayType_nested() { + // Two-dimensional array: __int4 → ARRAY> + assertEquals("ARRAY>", map("__int4", -1, -1)); + } + // ─── Unknown type fallback ─────────────────────────────────────────────── @Test diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out index 5edd2ef91248d2..abdc4038774d5d 100644 --- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out @@ -31,11 +31,12 @@ text_array_col array Yes false \N NONE point_col text Yes false \N NONE macaddr8_col text Yes false \N NONE xml_col text Yes false \N NONE +hstore_col text Yes false \N NONE -- !select_all_types_null -- -1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 +1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 {"a":"1","b":"2"} -- !select_all_types_null2 -- -1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 -2 2 200 2000 7.89 0.12 99999.000001 char2 varchar2 another text false 2025-01-01 23:59:59 23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y": 20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw== [10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0} 08:00:2b:aa:bb:cc:dd:ee 2 +1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 {"a":"1","b":"2"} +2 2 200 2000 7.89 0.12 99999.000001 char2 varchar2 another text false 2025-01-01 23:59:59 23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y": 20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw== [10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0} 08:00:2b:aa:bb:cc:dd:ee 2 {"x":"10","y":"20"} diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out new file mode 100644 index 00000000000000..4493da5abcaf9d --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !desc_array_types -- +id bigint No true \N +int2_array_col array Yes false \N NONE +int4_array_col array Yes false \N NONE +int8_array_col array Yes false \N NONE +float4_array_col array Yes false \N NONE +double_array_col array Yes false \N NONE +bool_array_col array Yes false \N NONE +varchar_array_col array Yes false \N NONE +text_array_col array Yes false \N NONE +timestamp_array_col array Yes false \N NONE +timestamptz_array_col array Yes false \N NONE + +-- !select_array_types -- +1 [1, 2] [10, 20] [100, 200] [1.1, 2.2] [1.11, 2.22] [1, 0] ["foo", "bar"] ["hello", "world"] ["2024-01-01 12:00:00.000000", "2024-06-01 00:00:00.000000"] ["2024-01-01 04:00:00.000000", "2024-06-01 00:00:00.000000"] + +-- !select_array_types2 -- +1 [1, 2] [10, 20] [100, 200] [1.1, 2.2] [1.11, 2.22] [1, 0] ["foo", "bar"] ["hello", "world"] ["2024-01-01 12:00:00.000000", "2024-06-01 00:00:00.000000"] ["2024-01-01 04:00:00.000000", "2024-06-01 00:00:00.000000"] +2 [3, 4] [30, 40] [300, 400] [3.3, 4.4] [3.33, 4.44] [0, 1] ["baz", "qux"] ["foo", "bar"] ["2025-01-01 06:00:00.000000", "2025-06-01 18:00:00.000000"] ["2024-12-31 22:00:00.000000", "2025-06-01 18:00:00.000000"] + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy index bffbd70447b72b..9ec4ab3a52e6b8 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy @@ -43,6 +43,7 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex // create test connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}""" + sql """CREATE EXTENSION IF NOT EXISTS hstore""" sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" sql """ create table ${pgDB}.${pgSchema}.${table1} ( @@ -76,12 +77,13 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex text_array_col text[], point_col point, macaddr8_col macaddr8, - xml_col xml + xml_col xml, + hstore_col hstore ); """ // mock snapshot data sql """ - INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)','08:00:2b:01:02:03:04:05'::macaddr8,'1'::xml); + INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)','08:00:2b:01:02:03:04:05'::macaddr8,'1'::xml,'a=>1,b=>2'::hstore); """ } @@ -127,7 +129,7 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex // mock incremental into connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { - sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)','08:00:2b:aa:bb:cc:dd:ee'::macaddr8,'2'::xml);""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)','08:00:2b:aa:bb:cc:dd:ee'::macaddr8,'2'::xml,'x=>10,y=>20'::hstore);""" } sleep(60000); // wait for cdc incremental data diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy new file mode 100644 index 00000000000000..3ae65b118d39b8 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_postgres_job_array_types", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_array_types_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_array_types_pg" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """ + CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + id bigserial PRIMARY KEY, + int2_array_col int2[], + int4_array_col int4[], + int8_array_col int8[], + float4_array_col float4[], + double_array_col double precision[], + bool_array_col bool[], + varchar_array_col varchar(50)[], + text_array_col text[], + timestamp_array_col timestamp[], + timestamptz_array_col timestamptz[] + ); + """ + // mock snapshot data + sql """ + INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ( + 1, + ARRAY[1::int2, 2::int2], + ARRAY[10::int4, 20::int4], + ARRAY[100::int8, 200::int8], + ARRAY[1.1::float4, 2.2::float4], + ARRAY[1.11::double precision, 2.22::double precision], + ARRAY[true, false], + ARRAY['foo'::varchar, 'bar'::varchar], + ARRAY['hello', 'world'], + ARRAY['2024-01-01 12:00:00'::timestamp, '2024-06-01 00:00:00'::timestamp], + ARRAY['2024-01-01 12:00:00+08'::timestamptz, '2024-06-01 00:00:00+00'::timestamptz] + ); + """ + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // check job running + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + qt_desc_array_types """desc ${currentDb}.${table1};""" + qt_select_array_types """select * from ${currentDb}.${table1} order by 1;""" + + // mock incremental data + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ + INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ( + 2, + ARRAY[3::int2, 4::int2], + ARRAY[30::int4, 40::int4], + ARRAY[300::int8, 400::int8], + ARRAY[3.3::float4, 4.4::float4], + ARRAY[3.33::double precision, 4.44::double precision], + ARRAY[false, true], + ARRAY['baz'::varchar, 'qux'::varchar], + ARRAY['foo', 'bar'], + ARRAY['2025-01-01 06:00:00'::timestamp, '2025-06-01 18:00:00'::timestamp], + ARRAY['2025-01-01 06:00:00+08'::timestamptz, '2025-06-01 18:00:00+00'::timestamptz] + ); + """ + } + + sleep(60000); // wait for cdc incremental data + + qt_select_array_types2 """select * from ${currentDb}.${table1} order by 1;""" + + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} From affff4c8f86fe430017993648341f4acbccb78ec Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 18 Mar 2026 17:03:17 +0800 Subject: [PATCH 3/3] fix --- .../org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java index c9a63564c769d4..9a80d804cdcc2d 100644 --- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java @@ -153,7 +153,6 @@ void networkAndMiscTypes_isString() { assertEquals(DorisType.STRING, map("uuid", -1, -1)); assertEquals(DorisType.STRING, map("bytea", -1, -1)); assertEquals(DorisType.STRING, map("varbit", -1, -1)); - assertEquals(DorisType.STRING, map("hstore", -1, -1)); } @Test