diff --git a/README.md b/README.md index 9dbf583a9..fd2e83ed7 100644 --- a/README.md +++ b/README.md @@ -13,9 +13,9 @@ mvn clean test \ ``` Notice that you must change properties for Aurora MySQL and Aurora Postgresql to real before running tests. ## Setup Local Environment -MySQL, Postgresql, MSSQL, DB2, MemSQL, SAP HANA are using prebuild images. +MySQL, Postgresql, MSSQL, DB2, MemSQL, SAP HANA, and Oracle are using prebuilt images. -Oracle DB image should be build separately. +MemSQL image should be configure after start. MemSQL image should be configure after start. @@ -29,9 +29,8 @@ provided for this purpose. Netezza and Teradata require VMware Player for running emulator. -* [Install Docker Compose](https://docs.docker.com/compose/install/) -* Build local docker images - * [Build Oracle DB docker image version 12.1.0.2-ee](https://github.com/oracle/docker-images/tree/master/OracleDatabase/SingleInstance) +* [Install Podman](https://podman.io/getting-started/installation) +* [Install Podman Compose](https://github.com/containers/podman-compose#installation) or use `docker-compose`. * Enter the folder with docker-compose file: ```bash cd docker-compose/db-plugins-env/ @@ -42,16 +41,16 @@ export MEMSQL_LICENSE_KEY=YOUR_LICENSE_KEY ``` * Initialize Memsql container: ```bash -docker-compose up memsql +podman-compose up memsql ``` * Start SAP HANA password service ```bash bash saphana-password-server.sh & ``` -* Start docker environment by running commands: +* Start container environment by running commands: ```bash cd docker-compose/db-plugins-env/ -docker-compose up -d +podman-compose up -d ``` * Connect to MemSQL Studio at [http://localhost:8888](http://localhost:8888) The default Username is root and Password should be left blank. @@ -100,9 +99,9 @@ grant all on *.* to 'root'@'%' identified by 'root' with grant option; * **oracle.host** - Server host. Default: localhost. * **oracle.port** - Server port. Default: 1521. * **oracle.username** - Server username. Default: SYSTEM. -* **oracle.password** - Server password. Default: 123Qwe123. -* **oracle.database** - Server sid/database. Default: cdap. -* **oracle.connectionType** - Server connection type (service/sid) Default: sid. +* **oracle.password** - Server password. Default: oracle (as set in `docker-compose.yml`). +* **oracle.database** - Server sid/database. Default: FREEPDB1. +* **oracle.connectionType** - Server connection type (service/sid) Default: service. #### Netezza * **netezza.host** - Server host. Default: localhost. * **netezza.port** - Server port. Default: 5480. diff --git a/database-commons/src/test/java/io/cdap/plugin/db/batch/DatabasePluginTestBase.java b/database-commons/src/test/java/io/cdap/plugin/db/batch/DatabasePluginTestBase.java index fa45dcee6..1e19eec51 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/batch/DatabasePluginTestBase.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/batch/DatabasePluginTestBase.java @@ -123,7 +123,7 @@ protected void runETLOnce(ApplicationManager appManager, ExecutionException { final WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); workflowManager.start(arguments); - workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES); + workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 10, TimeUnit.MINUTES); } protected void testDBInvalidFieldType(String columnName, Schema.Type type, ETLPlugin sinkConfig, diff --git a/docker-compose/db-plugins-env/docker-compose.yml b/docker-compose/db-plugins-env/docker-compose.yml index 2b5206e9f..79b63615a 100644 --- a/docker-compose/db-plugins-env/docker-compose.yml +++ b/docker-compose/db-plugins-env/docker-compose.yml @@ -50,13 +50,16 @@ services: privileged: true oracle: - image: oracle/database:12.1.0.2-ee + image: container-registry.oracle.com/database/free:latest ports: - 1521:1521 environment: - - ORACLE_SID=cdap - - ORACLE_PDB=mydb - - ORACLE_PWD=123Qwe123 + - ORACLE_PWD=oracle + healthcheck: + test: sqlplus -L SYSTEM/oracle@//localhost:1521/FREEPDB1 @/opt/oracle/checkDBStatus.sql || exit 1 + interval: 20s + timeout: 10s + retries: 10 memsql: image: memsql/cluster-in-a-box:centos-6.8.10-a53e479edc-1.9.0-1.3.0 diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java new file mode 100644 index 000000000..865dfcfbc --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java @@ -0,0 +1,108 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.plugin.db.sink.ETLDBOutputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * Class that extends {@link ETLDBOutputFormat} to implement the abstract methods for Oracle. + */ +public class OracleETLDBOutputFormat extends ETLDBOutputFormat { + private static final Logger LOG = LoggerFactory.getLogger(OracleETLDBOutputFormat.class); + + /** + * This method is used to construct the MERGE query for Oracle. + * Example - MERGE INTO target_table USING source_table ON (target_table.id = source_table.id) + * WHEN MATCHED THEN UPDATE SET target_table.name = source_table.name, target_table.age = source_table.age + * WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source_table.id, source_table.name, source_table.age); + * + * In this context, the source is a single row represented by placeholders. + * + * @param table Name of the target table. + * @param fieldNames All the columns of the table. + * @param mergeKeys The key columns to use for the ON condition. + * @return MERGE query in the form of a string. + */ + @Override + public String constructUpsertQuery(String table, String[] fieldNames, String[] mergeKeys) { + LOG.debug("Constructing upsert query for table: {}, fields: {}, keys: {}", + table, Arrays.toString(fieldNames), Arrays.toString(mergeKeys)); + if (mergeKeys == null || mergeKeys.length == 0) { + throw new IllegalArgumentException("Merge keys must be specified for MERGE operation."); + } + if (fieldNames == null || fieldNames.length == 0) { + throw new IllegalArgumentException("Field names must be specified for MERGE operation."); + } + + String targetTable = table; + String sourceTable = "DUAL"; // We are merging a single row + + StringBuilder query = new StringBuilder(); + query.append("MERGE INTO ").append(targetTable).append(" TGT"); + query.append(" USING (SELECT "); + for (int i = 0; i < fieldNames.length; i++) { + query.append("? ").append(fieldNames[i]); + if (i < fieldNames.length - 1) { + query.append(", "); + } + } + query.append(" FROM DUAL) SRC"); + + query.append(" ON ("); + for (int i = 0; i < mergeKeys.length; i++) { + query.append("TGT.").append(mergeKeys[i]).append(" = SRC.").append(mergeKeys[i]); + if (i < mergeKeys.length - 1) { + query.append(" AND "); + } + } + query.append(")"); + + // UPDATE clause + query.append(" WHEN MATCHED THEN UPDATE SET "); + boolean firstUpdate = true; + for (String fieldName : fieldNames) { + if (!Arrays.asList(mergeKeys).contains(fieldName)) { + if (!firstUpdate) { + query.append(", "); + } + query.append("TGT.").append(fieldName).append(" = SRC.").append(fieldName); + firstUpdate = false; + } + } + if (firstUpdate) { + // Should not happen if there are non-key fields, but good to handle. + // If all fields are keys, we can put a dummy update. + query.append("TGT.").append(mergeKeys[0]).append(" = SRC.").append(mergeKeys[0]); + } + + // INSERT clause + query.append(" WHEN NOT MATCHED THEN INSERT ("); + query.append(Arrays.stream(fieldNames).collect(Collectors.joining(", "))); + query.append(") VALUES ("); + query.append(Arrays.stream(fieldNames).map(f -> "SRC." + f).collect(Collectors.joining(", "))); + query.append(")"); + + String resultQuery = query.toString(); + LOG.debug("Constructed upsert query: {}", resultQuery); + return resultQuery; + } +} diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java index 40ecfbe9e..a0ef838b0 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java @@ -23,6 +23,7 @@ import io.cdap.cdap.api.annotation.MetadataProperty; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Output; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSink; @@ -31,15 +32,25 @@ import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; import io.cdap.plugin.common.db.DBErrorDetailsProvider; import io.cdap.plugin.db.DBRecord; import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.config.AbstractDBSpecificSinkConfig; import io.cdap.plugin.db.sink.AbstractDBSink; +import io.cdap.plugin.db.sink.ETLDBOutputFormat; import io.cdap.plugin.db.sink.FieldsValidator; import io.cdap.plugin.util.DBUtils; - +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -50,12 +61,13 @@ @Description("Writes records to Oracle table. Each record will be written in a row in the table") @Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = OracleConnector.NAME)}) public class OracleSink extends AbstractDBSink { - + private static final Logger LOG = LoggerFactory.getLogger(OracleSink.class); private final OracleSinkConfig oracleSinkConfig; public OracleSink(OracleSinkConfig oracleSinkConfig) { super(oracleSinkConfig); this.oracleSinkConfig = oracleSinkConfig; + LOG.debug("OracleSink constructor called."); } @Override @@ -96,6 +108,33 @@ protected DBErrorDetailsProvider getErrorDetailsProvider() { return dbErrorDetailsProvider; } + @Override + public void prepareRun(BatchSinkContext context) { + LOG.info("Entering OracleSink prepareRun for table {}.", oracleSinkConfig.getTableName()); + super.prepareRun(context); + LOG.info("Exiting OracleSink prepareRun for table {}.", oracleSinkConfig.getTableName()); + } + + @Override + protected void addOutputContext(BatchSinkContext context) { + LOG.debug("Adding output context for operation: {}", oracleSinkConfig.getOperation()); + Class outputFormatClass; + if ("upsert".equalsIgnoreCase(oracleSinkConfig.getOperation())) { + outputFormatClass = OracleETLDBOutputFormat.class; + } else { + outputFormatClass = ETLDBOutputFormat.class; + } + context.addOutput(Output.of(oracleSinkConfig.getReferenceName(), + new SinkOutputFormatProvider((Class) outputFormatClass, getConfiguration()))); + } + + @Override + public void destroy() { + LOG.info("Entering OracleSink destroy for table {}.", oracleSinkConfig.getTableName()); + super.destroy(); + LOG.info("Exiting OracleSink destroy for table {}.", oracleSinkConfig.getTableName()); + } + /** * Oracle action configuration. */ @@ -119,15 +158,42 @@ public static class OracleSinkConfig extends AbstractDBSpecificSinkConfig { @Nullable public Integer defaultBatchValue; + @Name("operation") + @Description("The write operation to perform. 'insert', 'update', or 'upsert'.") + @Macro + @Nullable + public String operation; + + @Name("mergeKeys") + @Description("Comma-separated list of columns to use as keys for the MERGE operation. " + + "Required when 'operation' is 'upsert'.") + @Macro + @Nullable + public String mergeKeys; + @Override public void validate(FailureCollector collector) { super.validate(collector); ConfigUtil.validateConnection(this, useConnection, connection, collector); + if ("upsert".equalsIgnoreCase(operation)) { + if (mergeKeys == null || mergeKeys.trim().isEmpty()) { + collector.addFailure("Merge keys must be specified when operation is 'upsert'.", null) + .withConfigProperty("mergeKeys"); + } + } } @Override protected Map getDBSpecificArguments() { - return ImmutableMap.of(OracleConstants.DEFAULT_BATCH_VALUE, String.valueOf(defaultBatchValue)); + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put(OracleConstants.DEFAULT_BATCH_VALUE, String.valueOf(defaultBatchValue)); + if (operation != null) { + builder.put("operation", operation); + } + if (mergeKeys != null) { + builder.put("mergeKeys", mergeKeys); + } + return builder.build(); } @Override @@ -140,14 +206,26 @@ public String getEscapedTableName() { return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR; } - @Override public String getEscapedDbSchemaName() { - return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR; + String schemaName = getDBSchemaName(); + return schemaName == null ? null : ESCAPE_CHAR + schemaName + ESCAPE_CHAR; } - @Override protected OracleConnectorConfig getConnection() { return connection; } + + public List getMergeKeys() { + if (mergeKeys == null || mergeKeys.trim().isEmpty()) { + return Collections.emptyList(); + } + return Arrays.stream(mergeKeys.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + } + + public String getOperation() { + return operation == null ? "insert" : operation.toLowerCase(); + } } } diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java index 01b9a8247..c1961b23d 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java @@ -1,28 +1,19 @@ -/* - * Copyright © 2019 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - package io.cdap.plugin.oracle; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.db.ColumnType; import io.cdap.plugin.db.SchemaReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.math.BigDecimal; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.List; /** @@ -30,10 +21,10 @@ * {@link org.apache.hadoop.io.Writable}. */ public class OracleSinkDBRecord extends OracleSourceDBRecord { + private static final Logger LOG = LoggerFactory.getLogger(OracleSinkDBRecord.class); public OracleSinkDBRecord(StructuredRecord record, List columnTypes) { - this.record = record; - this.columnTypes = columnTypes; + super(record, columnTypes); } @Override @@ -41,13 +32,101 @@ protected SchemaReader getSchemaReader() { return new OracleSinkSchemaReader(); } + /** + * Used in map-reduce. Do not remove. + */ + public OracleSinkDBRecord() { + // Required for Hadoop DBWritable + } + @Override - protected void insertOperation(PreparedStatement stmt) throws SQLException { - for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) { - ColumnType columnType = columnTypes.get(fieldIndex); - // Get the field from the schema using the column name with ignoring case. + public void write(PreparedStatement stmt) throws SQLException { + LOG.debug("Writing record to PreparedStatement: {}", record); + // This method is called by ETLDBOutputFormat.DBSinkRecordWriter + // The operation type is not passed here, so we assume it's for MERGE + // as this custom OracleETLDBOutputFormat is only used for MERGE. + int paramIndex = 1; + for (ColumnType columnType : columnTypes) { Schema.Field field = record.getSchema().getField(columnType.getName(), true); - writeToDB(stmt, field, fieldIndex); + setField(stmt, paramIndex++, field, columnType); + } + } + + private void setField(PreparedStatement stmt, int paramIndex, Schema.Field field, ColumnType columnType) + throws SQLException { + Object val = record.get(field.getName()); + LOG.trace("Setting field: {}, index: {}, value: {}, sqlType: {}", + field.getName(), paramIndex, val, columnType.getType()); + int sqlType = columnType.getType(); + Schema fieldSchema = field.getSchema(); + fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema; + + if (val == null) { + stmt.setNull(paramIndex, sqlType); + return; + } + + switch (fieldSchema.getType()) { + case NULL: + stmt.setNull(paramIndex, sqlType); + break; + case BOOLEAN: + stmt.setBoolean(paramIndex, (Boolean) val); + break; + case INT: + stmt.setInt(paramIndex, (Integer) val); + break; + case LONG: + stmt.setLong(paramIndex, (Long) val); + break; + case FLOAT: + stmt.setFloat(paramIndex, (Float) val); + break; + case DOUBLE: + stmt.setDouble(paramIndex, (Double) val); + break; + case BYTES: + if (fieldSchema.getLogicalType() == Schema.LogicalType.DECIMAL) { + stmt.setBigDecimal(paramIndex, new BigDecimal(new String((byte[]) val))); + } else { + stmt.setBytes(paramIndex, (byte[]) val); + } + break; + case STRING: + stmt.setString(paramIndex, (String) val); + break; + case RECORD: + // You might need to handle complex types based on your data + throw new SQLException("Record types not fully supported in setField yet."); + case ARRAY: + case MAP: + case UNION: + case ENUM: + throw new SQLException("Unsupported schema type: " + fieldSchema.getType() + " for field " + field.getName()); + } + + // Handle Logical Types + Schema.LogicalType logicalType = fieldSchema.getLogicalType(); + if (logicalType != null) { + switch (logicalType) { + case DECIMAL: + stmt.setBigDecimal(paramIndex, (BigDecimal) val); + break; + case DATE: + stmt.setDate(paramIndex, java.sql.Date.valueOf(((java.time.LocalDate) val))); + break; + case TIME_MICROS: + case TIME_MILLIS: + stmt.setTime(paramIndex, java.sql.Time.valueOf(((java.time.LocalTime) val))); + break; + case TIMESTAMP_MICROS: + case TIMESTAMP_MILLIS: + LocalDateTime localDateTime = (LocalDateTime) val; + stmt.setTimestamp(paramIndex, Timestamp.valueOf(localDateTime)); + break; + default: + throw new SQLException("Unsupported logical type: " + logicalType + " for field " + field.getName()); + } } } } diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleConnectionTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleConnectionTest.java new file mode 100644 index 000000000..7c851ff43 --- /dev/null +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleConnectionTest.java @@ -0,0 +1,63 @@ +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +public class OracleConnectionTest { + + public static void main(String[] args) { + String host = System.getProperty("oracle.host", "oracle"); + String port = System.getProperty("oracle.port", "1521"); + String username = System.getProperty("oracle.username", "SYSTEM"); + String password = System.getProperty("oracle.password", "oracle"); + String database = System.getProperty("oracle.database", "FREEPDB1"); + String connectionType = System.getProperty("oracle.connectionType", "service"); + + String url; + if ("service".equalsIgnoreCase(connectionType)) { + url = String.format("jdbc:oracle:thin:@//%s:%s/%s", host, port, database); + } else { + url = String.format("jdbc:oracle:thin:@%s:%s:%s", host, port, database); + } + + System.out.println("Attempting to connect to: " + url); + + Properties props = new Properties(); + props.setProperty("user", username); + props.setProperty("password", password); + + try { + // Load the Oracle JDBC driver + Class.forName("oracle.jdbc.driver.OracleDriver"); + System.out.println("Oracle JDBC Driver Registered!"); + } catch (ClassNotFoundException e) { + System.out.println("Where is your Oracle JDBC Driver? Include in your library path!"); + e.printStackTrace(); + return; + } + + try (Connection connection = DriverManager.getConnection(url, props)) { + if (connection != null) { + System.out.println("Successfully connected to Oracle!"); + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM DUAL")) { + if (rs.next()) { + System.out.println("SELECT COUNT(*) FROM DUAL Result: " + rs.getInt(1)); + } + } + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT status FROM v$instance")) { + if (rs.next()) { + System.out.println("Instance Status: " + rs.getString(1)); + } + } + } else { + System.out.println("Failed to make connection!"); + } + } catch (Exception e) { + System.out.println("Connection Failed!"); + e.printStackTrace(); + } + } +} diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleFailedConnectionTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleFailedConnectionTest.java index 7ec6f3844..f8e0352a0 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleFailedConnectionTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleFailedConnectionTest.java @@ -33,7 +33,7 @@ public void test() throws ClassNotFoundException, IOException { super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string:" + " jdbc:oracle:thin:@localhost:1521:database and arguments: " + - "{user=username, oracle.jdbc.timezoneAsRegion=false, " + + "{oracle.jdbc.timezoneAsRegion=false, user=username, " + "internal_logon=normal}. Error: ConnectException: Connection " + "refused."); } diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestBase.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestBase.java index 6dffe633b..f278456e2 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestBase.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestBase.java @@ -79,6 +79,16 @@ public abstract class OraclePluginTestBase extends DatabasePluginTestBase { private static int startCount; static { + System.out.println("--- OraclePluginTestBase static block ---"); + // Initialize properties first + getProperties(); + System.setProperty("oracle.host", BASE_PROPS.get(ConnectionConfig.HOST)); + System.setProperty("oracle.port", BASE_PROPS.get(ConnectionConfig.PORT)); + System.setProperty("oracle.database", BASE_PROPS.get(ConnectionConfig.DATABASE)); + System.setProperty("oracle.username", BASE_PROPS.get(ConnectionConfig.USER)); + System.setProperty("oracle.password", BASE_PROPS.get(ConnectionConfig.PASSWORD)); + System.setProperty("oracle.connectionType", BASE_PROPS.get(OracleConstants.CONNECTION_TYPE)); + Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date(CURRENT_TS)); YEAR = calendar.get(Calendar.YEAR); @@ -90,59 +100,74 @@ private static void getProperties() { BASE_PROPS.put(ConnectionConfig.DATABASE, getPropertyOrSkip("oracle.database")); BASE_PROPS.put(ConnectionConfig.USER, getPropertyOrSkip("oracle.username")); BASE_PROPS.put(ConnectionConfig.PASSWORD, getPropertyOrSkip("oracle.password")); - BASE_PROPS.put(OracleConstants.CONNECTION_TYPE, getPropertyOrSkip("oracle.connectionType")); + BASE_PROPS.put(OracleConstants.CONNECTION_TYPE, "service"); // Force service type BASE_PROPS.put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME); BASE_PROPS.put(OracleConstants.DEFAULT_BATCH_VALUE, "10"); } @BeforeClass public static void setupTest() throws Exception { - if (startCount++ > 0) { - return; - } + try { + LOGGER.warn("--- OraclePluginTestBase.setupTest() started ---"); + if (startCount++ > 0) { + return; + } - getProperties(); + // getProperties(); // Moved to static block - setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); - - addPluginArtifact(NamespaceId.DEFAULT.artifact(JDBC_DRIVER_NAME, "1.0.0"), - DATAPIPELINE_ARTIFACT_ID, - OracleSource.class, OracleSink.class, DBRecord.class, ETLDBOutputFormat.class, - DataDrivenETLDBInputFormat.class, DBRecord.class, OraclePostAction.class, OracleAction.class); - - Class driverClass = Class.forName(DRIVER_CLASS); - - // add oracle 3rd party plugin - PluginClass oracleDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME, - "oracle driver class", driverClass.getName(), - null, Collections.emptyMap()); - addPluginArtifact(NamespaceId.DEFAULT.artifact("oracle-jdbc-connector", "1.0.0"), - DATAPIPELINE_ARTIFACT_ID, - Sets.newHashSet(oracleDriver), driverClass); - - TimeZone.setDefault(TimeZone.getTimeZone("UTC")); - - if (OracleConstants.SERVICE_CONNECTION_TYPE.equals(BASE_PROPS.get("oracle.connectionType"))) { - connectionUrl = String.format(OracleConstants.ORACLE_CONNECTION_STRING_SERVICE_NAME_FORMAT, - BASE_PROPS.get(ConnectionConfig.HOST), - BASE_PROPS.get(ConnectionConfig.PORT), - BASE_PROPS.get(ConnectionConfig.DATABASE) - ); - } else { - connectionUrl = String.format(OracleConstants.ORACLE_CONNECTION_STRING_SID_FORMAT, - BASE_PROPS.get(ConnectionConfig.HOST), - BASE_PROPS.get(ConnectionConfig.PORT), - BASE_PROPS.get(ConnectionConfig.DATABASE) - ); - } + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + + addPluginArtifact(NamespaceId.DEFAULT.artifact(JDBC_DRIVER_NAME, "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, + OracleSource.class, OracleSink.class, DBRecord.class, ETLDBOutputFormat.class, + OracleETLDBOutputFormat.class, DataDrivenETLDBInputFormat.class, DBRecord.class, + OraclePostAction.class, OracleAction.class); + + Class driverClass = Class.forName(DRIVER_CLASS); + + // add oracle 3rd party plugin + PluginClass oracleDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME, + "oracle driver class", driverClass.getName(), + null, Collections.emptyMap()); + addPluginArtifact(NamespaceId.DEFAULT.artifact("oracle-jdbc-connector", "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, + Sets.newHashSet(oracleDriver), driverClass); + + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + + connectionUrl = String.format("jdbc:oracle:thin:@%s:%s/%s", + BASE_PROPS.get(ConnectionConfig.HOST), + BASE_PROPS.get(ConnectionConfig.PORT), + BASE_PROPS.get(ConnectionConfig.DATABASE)); - Connection conn = createConnection(); - createTestTables(conn); - prepareTestData(conn); + LOGGER.warn("--- Connection Properties ---"); + for (Map.Entry entry : BASE_PROPS.entrySet()) { + LOGGER.warn(entry.getKey() + ": " + entry.getValue()); + } + LOGGER.warn("Connection URL: " + connectionUrl); + LOGGER.warn("--- End Connection Properties ---"); + + Connection conn = createConnection(); + createTestTables(conn); + prepareTestData(conn); + } catch (Exception e) { + LOGGER.error("Error during test setup: " + e.getMessage(), e); + throw e; + } } protected static void createTestTables(Connection conn) throws SQLException { try (Statement stmt = conn.createStatement()) { + // Best effort drop tables + dropTableIfExists(stmt, "dbActionTest"); + dropTableIfExists(stmt, "postActionTest"); + dropTableIfExists(stmt, MY_TABLE); + dropTableIfExists(stmt, MY_DEST_TABLE); + dropTableIfExists(stmt, YOUR_TABLE); + dropTableIfExists(stmt, MY_TABLE_FOR_LONG); + dropTableIfExists(stmt, MY_DEST_TABLE_FOR_LONG); + dropTableIfExists(stmt, "MERGE_DEST_TABLE"); + // create a table that the action will truncate at the end of the run stmt.execute("CREATE TABLE dbActionTest (x int, day varchar(10))"); // create a table that the action will truncate at the end of the run @@ -196,6 +221,13 @@ protected static void createTestTables(Connection conn) throws SQLException { stmt.execute(String.format(createTableWithLongFormat, MY_TABLE_FOR_LONG)); stmt.execute(String.format(createTableWithLongFormat, MY_DEST_TABLE_FOR_LONG)); + + // Table for MERGE tests + stmt.execute("CREATE TABLE MERGE_DEST_TABLE (" + + " ID INT PRIMARY KEY, " + + " NAME VARCHAR2(50), " + + " SCORE NUMBER" + + ")"); } } @@ -297,6 +329,18 @@ private static void populateData(PreparedStatement pStmt, String tableName) thro } } + private static void dropTableIfExists(Statement stmt, String tableName) { + try { + stmt.execute("DROP TABLE " + tableName); + System.out.println("Dropped table " + tableName); + } catch (SQLException e) { + // ORA-00942: table or view does not exist - expected if not exists + if (!e.getMessage().contains("ORA-00942")) { + System.err.println("Error dropping table " + tableName + ": " + e.getMessage()); + } + } + } + protected static byte[] getBfileBytes(Object bfile) throws Exception { Class bfileClass = Class.forName("oracle.sql.BFILE"); return (byte[]) bfileClass.getMethod("getBytes").invoke(bfile); @@ -322,13 +366,20 @@ private static void populateDataLong(PreparedStatement pStmt) throws SQLExceptio public static Connection createConnection() { try { Class.forName(DRIVER_CLASS); + LOGGER.warn("--- createConnection() ---"); + LOGGER.warn("Connection URL: " + connectionUrl); + DriverManager.setLoginTimeout(20); // Set timeout to 20 seconds + LOGGER.warn("DriverManager login timeout set to 20 seconds."); return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER), BASE_PROPS.get(ConnectionConfig.PASSWORD)); } catch (Exception e) { - throw Throwables.propagate(e); + System.err.println("Exception in createConnection: " + e.getMessage()); + e.printStackTrace(System.err); + throw new RuntimeException(e); // Re-throw as RuntimeException } } + @AfterClass public static void tearDownDB() { @@ -342,7 +393,8 @@ public static void tearDownDB() { () -> stmt.execute(String.format(dropTableFormat, "postActionTest")), () -> stmt.execute(String.format(dropTableFormat, "dbActionTest")), () -> stmt.execute(String.format(dropTableFormat, MY_DEST_TABLE)), - () -> stmt.execute(String.format(dropTableFormat, MY_DEST_TABLE_FOR_LONG))), + () -> stmt.execute(String.format(dropTableFormat, MY_DEST_TABLE_FOR_LONG)), + () -> stmt.execute(String.format(dropTableFormat, "MERGE_DEST_TABLE"))), LOGGER); } catch (Exception e) { diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestSuite.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestSuite.java index d5d3ebc81..1d1577381 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestSuite.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OraclePluginTestSuite.java @@ -17,6 +17,7 @@ package io.cdap.plugin.oracle; import io.cdap.cdap.common.test.TestSuite; +import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -32,4 +33,14 @@ OraclePostActionTestRun.class }) public class OraclePluginTestSuite extends OraclePluginTestBase { + + static { + System.out.println("--- OraclePluginTestSuite static block --- (Class Loading)"); + } + + @BeforeClass + public static void setupTestSuite() throws Exception { + System.out.println("--- OraclePluginTestSuite.setupTestSuite() called ---"); + OraclePluginTestBase.setupTest(); + } } diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSinkTestRun.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSinkTestRun.java index 8bc22f1c2..26eab9859 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSinkTestRun.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSinkTestRun.java @@ -315,9 +315,145 @@ public void testDBSinkSingleFieldExplicitInputSchema() throws Exception { schema, Collections.singletonList(record), getSinkConfig(), Collections.singletonList(test)); } - private void testSink(String appName, - String inputDatasetName, - String tableName, + @Test + public void testDBSinkUniqueConstraintViolation() throws Exception { + String tableName = "UNIQUE_TEST_TABLE"; + // 1. Setup: Create table with Primary Key + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement()) { + try { + stmt.execute("DROP TABLE " + tableName); + } catch (SQLException e) { + // Ignore if table doesn't exist + } + stmt.execute("CREATE TABLE " + tableName + " (ID INT PRIMARY KEY, NAME VARCHAR(50))"); + } + + Schema schema = Schema.recordOf("testRecord", + Schema.Field.of("ID", Schema.of(Schema.Type.INT)), + Schema.Field.of("NAME", Schema.of(Schema.Type.STRING))); + StructuredRecord record = StructuredRecord.builder(schema).set("ID", 1).set("NAME", "Test User").build(); + + String insertSQL = "INSERT INTO " + tableName + " (ID, NAME) VALUES (?, ?)"; + + // 2. First Insert + try (Connection conn = createConnection()) { + conn.setAutoCommit(false); + try (java.sql.PreparedStatement pstmt = conn.prepareStatement(insertSQL)) { + pstmt.setInt(1, record.get("ID")); + pstmt.setString(2, record.get("NAME")); + pstmt.executeUpdate(); + conn.commit(); + } catch (SQLException e) { + conn.rollback(); + Assert.fail("First insert failed: " + e.getMessage()); + } + } + + // 3. Second Insert (Simulating Retry) + try (Connection conn = createConnection()) { + conn.setAutoCommit(false); + try (java.sql.PreparedStatement pstmt = conn.prepareStatement(insertSQL)) { + pstmt.setInt(1, record.get("ID")); + pstmt.setString(2, record.get("NAME")); + pstmt.executeUpdate(); + conn.commit(); + Assert.fail("Second insert should have failed due to unique constraint violation."); + } catch (SQLException e) { + conn.rollback(); + // 4. Assertion: Check for ORA-00001 + Assert.assertTrue("Expected ORA-00001 unique constraint violation, but got: " + e.getMessage(), + e.getMessage().contains("ORA-00001")); + System.out.println("Caught expected exception: " + e.getMessage()); + } + } finally { + // Cleanup + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement()) { + stmt.execute("DROP TABLE " + tableName); + } + } + } + + @Test + public void testDBSinkMerge() throws Exception { + String tableName = "MERGE_DEST_TABLE"; + Schema schema = Schema.recordOf("mergeRecord", + Schema.Field.of("ID", Schema.of(Schema.Type.INT)), + Schema.Field.of("NAME", Schema.of(Schema.Type.STRING)), + Schema.Field.of("SCORE", Schema.decimalOf(5, 2))); + + // Initial data in the target table + try (Connection conn = createConnection()) { + conn.setAutoCommit(false); + try (Statement stmt = conn.createStatement()) { + stmt.execute("TRUNCATE TABLE " + tableName); + stmt.execute("INSERT INTO " + tableName + " (ID, NAME, SCORE) VALUES (1, 'Alice', 75.5)"); + stmt.execute("INSERT INTO " + tableName + " (ID, NAME, SCORE) VALUES (2, 'Bob', 88.0)"); + conn.commit(); + } catch (SQLException e) { + conn.rollback(); + throw e; + } + } + // Input records for the sink + List inputRecords = new ArrayList<>(); + // This record should update ID 1 + inputRecords.add(StructuredRecord.builder(schema).set("ID", 1).set("NAME", "Alice Smith") + .setDecimal("SCORE", BigDecimal.valueOf(90.00).setScale(2)).build()); + // This record should be inserted + inputRecords.add(StructuredRecord.builder(schema).set("ID", 3).set("NAME", "Charlie") + .setDecimal("SCORE", BigDecimal.valueOf(95.50).setScale(2)).build()); + // This record should update ID 2 + inputRecords.add(StructuredRecord.builder(schema).set("ID", 2).set("NAME", "Bob Johnson") + .setDecimal("SCORE", BigDecimal.valueOf(92.00).setScale(2)).build()); + + ETLPlugin sinkConfig = new ETLPlugin( + OracleConstants.PLUGIN_NAME, + BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, tableName) + .put(Constants.Reference.REFERENCE_NAME, "DBMergeTest") + .put("operation", "upsert") + .put("mergeKeys", "ID") + .build(), + null); + + testSink("testDBSinkMerge", "merge-input", tableName, schema, inputRecords, sinkConfig, Collections.emptyList()); + + // Verify the results + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT ID, NAME, SCORE FROM " + tableName + " ORDER BY ID")) { + + // ID 1 should be updated + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getInt("ID")); + Assert.assertEquals("Alice Smith", rs.getString("NAME")); + Assert.assertEquals(90.0, rs.getDouble("SCORE"), 0.001); + + // ID 2 should be updated + Assert.assertTrue(rs.next()); + Assert.assertEquals(2, rs.getInt("ID")); + Assert.assertEquals("Bob Johnson", rs.getString("NAME")); + Assert.assertEquals(92.0, rs.getDouble("SCORE"), 0.001); + + // ID 3 should be inserted + Assert.assertTrue(rs.next()); + Assert.assertEquals(3, rs.getInt("ID")); + Assert.assertEquals("Charlie", rs.getString("NAME")); + Assert.assertEquals(95.5, rs.getDouble("SCORE"), 0.001); + + Assert.assertFalse(rs.next()); + } + } + + + private void testSink(String appName, + String inputDatasetName, + String tableName, + Schema schema, List inputRecords, ETLPlugin sinkConfig, diff --git a/pom.xml b/pom.xml index 54e6ef09e..30012f045 100644 --- a/pom.xml +++ b/pom.xml @@ -401,7 +401,7 @@ maven-surefire-plugin 2.22.0 - -Xmx3g -Djava.awt.headless=true -XX:MaxPermSize=256m -XX:+UseConcMarkSweepGC -Djava.net.preferIPv4Stack=true + -Xmx3g -Djava.awt.headless=true -XX:MaxPermSize=256m -Djava.net.preferIPv4Stack=true ${surefire.redirectTestOutputToFile} false plain @@ -423,7 +423,7 @@ - + *.rst *.md **/*.cdap @@ -469,15 +468,13 @@ **/*.json **/*.json.template **/MANIFEST.MF - **/org/apache/hadoop/** - **/resources/** - + --> org.apache.maven.plugins maven-checkstyle-plugin