Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ public class JdbcPostgreSQLClient extends JdbcClient {

private static final String[] supportedInnerType = new String[] {
"int2", "int4", "int8", "smallserial", "serial",
"bigserial", "float4", "float8", "timestamp", "timestamptz",
"date", "bool", "bpchar", "varchar", "text"
"bigserial", "float4", "float8", "numeric",
"timestamp", "timestamptz", "date", "bool",
"bpchar", "varchar", "text",
"json", "jsonb", "uuid"
};

protected JdbcPostgreSQLClient(JdbcClientConfig jdbcClientConfig) {
Expand Down Expand Up @@ -172,8 +174,11 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
case "cidr":
case "inet":
case "macaddr":
case "macaddr8":
case "varbit":
case "uuid":
case "xml":
case "hstore":
case "json":
case "jsonb":
return ScalarType.createStringType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,11 @@ private Object convert(Schema fieldSchema, Object dbzObj) {
case BOOLEAN:
return Boolean.parseBoolean(dbzObj.toString());
case STRING:
case ARRAY:
case MAP:
case STRUCT:
return dbzObj.toString();
case ARRAY:
return convertToArray(fieldSchema, dbzObj);
case BYTES:
return convertToBinary(dbzObj, fieldSchema);
default:
Expand Down Expand Up @@ -325,6 +326,18 @@ private Object convertDecimal(Object dbzObj, Schema schema) {
return bigDecimal;
}

private Object convertToArray(Schema fieldSchema, Object dbzObj) {
if (dbzObj instanceof List) {
Schema elementSchema = fieldSchema.valueSchema();
List<Object> result = new ArrayList<>();
for (Object element : (List<?>) dbzObj) {
result.add(element == null ? null : convert(elementSchema, element));
}
return result;
}
return dbzObj.toString();
}

protected Object convertToTime(Object dbzObj, Schema schema) {
try {
if (dbzObj instanceof Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public static String columnToDorisType(Column column) {
/** Map a PostgreSQL native type name to a Doris type string. */
static String pgTypeNameToDorisType(String pgTypeName, int length, int scale) {
Preconditions.checkNotNull(pgTypeName);
// Debezium uses underscore prefix for PostgreSQL array types (_int4, _text, etc.)
if (pgTypeName.startsWith("_")) {
String innerDorisType = pgTypeNameToDorisType(pgTypeName.substring(1), length, scale);
return String.format("%s<%s>", DorisType.ARRAY, innerDorisType);
}
switch (pgTypeName.toLowerCase()) {
case "bool":
return DorisType.BOOLEAN;
Expand Down Expand Up @@ -268,9 +273,12 @@ static String pgTypeNameToDorisType(String pgTypeName, int length, int scale) {
case "cidr":
case "inet":
case "macaddr":
case "macaddr8":
case "varbit":
case "uuid":
case "bytea":
case "xml":
case "hstore":
return DorisType.STRING;
case "json":
case "jsonb":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,19 @@ void jsonTypes() {

@Test
void networkAndMiscTypes_isString() {
assertEquals(DorisType.STRING, map("inet", -1, -1));
assertEquals(DorisType.STRING, map("cidr", -1, -1));
assertEquals(DorisType.STRING, map("inet", -1, -1));
assertEquals(DorisType.STRING, map("cidr", -1, -1));
assertEquals(DorisType.STRING, map("macaddr", -1, -1));
assertEquals(DorisType.STRING, map("uuid", -1, -1));
assertEquals(DorisType.STRING, map("bytea", -1, -1));
assertEquals(DorisType.STRING, map("varbit", -1, -1));
assertEquals(DorisType.STRING, map("uuid", -1, -1));
assertEquals(DorisType.STRING, map("bytea", -1, -1));
assertEquals(DorisType.STRING, map("varbit", -1, -1));
}

@Test
void macaddr8XmlHstoreTypes_isString() {
assertEquals(DorisType.STRING, map("macaddr8", -1, -1));
assertEquals(DorisType.STRING, map("xml", -1, -1));
assertEquals(DorisType.STRING, map("hstore", -1, -1));
}

@Test
Expand All @@ -166,6 +173,38 @@ void geometricTypes_isString() {
assertEquals(DorisType.STRING, map("circle", -1, -1));
}

// ─── Array types ─────────────────────────────────────────────────────────

@Test
void arrayTypes() {
// covers the 10 types required by test_streaming_postgres_job_array_types
assertEquals("ARRAY<SMALLINT>", map("_int2", -1, -1));
assertEquals("ARRAY<INT>", map("_int4", -1, -1));
assertEquals("ARRAY<BIGINT>", map("_int8", -1, -1));
assertEquals("ARRAY<FLOAT>", map("_float4", -1, -1));
assertEquals("ARRAY<DOUBLE>", map("_float8", -1, -1));
assertEquals("ARRAY<BOOLEAN>", map("_bool", -1, -1));
assertEquals("ARRAY<STRING>", map("_varchar", -1, -1));
assertEquals("ARRAY<STRING>", map("_text", -1, -1));
assertEquals("ARRAY<DATETIME(6)>", map("_timestamp", -1, -1));
assertEquals("ARRAY<DATETIME(6)>", map("_timestamptz", -1, -1));
// additional types
assertEquals("ARRAY<DATE>", map("_date", -1, -1));
assertEquals("ARRAY<JSON>", map("_json", -1, -1));
assertEquals("ARRAY<JSON>", map("_jsonb", -1, -1));
}

@Test
void arrayType_numeric_defaultPrecisionScale() {
assertEquals("ARRAY<DECIMAL(38, 9)>", map("_numeric", 0, -1));
}

@Test
void arrayType_nested() {
// Two-dimensional array: __int4 → ARRAY<ARRAY<INT>>
assertEquals("ARRAY<ARRAY<INT>>", map("__int4", -1, -1));
}

// ─── Unknown type fallback ───────────────────────────────────────────────

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ bit_varying_col text Yes false \N NONE
int_array_col array<int> Yes false \N NONE
text_array_col array<text> Yes false \N NONE
point_col text Yes false \N NONE
macaddr8_col text Yes false \N NONE
xml_col text Yes false \N NONE
hstore_col text Yes false \N NONE

-- !select_all_types_null --
1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}

-- !select_all_types_null2 --
1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
2 2 200 2000 7.89 0.12 99999.000001 char2 varchar2 another text false 2025-01-01 23:59:59 23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y": 20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw== [10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0}
1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}
2 2 200 2000 7.89 0.12 99999.000001 char2 varchar2 another text false 2025-01-01 23:59:59 23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y": 20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw== [10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0} 08:00:2b:aa:bb:cc:dd:ee <root><item>2</item></root> {"x":"10","y":"20"}

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !desc_array_types --
id bigint No true \N
int2_array_col array<smallint> Yes false \N NONE
int4_array_col array<int> Yes false \N NONE
int8_array_col array<bigint> Yes false \N NONE
float4_array_col array<float> Yes false \N NONE
double_array_col array<double> Yes false \N NONE
bool_array_col array<boolean> Yes false \N NONE
varchar_array_col array<text> Yes false \N NONE
text_array_col array<text> Yes false \N NONE
timestamp_array_col array<datetime(6)> Yes false \N NONE
timestamptz_array_col array<datetime(6)> Yes false \N NONE

-- !select_array_types --
1 [1, 2] [10, 20] [100, 200] [1.1, 2.2] [1.11, 2.22] [1, 0] ["foo", "bar"] ["hello", "world"] ["2024-01-01 12:00:00.000000", "2024-06-01 00:00:00.000000"] ["2024-01-01 04:00:00.000000", "2024-06-01 00:00:00.000000"]

-- !select_array_types2 --
1 [1, 2] [10, 20] [100, 200] [1.1, 2.2] [1.11, 2.22] [1, 0] ["foo", "bar"] ["hello", "world"] ["2024-01-01 12:00:00.000000", "2024-06-01 00:00:00.000000"] ["2024-01-01 04:00:00.000000", "2024-06-01 00:00:00.000000"]
2 [3, 4] [30, 40] [300, 400] [3.3, 4.4] [3.33, 4.44] [0, 1] ["baz", "qux"] ["foo", "bar"] ["2025-01-01 06:00:00.000000", "2025-06-01 18:00:00.000000"] ["2024-12-31 22:00:00.000000", "2025-06-01 18:00:00.000000"]

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex
// create test
connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
// sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
sql """CREATE EXTENSION IF NOT EXISTS hstore"""
sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
sql """
create table ${pgDB}.${pgSchema}.${table1} (
Expand Down Expand Up @@ -74,12 +75,15 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex
bit_varying_col bit varying(16),
int_array_col integer[],
text_array_col text[],
point_col point
point_col point,
macaddr8_col macaddr8,
xml_col xml,
hstore_col hstore
);
"""
// mock snapshot data
sql """
INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)');
INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)','08:00:2b:01:02:03:04:05'::macaddr8,'<root><item>1</item></root>'::xml,'a=>1,b=>2'::hstore);
"""
}

Expand Down Expand Up @@ -125,7 +129,7 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex

// mock incremental into
connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)');"""
sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)','08:00:2b:aa:bb:cc:dd:ee'::macaddr8,'<root><item>2</item></root>'::xml,'x=>10,y=>20'::hstore);"""
}

sleep(60000); // wait for cdc incremental data
Expand Down
Loading
Loading