diff --git a/pom.xml b/pom.xml
index 530680dcb..fc2cb8212 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,6 +226,11 @@
<artifactId>kafka-connect-storage-common-avatica-shaded</artifactId>
<version>${kafka.connect.storage.common.version}</version>
</dependency>
+<dependency>
+ <groupId>com.ibm.db2</groupId>
+ <artifactId>jcc</artifactId>
+ <version>11.5.8.0</version>
+</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/FilteredColumnToStructVisitor.java b/src/main/java/io/confluent/connect/hdfs/jdbc/FilteredColumnToStructVisitor.java
new file mode 100644
index 000000000..146da0ce4
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/FilteredColumnToStructVisitor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import org.apache.kafka.connect.data.Struct;
+
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+
+public class FilteredColumnToStructVisitor extends FilteredJdbcColumnVisitor {
+ private final Struct struct;
+
+ public FilteredColumnToStructVisitor(HashCache hashCache,
+ JdbcTableInfo tableInfo,
+ String primaryKey,
+ Struct struct) {
+ super(hashCache, tableInfo, primaryKey);
+ this.struct = struct;
+ }
+
+ @Override
+ public void visit(String columnName, Blob value) throws SQLException {
+ // TODO: Would be so much better if we could stream this data
+ // TODO: Write to a disk-buffer first, and then digest()? RocksDB?
+ byte[] bytes = value != null
+ ? value.getBytes(1L, (int) value.length())
+ : null;
+
+ updateCache(columnName, bytes);
+
+ struct.put(columnName, bytes);
+ }
+
+ @Override
+ public void visit(String columnName, Clob value) throws SQLException {
+ // TODO: Would be so much better if we could stream this data
+ // TODO: Write to a disk-buffer first, and then digest()? RocksDB?
+ String valueStr = value != null
+ ? value.getSubString(1L, (int) value.length())
+ : null;
+
+ updateCache(columnName, valueStr);
+
+ struct.put(columnName, valueStr);
+ }
+
+ @Override
+ public void visit(String columnName, SQLXML value) throws SQLException {
+ // TODO: Would be so much better if we could stream this data
+ // TODO: Write to a disk-buffer first, and then digest()? RocksDB?
+ String valueStr = value != null
+ ? value.getString()
+ : null;
+
+ updateCache(columnName, valueStr);
+
+ struct.put(columnName, valueStr);
+ }
+
+ @Override
+ public void visit(String columnName, String value) {
+ updateCache(columnName, value);
+
+ struct.put(columnName, value);
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/FilteredJdbcColumnVisitor.java b/src/main/java/io/confluent/connect/hdfs/jdbc/FilteredJdbcColumnVisitor.java
new file mode 100644
index 000000000..e76be5cff
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/FilteredJdbcColumnVisitor.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import java.util.Optional;
+
+public abstract class FilteredJdbcColumnVisitor implements JdbcColumnVisitor {
+ private final HashCache hashCache;
+ protected final JdbcTableInfo tableInfo;
+ protected final String primaryKey;
+ private int columnsChanged = 0;
+
+ public FilteredJdbcColumnVisitor(HashCache hashCache,
+ JdbcTableInfo tableInfo,
+ String primaryKey) {
+ this.hashCache = hashCache;
+ this.tableInfo = tableInfo;
+ this.primaryKey = primaryKey;
+ }
+
+ public boolean hasChangedColumns() {
+ return columnsChanged > 0;
+ }
+
+ protected void updateCache(String columnName, byte[] value) {
+ boolean columnChanged = Optional
+ .ofNullable(hashCache)
+ .map(hashCache_ -> hashCache_.updateCache(tableInfo, primaryKey, columnName, value))
+ .orElse(true);
+
+ // If it has changed, indicate that we should write the new value to HDFS
+ if (columnChanged) {
+ columnsChanged++;
+ }
+ }
+
+ protected void updateCache(String columnName, String value) {
+ boolean columnChanged = Optional
+ .ofNullable(hashCache)
+ .map(hashCache_ -> hashCache_.updateCache(tableInfo, primaryKey, columnName, value))
+ .orElse(true);
+
+ // If it has changed, indicate that we should write the new value to HDFS
+ if (columnChanged) {
+ columnsChanged++;
+ }
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/HashCache.java b/src/main/java/io/confluent/connect/hdfs/jdbc/HashCache.java
new file mode 100644
index 000000000..a81c89b0c
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/HashCache.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Cache of LOB hashes, for pruning unnecessary HDFS writes;
+ * NOTE: None of this is synchronized, as it expects to be called from non-parallel code.
+ * NOTE: MD5 Hashing takes about 1-2 seconds per gig, at least locally
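+ *
+ * <p>Illustrative usage sketch (hypothetical values; not part of the connector flow):
+ * <pre>{@code
+ *   HashCache cache = new HashCache(10000, MessageDigest.getInstance("MD5"));
+ *   boolean changed = cache.updateCache(tableInfo, "pk-123", "VALUE_CLOB", clobText);
+ *   if (changed) {
+ *     // the hash differs from the cached one, so the new value should be written to HDFS
+ *   }
+ * }</pre>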
+ */
+public class HashCache {
+ private static final byte[] EMPTY_HASH = {};
+
+ private final Map<JdbcTableInfo, LinkedHashMap<String, Map<String, byte[]>>>
+ tablePkColumnCache = new HashMap<>();
+
+ private final int maxHashSize;
+ private final MessageDigest messageDigest;
+
+ public HashCache(int maxHashSize, MessageDigest messageDigest) {
+ this.maxHashSize = maxHashSize;
+ this.messageDigest = messageDigest;
+ }
+
+ public boolean updateCache(JdbcTableInfo tableInfo,
+ String primaryKey,
+ String columnName,
+ String value
+ ) {
+ byte[] bytes = Optional
+ .ofNullable(value)
+ // TODO: Should we hardcode UTF_8 here?
+ .map(value_ -> value_.getBytes(StandardCharsets.UTF_8))
+ .orElse(null);
+
+ return updateCache(tableInfo, primaryKey, columnName, bytes);
+ }
+
+ /**
+ * Update cache; and return true if hash has changed
+ */
+ public boolean updateCache(JdbcTableInfo tableInfo,
+ String primaryKey,
+ String columnName,
+ byte[] value
+ ) {
+ byte[] hash = Optional
+ .ofNullable(value)
+ .map(bytes -> {
+ messageDigest.reset();
+ return messageDigest.digest(bytes);
+ })
+ .orElse(EMPTY_HASH);
+
+ // Get the Cache for the given Table + PK
+ Map<String, Map<String, byte[]>> pkColumnHashCache = getPkColumnCache(tableInfo);
+ Map<String, byte[]> columnHashCache = pkColumnHashCache.computeIfAbsent(
+ primaryKey,
+ __ -> new HashMap<>()
+ );
+
+ // Check the hash against the cache
+ byte[] oldHash = columnHashCache.put(columnName, hash);
+ return !Arrays.equals(hash, oldHash);
+ }
+
+ private LinkedHashMap<String, Map<String, byte[]>> getPkColumnCache(
+ JdbcTableInfo tableInfo
+ ) {
+ // The entire tablePkColumnCache map structure is accessed serially,
+ // so no need to make it threadsafe
+ return tablePkColumnCache.computeIfAbsent(
+ tableInfo,
+ __ -> new LinkedHashMap<String, Map<String, byte[]>>(
+ 100,
+ .75f,
+ true
+ ) {
+ // NOTE: Turns LinkedHashMap into an LRU Cache
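+ // (accessOrder=true keeps entries in access order, and evicting the eldest
+ // entry once size() exceeds maxHashSize yields LRU eviction semantics)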
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<String, Map<String, byte[]>> eldest
+ ) {
+ return this.size() > maxHashSize;
+ }
+ }
+ );
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcColumnInfo.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcColumnInfo.java
new file mode 100644
index 000000000..4505d8568
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcColumnInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import java.sql.JDBCType;
+import java.util.Comparator;
+import java.util.Objects;
+
+public class JdbcColumnInfo {
+ private final String name;
+ private final JDBCType jdbcType;
+ private final int ordinal;
+ private final boolean nullable;
+
+ public static Comparator<JdbcColumnInfo> byOrdinal =
+ Comparator.comparingInt(JdbcColumnInfo::getOrdinal);
+
+ public JdbcColumnInfo(String name,
+ JDBCType jdbcType,
+ int ordinal,
+ boolean nullable) {
+ this.name = name;
+ this.jdbcType = jdbcType;
+ this.ordinal = ordinal;
+ this.nullable = nullable;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public JDBCType getJdbcType() {
+ return jdbcType;
+ }
+
+ public int getOrdinal() {
+ return ordinal;
+ }
+
+ public boolean isNullable() {
+ return nullable;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JdbcColumnInfo)) {
+ return false;
+ }
+ JdbcColumnInfo column = (JdbcColumnInfo) o;
+ return Objects.equals(getName(), column.getName())
+ && getJdbcType() == column.getJdbcType()
+ && getOrdinal() == column.getOrdinal()
+ && isNullable() == column.isNullable();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getName(), getJdbcType(), getOrdinal(), isNullable());
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName()
+ + "{"
+ + "name='" + name + "'"
+ + ", jdbcType=" + jdbcType
+ + ", ordinal=" + ordinal
+ + ", nullable=" + nullable
+ + "}";
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcColumnVisitor.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcColumnVisitor.java
new file mode 100644
index 000000000..a3f23d60e
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcColumnVisitor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+
+public interface JdbcColumnVisitor {
+ //void visit(String columnName, Array value);
+
+ //void visit(String columnName, BigDecimal value);
+
+ void visit(String columnName, Blob value) throws SQLException;
+
+ //void visit(String columnName, Boolean value);
+
+ //void visit(String columnName, Byte value);
+
+ void visit(String columnName, Clob value) throws SQLException;
+
+ //void visit(String columnName, Date value);
+
+ //void visit(String columnName, Double value);
+
+ //void visit(String columnName, Float value);
+
+ //void visit(String columnName, Integer value);
+
+ //void visit(String columnName, Long value);
+
+ //void visit(String columnName, Ref value);
+
+ //void visit(String columnName, Short value);
+
+ void visit(String columnName, SQLXML value) throws SQLException;
+
+ void visit(String columnName, String value);
+
+ //void visit(String columnName, Struct value);
+
+ //void visit(String columnName, Time value);
+
+ //void visit(String columnName, Timestamp value);
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkConnector.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkConnector.java
new file mode 100644
index 000000000..ac97770da
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkConnector.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import io.confluent.connect.hdfs.HdfsSinkConnector;
+import io.confluent.connect.hdfs.Version;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * JdbcHdfsSinkConnector is a Kafka Connect Connector implementation that ingests data from
+ * Kafka into HDFS, enriching records with large (LOB) columns fetched over JDBC.
+ */
+public class JdbcHdfsSinkConnector extends SinkConnector {
+ private Map<String, String> configProperties;
+
+ @Override
+ public String version() {
+ return Version.getVersion();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ configProperties = props;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return JdbcHdfsSinkTask.class;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return IntStream
+ .range(0, maxTasks)
+ .mapToObj(i -> {
+ Map<String, String> taskProps = new HashMap<>(configProperties);
+ taskProps.put(HdfsSinkConnector.TASK_ID_CONFIG_NAME, Integer.toString(i));
+ return taskProps;
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public ConfigDef config() {
+ return JdbcHdfsSinkConnectorConfig.getConfig();
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkConnectorConfig.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkConnectorConfig.java
new file mode 100644
index 000000000..f162bfc9e
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkConnectorConfig.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcHdfsSinkConnectorConfig extends HdfsSinkConnectorConfig {
+ public static final String DATABASE_GROUP = "Database";
+ public static final String HASH_CACHE_GROUP = "HashCache";
+
+ public static final String CONNECTION_URL_CONFIG = "connection.url";
+ public static final String CONNECTION_URL_DOC = "JDBC connection URL.";
+ public static final String CONNECTION_URL_DISPLAY = "JDBC URL";
+
+ public static final String CONNECTION_USER_CONFIG = "connection.user";
+ public static final String CONNECTION_USER_DOC = "JDBC connection user.";
+ public static final String CONNECTION_USER_DISPLAY = "JDBC User";
+
+ public static final String CONNECTION_PASSWORD_CONFIG = "connection.password";
+ public static final String CONNECTION_PASSWORD_DOC = "JDBC connection password.";
+ public static final String CONNECTION_PASSWORD_DISPLAY = "JDBC Password";
+
+ public static final String HASH_CACHE_ENABLED_CONFIG = "hash.cache.enabled";
+ public static final String HASH_CACHE_ENABLED_DOC =
+ "Enable caching of hashed values for large columns,"
+ + " to prevent re-writing unchanged values.";
+ public static final String HASH_CACHE_ENABLED_DISPLAY = "Hash Cache enabled";
+ public static final boolean HASH_CACHE_ENABLED_DEFAULT = true;
+
+ public static final String HASH_CACHE_SIZE_CONFIG = "hash.cache.size";
+ public static final String HASH_CACHE_SIZE_DOC =
+ "Maximum size of the Hash Cache; least-recently-used entries are evicted once it is full.";
+ public static final String HASH_CACHE_SIZE_DISPLAY = "Hash Cache size";
+ public static final int HASH_CACHE_SIZE_DEFAULT = 10000;
+
+ public static final String JDBC_FILTERS_CONFIG = "jdbc.filters";
+ public static final String JDBC_FILTERS_DOC =
+ "List of filter names. Each name denotes a distinct table to filter.";
+ public static final String JDBC_FILTERS_DISPLAY = "JDBC Filters";
+
+ // Example:
+ // jdbc.filters=dvtest,tableX
+ // jdbc.filters.dvtest.db=testdb
+ // jdbc.filters.dvtest.schema=DB2INST1
+ // jdbc.filters.dvtest.table=DV_TEST
+ // jdbc.filters.dvtest.columns=db_action,VALUE_CLOB,VALUE_XML,Value_Blob
+ // jdbc.filters.tableX.db=dbX
+ // jdbc.filters.tableX.schema=schemaX
+ // jdbc.filters.tableX.table=tableX
+ // jdbc.filters.tableX.columns=c1,c2,c3
+
+ public static final String JDBC_FILTER_DBNAME_CONFIG = "dbname";
+ public static final String JDBC_FILTER_TABLE_CONFIG = "table";
+ public static final String JDBC_FILTER_TABLE_DISPLAY = "<schema>.<table>";
+ public static final String JDBC_FILTER_COLUMNS_CONFIG = "columns";
+ public static final String JDBC_FILTER_COLUMNS_DOC = "Comma separated list of column names";
+
+ public static ConfigDef newConfigDef() {
+ int orderInDatabaseGroup = 0;
+ int orderInHashCacheGroup = 0;
+ return HdfsSinkConnectorConfig
+ .newConfigDef()
+ // Define Database configuration group
+ .define(
+ CONNECTION_URL_CONFIG,
+ Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ Importance.HIGH,
+ CONNECTION_URL_DOC,
+ DATABASE_GROUP,
+ ++orderInDatabaseGroup,
+ Width.MEDIUM,
+ CONNECTION_URL_DISPLAY
+ )
+ .define(
+ CONNECTION_USER_CONFIG,
+ Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ Importance.HIGH,
+ CONNECTION_USER_DOC,
+ DATABASE_GROUP,
+ ++orderInDatabaseGroup,
+ Width.MEDIUM,
+ CONNECTION_USER_DISPLAY
+ )
+ .define(
+ CONNECTION_PASSWORD_CONFIG,
+ Type.PASSWORD,
+ ConfigDef.NO_DEFAULT_VALUE,
+ Importance.HIGH,
+ CONNECTION_PASSWORD_DOC,
+ DATABASE_GROUP,
+ ++orderInDatabaseGroup,
+ Width.SHORT,
+ CONNECTION_PASSWORD_DISPLAY
+ )
+ .define(
+ JDBC_FILTERS_CONFIG,
+ Type.LIST,
+ ConfigDef.NO_DEFAULT_VALUE,
+ Importance.HIGH,
+ JDBC_FILTERS_DOC,
+ DATABASE_GROUP,
+ ++orderInDatabaseGroup,
+ Width.LONG,
+ JDBC_FILTERS_DISPLAY
+ )
+ .define(
+ HASH_CACHE_ENABLED_CONFIG,
+ Type.BOOLEAN,
+ HASH_CACHE_ENABLED_DEFAULT,
+ Importance.LOW,
+ HASH_CACHE_ENABLED_DOC,
+ HASH_CACHE_GROUP,
+ ++orderInHashCacheGroup,
+ Width.SHORT,
+ HASH_CACHE_ENABLED_DISPLAY
+ )
+ .define(
+ HASH_CACHE_SIZE_CONFIG,
+ Type.INT,
+ HASH_CACHE_SIZE_DEFAULT,
+ Importance.LOW,
+ HASH_CACHE_SIZE_DOC,
+ HASH_CACHE_GROUP,
+ ++orderInHashCacheGroup,
+ Width.MEDIUM,
+ HASH_CACHE_SIZE_DISPLAY
+ );
+ }
+
+ public JdbcHdfsSinkConnectorConfig(Map<String, String> props) {
+ this(newConfigDef(), addDefaults(props));
+ }
+
+ protected JdbcHdfsSinkConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+ super(configDef, props);
+ }
+
+ public String name() {
+ return originalsStrings().getOrDefault("name", "JDBC-HDFS-sink");
+ }
+
+ public static ConfigDef getConfig() {
+ return HdfsSinkConnectorConfig.getConfig();
+ }
+
+ public String getConnectionUrl() {
+ return JdbcUtil
+ .trimToNone(getString(CONNECTION_URL_CONFIG))
+ .orElseThrow(() -> new ConfigException(
+ "Missing or empty String value for required [" + CONNECTION_URL_CONFIG + "]"
+ ));
+ }
+
+ public String getConnectionUser() {
+ return JdbcUtil
+ .trimToNone(getString(CONNECTION_USER_CONFIG))
+ .orElseThrow(() -> new ConfigException(
+ "Missing or empty String value for required [" + CONNECTION_USER_CONFIG + "]"
+ ));
+ }
+
+ public Password getConnectionPassword() {
+ return Optional
+ .ofNullable(getPassword(CONNECTION_PASSWORD_CONFIG))
+ .orElseThrow(() -> new ConfigException(
+ "Missing Password value for required [" + CONNECTION_PASSWORD_CONFIG + "]"
+ ));
+ }
+
+ public boolean isHashCacheEnabled() {
+ return getBoolean(HASH_CACHE_ENABLED_CONFIG);
+ }
+
+ public int getHashCacheSize() {
+ return getInt(HASH_CACHE_SIZE_CONFIG);
+ }
+
+ public Map<JdbcTableInfo, Set<String>> getJdbcFilterMap() {
+ Map<JdbcTableInfo, Set<String>> filterMap =
+ getList(JDBC_FILTERS_CONFIG)
+ .stream()
+ .map(JdbcUtil::trimToNull)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toMap(
+ this::jdbcFilterToTable,
+ this::jdbcFilterToColumns
+ ));
+
+ if (filterMap.isEmpty()) {
+ throw new ConfigException(
+ "Empty list of filter names for ["
+ + JDBC_FILTERS_CONFIG
+ + "]. Must be a comma-separated list."
+ );
+ }
+
+ return filterMap;
+ }
+
+ public static void main(String[] args) {
+ System.out.println(getConfig().toEnrichedRst());
+ }
+
+ private String prefixOf(String filterName) {
+ return JDBC_FILTERS_CONFIG
+ + "."
+ + filterName
+ + ".";
+ }
+
+ // Refactor to use a ConfigDef for parsing each JDBC Filter
+ private JdbcTableInfo jdbcFilterToTable(String filterName) {
+ String prefix = prefixOf(filterName);
+
+ Map<String, Object> filterConfig = originalsWithPrefix(prefix);
+
+ final String db = Optional
+ .ofNullable(filterConfig.get(JDBC_FILTER_DBNAME_CONFIG))
+ .map(String.class::cast)
+ .flatMap(JdbcUtil::trimToNone)
+ .orElseThrow(() -> new ConfigException(
+ "Missing or empty String value for required ["
+ + prefix
+ + JDBC_FILTER_DBNAME_CONFIG
+ + "]"
+ ));
+
+ final String[] schemaAndTable = Optional
+ .ofNullable(filterConfig.get(JDBC_FILTER_TABLE_CONFIG))
+ .map(String.class::cast)
+ .map(csvColumns -> csvColumns.split("\\."))
+ .orElseThrow(() -> new ConfigException(
+ "Missing or empty String value for required ["
+ + prefix
+ + JDBC_FILTER_TABLE_CONFIG
+ + "]"
+ ));
+
+ if (schemaAndTable.length != 2) {
+ throw new ConfigException(
+ "Invalid value for ["
+ + prefix
+ + JDBC_FILTER_TABLE_CONFIG
+ + "]. Expected: "
+ + JDBC_FILTER_TABLE_DISPLAY
+ );
+ }
+
+ final String schema = schemaAndTable[0].trim();
+
+ if (schema.isEmpty()) {
+ throw new ConfigException(
+ "<schema> cannot be blank for ["
+ + prefix
+ + JDBC_FILTER_TABLE_CONFIG
+ + "]. Expected: "
+ + JDBC_FILTER_TABLE_DISPLAY
+ );
+ }
+
+ final String table = schemaAndTable[1].trim();
+
+ if (table.isEmpty()) {
+ throw new ConfigException(
+ "<table> cannot be blank for ["
+ + prefix
+ + JDBC_FILTER_TABLE_CONFIG
+ + "]. Expected: "
+ + JDBC_FILTER_TABLE_DISPLAY
+ );
+ }
+
+ return new JdbcTableInfo(db, schema, table);
+ }
+
+ private Set<String> jdbcFilterToColumns(String filterName) {
+ String prefix = prefixOf(filterName);
+
+ Map<String, Object> filterConfig = originalsWithPrefix(prefix);
+
+ // Columns are required, and must not be empty
+ Set<String> columns = Optional
+ .ofNullable(filterConfig.get(JDBC_FILTER_COLUMNS_CONFIG))
+ .map(String.class::cast)
+ .map(csvColumns -> csvColumns.split(","))
+ .map(Arrays::stream)
+ .orElseGet(Stream::empty)
+ .map(JdbcUtil::trimToNull)
+ .filter(Objects::nonNull)
+ .map(String::toLowerCase)
+ .collect(Collectors.toSet());
+
+ if (columns.isEmpty()) {
+ throw new ConfigException(
+ "Missing or empty list of columns for ["
+ + prefix
+ + JDBC_FILTER_COLUMNS_CONFIG
+ + "]. Expected: "
+ + JDBC_FILTER_COLUMNS_DOC
+ );
+ }
+
+ return columns;
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkTask.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkTask.java
new file mode 100644
index 000000000..c0566848e
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcHdfsSinkTask.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import io.confluent.connect.hdfs.HdfsSinkTask;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.MessageDigest;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcHdfsSinkTask extends HdfsSinkTask {
+ private static final Logger log = LoggerFactory.getLogger(JdbcHdfsSinkTask.class);
+
+ private HashCache hashCache;
+ private HikariConfig hikariConfig;
+ private Map<JdbcTableInfo, Set<String>> configuredTableColumnsMap;
+ private HikariDataSource dataSource;
+ private JdbcRecordTransformer recordTransformer;
+
+ @Override
+ public void start(Map<String, String> props) {
+ try {
+ log.info("Loading JDBC configs");
+
+ JdbcHdfsSinkConnectorConfig connectorConfig = new JdbcHdfsSinkConnectorConfig(props);
+
+ hikariConfig = new HikariConfig();
+ hikariConfig.setJdbcUrl(connectorConfig.getConnectionUrl());
+ hikariConfig.setUsername(connectorConfig.getConnectionUser());
+ hikariConfig.setPassword(connectorConfig.getConnectionPassword().value());
+
+ configuredTableColumnsMap = connectorConfig.getJdbcFilterMap();
+
+ if (connectorConfig.isHashCacheEnabled()) {
+ hashCache = new HashCache(
+ connectorConfig.getHashCacheSize(),
+ // TODO: Un-hardcode this
+ MessageDigest.getInstance("MD5")
+ );
+ }
+
+ log.info("Successfully loaded JDBC configs");
+ } catch (ConnectException ex) {
+ log.error(
+ "JDBC configuration error: {}",
+ ex.getMessage(),
+ ex
+ );
+ throw ex;
+ } catch (Exception ex) {
+ log.error(
+ "JDBC configuration error: {}",
+ ex.getMessage(),
+ ex
+ );
+ throw new ConnectException(
+ "JDBC configuration error: " + ex.getMessage(),
+ ex
+ );
+ }
+
+ super.start(props);
+ }
+
+ @Override
+ public void put(Collection<SinkRecord> records) {
+ log.debug("put(large-columns): Processing {} records from Kafka", records.size());
+ // TODO: Keep track of schema changes
+ // TODO: Verify db and schema match the connection string.
+ // TODO: groupBy?
+
+ // NOTE: We need to have a fresh SqlCache every iteration.
+ // TODO: Determine if it would be safe to cache Table Schemas long-term,
+ // or do they change often enough to warrant a refresh every iteration?
+ SqlMetadataCache sqlMetadataCache = new SqlMetadataCache(dataSource);
+
+ // Iterate over each record, and put() each individually
+ for (SinkRecord record : records) {
+ try {
+ Optional
+ .ofNullable(recordTransformer.transformRecord(sqlMetadataCache, record))
+ .map(Stream::of)
+ .orElseGet(Stream::empty)
+ .peek(transformedRecord -> {
+ log.debug(
+ "Created new SinkRecord from old Sink Record: PK [{}] Columns [{}]",
+ transformedRecord.key(),
+ transformedRecord
+ .valueSchema()
+ .fields()
+ .stream()
+ .map(Field::name)
+ .collect(Collectors.joining(","))
+ );
+ })
+ .map(Collections::singletonList)
+ .forEach(super::put);
+ } catch (SQLException ex) {
+ log.error("Failed to transform Record: {}", ex.getMessage(), ex);
+ throw new DataException(
+ "Failed to transform Record: " + ex.getMessage(),
+ ex
+ );
+ }
+ }
+
+ // Trigger a sync() to HDFS, even if no records were written.
+ // This updates all accounting and delayed writes, etc...
+ super.put(Collections.emptyList());
+ log.debug("put(large-columns): Finished");
+ }
+
+ @Override
+ public void open(Collection<TopicPartition> partitions) {
+ log.info("Opening JDBC DataSource: {}", hikariConfig.getJdbcUrl());
+
+ dataSource = new HikariDataSource(hikariConfig);
+
+ recordTransformer = new JdbcRecordTransformer(
+ dataSource,
+ configuredTableColumnsMap,
+ hashCache
+ );
+
+ log.info("Successfully opened JDBC DataSource: {}", dataSource.getJdbcUrl());
+
+ super.open(partitions);
+ }
+
+ @Override
+ public void close(Collection<TopicPartition> partitions) {
+ if (dataSource != null) {
+ try {
+ log.info("Closing JDBC DataSource {}", hikariConfig.getJdbcUrl());
+ dataSource.close();
+ } catch (Exception ex) {
+ log.warn("Failed to close DataSource: {}", ex.getMessage(), ex);
+ }
+ }
+
+ super.close(partitions);
+ }
+
+ @Override
+ public void stop() {
+ log.info("Stopping {}", getClass().getSimpleName());
+ super.stop();
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcQueryUtil.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcQueryUtil.java
new file mode 100644
index 000000000..41234a5d1
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcQueryUtil.java
@@ -0,0 +1,452 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.JDBCType;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class JdbcQueryUtil {
+ private static final Logger log = LoggerFactory.getLogger(JdbcQueryUtil.class);
+
+ @FunctionalInterface
+ private interface PrepareConsumer {
+ void accept(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException;
+ }
+
+ @FunctionalInterface
+ private interface ResultSetConsumer {
+ void accept(ResultSet resultSet,
+ JDBCType jdbcType,
+ String columnName,
+ JdbcColumnVisitor columnVisitor) throws SQLException;
+ }
+
+ private static final Map<JDBCType, PrepareConsumer> jdbcTypePrepareMap = new HashMap<>();
+ private static final Map<JDBCType, ResultSetConsumer> jdbcTypeResultSetMap = new HashMap<>();
+
+ static {
+ // TODO: Add more as needed, if Primary Keys are other data types
+ jdbcTypePrepareMap.put(JDBCType.BIGINT, JdbcQueryUtil::setLong);
+ jdbcTypePrepareMap.put(JDBCType.BIT, JdbcQueryUtil::setBoolean);
+ jdbcTypePrepareMap.put(JDBCType.BOOLEAN, JdbcQueryUtil::setBoolean);
+ jdbcTypePrepareMap.put(JDBCType.CHAR, JdbcQueryUtil::setString);
+ jdbcTypePrepareMap.put(JDBCType.DOUBLE, JdbcQueryUtil::setDouble);
+ jdbcTypePrepareMap.put(JDBCType.FLOAT, JdbcQueryUtil::setFloat);
+ jdbcTypePrepareMap.put(JDBCType.INTEGER, JdbcQueryUtil::setInt);
+ jdbcTypePrepareMap.put(JDBCType.LONGVARCHAR, JdbcQueryUtil::setString);
+ jdbcTypePrepareMap.put(JDBCType.REAL, JdbcQueryUtil::setFloat);
+ jdbcTypePrepareMap.put(JDBCType.SMALLINT, JdbcQueryUtil::setShort);
+ jdbcTypePrepareMap.put(JDBCType.TINYINT, JdbcQueryUtil::setByte);
+ jdbcTypePrepareMap.put(JDBCType.VARCHAR, JdbcQueryUtil::setString);
+ // TODO: Add more as needed, if Query results are not all LOBs or VARCHARS
+ jdbcTypeResultSetMap.put(JDBCType.BLOB, JdbcQueryUtil::visitBlob);
+ jdbcTypeResultSetMap.put(JDBCType.CLOB, JdbcQueryUtil::visitClob);
+ jdbcTypeResultSetMap.put(JDBCType.LONGVARCHAR, JdbcQueryUtil::visitString);
+ jdbcTypeResultSetMap.put(JDBCType.SQLXML, JdbcQueryUtil::visitSqlXml);
+ jdbcTypeResultSetMap.put(JDBCType.VARCHAR, JdbcQueryUtil::visitString);
+ }
+
+ public static List<JdbcColumnInfo> fetchAllColumns(
+ DataSource dataSource,
+ JdbcTableInfo tableInfo
+ ) throws SQLException {
+ try (Connection connection = dataSource.getConnection();
+ // We uppercase the schema and table because otherwise DB2 won't recognize them...
+ ResultSet columns = connection.getMetaData().getColumns(
+ null,
+ JdbcUtil.toUpperCase(tableInfo.getSchema()),
+ JdbcUtil.toUpperCase(tableInfo.getTable()),
+ null
+ )
+ ) {
+ List<JdbcColumnInfo> columnList = new LinkedList<>();
+ while (columns.next()) {
+ String columnName = columns.getString("COLUMN_NAME").trim();
+ // WARNING: This returns the wrong value in some cases (2009/XML becomes 1111)
+ int dataTypeNum = columns.getInt("DATA_TYPE");
+ String dataTypeStr = columns.getString("DATA_TYPE");
+ String typeName = columns.getString("TYPE_NAME");
+ // TODO: Validate dataType against typeName
+ JDBCType jdbcType = JDBCType.valueOf(Integer.parseInt(dataTypeStr));
+ boolean nullable = columns.getBoolean("NULLABLE");
+ //String isAutoIncrement = columns.getString("IS_AUTOINCREMENT");
+ //int radix = columns.getInt("NUM_PREC_RADIX");
+ int ordinal = columns.getInt("ORDINAL_POSITION");
+ JdbcColumnInfo jdbcColumn = new JdbcColumnInfo(columnName, jdbcType, ordinal, nullable);
+ log.debug(
+ "Loaded Column for Table [{}] TypeName [{}] DataType [{} ==? {}] = {}",
+ tableInfo,
+ typeName,
+ dataTypeStr,
+ dataTypeNum,
+ jdbcColumn
+ );
+ columnList.add(jdbcColumn);
+ }
+
+ return columnList
+ .stream()
+ .sorted(JdbcColumnInfo.byOrdinal)
+ .collect(Collectors.toList());
+ }
+ }
+
+ public static Set<String> fetchPrimaryKeyNames(
+ DataSource dataSource,
+ JdbcTableInfo tableInfo
+ ) throws SQLException {
+ try (Connection connection = dataSource.getConnection();
+ // We uppercase the schema and table because otherwise DB2 won't recognize them...
+ ResultSet columns = connection.getMetaData().getPrimaryKeys(
+ null,
+ JdbcUtil.toUpperCase(tableInfo.getSchema()),
+ JdbcUtil.toUpperCase(tableInfo.getTable())
+ )
+ ) {
+ Set<String> primaryKeyNames = new HashSet<>();
+ while (columns.next()) {
+ //String schem = columns.getString("TABLE_SCHEM");
+ //String tn = columns.getString("TABLE_NAME");
+ String columnName = columns.getString("COLUMN_NAME").trim();
+ //String pkName = columns.getString("PK_NAME");
+ //short kseq = columns.getShort("KEY_SEQ");
+ primaryKeyNames.add(columnName);
+ }
+ log.debug("Table [{}] PrimaryKeys: {}", tableInfo, primaryKeyNames);
+ return primaryKeyNames;
+ }
+ }
+
+ public static void executeSingletonQuery(
+ DataSource dataSource,
+ JdbcTableInfo tableInfo,
+ List<JdbcColumnInfo> primaryKeyColumns,
+ Collection<JdbcColumnInfo> columnsToQuery,
+ JdbcValueMapper valueMapper,
+ JdbcColumnVisitor columnVisitor,
+ String primaryKeyStr
+ ) throws SQLException {
+ String selectClause =
+ columnsToQuery
+ .stream()
+ .map(JdbcColumnInfo::getName)
+ .collect(Collectors.joining(","));
+
+ String whereClause =
+ primaryKeyColumns
+ .stream()
+ .map(JdbcColumnInfo::getName)
+ .map(primaryKeyName -> primaryKeyName + "=?")
+ .collect(Collectors.joining(" AND "));
+
+ String sqlQuery =
+ "SELECT "
+ + selectClause
+ + " FROM "
+ + tableInfo.qualifiedName()
+ + " WHERE "
+ + whereClause
+ + ";";
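+ // Illustrative example of the generated SQL (hypothetical column/table names):
+ // SELECT value_clob,value_xml FROM db2inst1.dv_test WHERE id=?;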
+
+ // Execute the Query
+ log.debug(
+ "Executing SQL Query for PK [{}] in Table [{}]: {}",
+ primaryKeyStr,
+ tableInfo,
+ sqlQuery
+ );
+
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery)) {
+ int index = 1;
+ for (JdbcColumnInfo primaryKeyColumn : primaryKeyColumns) {
+ JdbcQueryUtil.setPreparedValue(
+ preparedStatement,
+ index++,
+ primaryKeyColumn.getJdbcType(),
+ valueMapper,
+ primaryKeyColumn.getName()
+ );
+ }
+
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (!resultSet.next()) {
+ // TODO: How do we detect if incoming record is a DELETE?
+ // So we don't end up writing large values for no reason.
+ log.warn(
+ "Cannot find Row {} in Table [{}]",
+ primaryKeyStr,
+ tableInfo
+ );
+ } else {
+ log.debug(
+ "Processing ResultSet from Query PK [{}] in Table [{}]",
+ primaryKeyStr,
+ tableInfo
+ );
+
+ // Read values from the DB into newValueStruct
+ JdbcQueryUtil.visitResultSetColumns(resultSet, columnVisitor);
+
+ // NOTE: We should only have a single result!
+ if (resultSet.next()) {
+ throw new DataException(
+ "Got more than 1 row for Query PK ["
+ + primaryKeyStr
+ + "] in Table ["
+ + tableInfo
+ + "]"
+ );
+ }
+ }
+ }
+ // TODO: Rollback?
+ }
+ }
+
+ public static void setPreparedValue(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ if (JDBCType.NULL == jdbcType) {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ return;
+ }
+
+ Optional
+ .ofNullable(jdbcTypePrepareMap.get(jdbcType))
+ .orElseThrow(
+ () -> new DataException(
+ "Unsupported Query Column ["
+ + fieldName
+ + "] type ["
+ + jdbcType
+ + "] in PreparedStatement"
+ )
+ )
+ .accept(preparedStatement, index, jdbcType, valueMapper, fieldName);
+ }
+
+ public static void visitResultSetColumns(ResultSet resultSet,
+ JdbcColumnVisitor columnVisitor) throws SQLException {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
+ for (int i = 1; i <= columnCount; i++) {
+ String columnName = resultSetMetaData.getColumnName(i);
+ JDBCType jdbcType = JDBCType.valueOf(resultSetMetaData.getColumnType(i));
+ String tableName = resultSetMetaData.getTableName(i);
+
+ Optional
+ .ofNullable(jdbcTypeResultSetMap.get(jdbcType))
+ .orElseThrow(() -> new DataException(
+ "Unsupported ResultSet Column ["
+ + columnName
+ + "] type ["
+ + jdbcType
+ + "] in Table ["
+ + tableName
+ + "]"
+ ))
+ .accept(resultSet, jdbcType, columnName, columnVisitor);
+ }
+ }
+
+ private static void setBoolean(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ Boolean value = valueMapper.getBoolean(fieldName);
+ if (value != null) {
+ preparedStatement.setBoolean(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void setByte(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ Byte value = valueMapper.getByte(fieldName);
+ if (value != null) {
+ preparedStatement.setByte(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void setDouble(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ Double value = valueMapper.getDouble(fieldName);
+ if (value != null) {
+ preparedStatement.setDouble(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void setFloat(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ Float value = valueMapper.getFloat(fieldName);
+ if (value != null) {
+ preparedStatement.setFloat(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void setInt(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ Integer value = valueMapper.getInteger(fieldName);
+ if (value != null) {
+ preparedStatement.setInt(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void setLong(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ Long value = valueMapper.getLong(fieldName);
+ if (value != null) {
+ preparedStatement.setLong(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void setShort(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ Short value = valueMapper.getShort(fieldName);
+ if (value != null) {
+ preparedStatement.setShort(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void setString(PreparedStatement preparedStatement,
+ int index,
+ JDBCType jdbcType,
+ JdbcValueMapper valueMapper,
+ String fieldName) throws SQLException {
+ String value = valueMapper.getString(fieldName);
+ if (value != null) {
+ preparedStatement.setString(index, value);
+ } else {
+ preparedStatement.setNull(index, jdbcType.getVendorTypeNumber());
+ }
+ }
+
+ private static void visitBlob(ResultSet resultSet,
+ JDBCType jdbcType,
+ String columnName,
+ JdbcColumnVisitor columnVisitor) throws SQLException {
+ Blob value = resultSet.getBlob(columnName);
+ log.debug(
+ "Visit Column [{}] type [{}] length [{}]",
+ columnName,
+ jdbcType,
+ (value != null) ? value.length() : null
+ );
+ columnVisitor.visit(columnName, value);
+ }
+
+ private static void visitClob(ResultSet resultSet,
+ JDBCType jdbcType,
+ String columnName,
+ JdbcColumnVisitor columnVisitor) throws SQLException {
+ Clob value = resultSet.getClob(columnName);
+ log.debug(
+ "Visit Column [{}] type [{}] length [{}]",
+ columnName,
+ jdbcType,
+ (value != null) ? value.length() : null
+ );
+ columnVisitor.visit(columnName, value);
+ }
+
+ private static void visitString(ResultSet resultSet,
+ JDBCType jdbcType,
+ String columnName,
+ JdbcColumnVisitor columnVisitor) throws SQLException {
+ String value = resultSet.getString(columnName);
+ log.debug(
+ "Visit Column [{}] type [{}] length [{}]",
+ columnName,
+ jdbcType,
+ (value != null) ? value.length() : null
+ );
+ columnVisitor.visit(columnName, value);
+ }
+
+ private static void visitSqlXml(ResultSet resultSet,
+ JDBCType jdbcType,
+ String columnName,
+ JdbcColumnVisitor columnVisitor) throws SQLException {
+ SQLXML value = resultSet.getSQLXML(columnName);
+ log.debug(
+ "Visit Column [{}] type [{}] isNull? [{}]",
+ columnName,
+ jdbcType,
+ (value == null)
+ );
+ columnVisitor.visit(columnName, value);
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcRecordTransformer.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcRecordTransformer.java
new file mode 100644
index 000000000..e0d282a0a
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcRecordTransformer.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class JdbcRecordTransformer {
+ private final DataSource dataSource;
+ private final Map<JdbcTableInfo, Set<String>> configuredTableColumnsMap;
+ private final HashCache hashCache;
+
+ public JdbcRecordTransformer(
+ DataSource dataSource,
+ Map<JdbcTableInfo, Set<String>> configuredTableColumnsMap,
+ HashCache hashCache
+ ) {
+ this.dataSource = dataSource;
+ this.configuredTableColumnsMap = new HashMap<>(configuredTableColumnsMap);
+ this.hashCache = hashCache;
+ }
+
+ /**
+ * NOTE: Not threadsafe, as several components update things like basic Collections
+ */
+ public SinkRecord transformRecord(
+ SqlMetadataCache sqlMetadataCache,
+ SinkRecord oldRecord
+ ) throws SQLException {
+ JdbcTableInfo tableInfo = new JdbcTableInfo(oldRecord.headers());
+
+ Set<String> configuredFieldNamesLower =
+ configuredTableColumnsMap.computeIfAbsent(
+ tableInfo,
+ __ -> Collections.emptySet()
+ );
+
+ // No columns to Query? No need to write anything at all to HDFS
+
+ if (configuredFieldNamesLower.isEmpty()) {
+ return null;
+ }
+
+ // Calculate the list of Columns to query
+
+ Schema oldValueSchema = oldRecord.valueSchema();
+
+ Map<String, Field> oldFieldsMap = toFieldsMap(oldValueSchema);
+
+ Set<String> oldFieldNamesLower =
+ oldFieldsMap
+ .keySet()
+ .stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toSet());
+
+ Set<String> columnNamesLowerToQuery =
+ configuredFieldNamesLower
+ .stream()
+ .filter(((Predicate<String>) oldFieldNamesLower::contains).negate())
+ .collect(Collectors.toSet());
+
+ // No actual columns to Query? No need to write anything at all to HDFS
+
+ if (columnNamesLowerToQuery.isEmpty()) {
+ return null;
+ }
+
+ // Gather Column Metadata from the DB
+
+ Map<String, JdbcColumnInfo> allColumnsLowerMap =
+ sqlMetadataCache
+ .fetchAllColumns(tableInfo)
+ .stream()
+ .collect(Collectors.toMap(
+ column -> column.getName().toLowerCase(),
+ Function.identity()
+ ));
+
+ List<JdbcColumnInfo> primaryKeyColumns =
+ sqlMetadataCache.fetchPrimaryKeyColumns(tableInfo);
+
+ Set<String> primaryKeyColumnNamesLower =
+ primaryKeyColumns
+ .stream()
+ .map(JdbcColumnInfo::getName)
+ .map(String::toLowerCase)
+ .collect(Collectors.toSet());
+
+ List<JdbcColumnInfo> columnsToQuery =
+ columnNamesLowerToQuery
+ .stream()
+ .filter(((Predicate<String>) primaryKeyColumnNamesLower::contains).negate())
+ .map(columnNameLower -> Optional
+ .ofNullable(allColumnsLowerMap.get(columnNameLower))
+ .orElseThrow(() -> new DataException(
+ "Configured Column ["
+ + columnNameLower
+ + "] does not exist in Table ["
+ + tableInfo
+ + "]"
+ ))
+ )
+ .sorted(JdbcColumnInfo.byOrdinal)
+ .collect(Collectors.toList());
+
+ // Create the new Schema and new value Struct
+
+ Schema newValueSchema = JdbcSchema.createSchema(
+ configuredFieldNamesLower,
+ oldValueSchema,
+ primaryKeyColumns,
+ columnsToQuery
+ );
+
+ Struct newValueStruct = new Struct(newValueSchema);
+
+ // Populate the newValueStruct with existing values from oldValueStruct
+
+ Struct oldValueStruct = (Struct) oldRecord.value();
+
+ newValueSchema
+ .fields()
+ .forEach(newField -> Optional
+ .ofNullable(oldFieldsMap.get(newField.name()))
+ .flatMap(oldField -> Optional.ofNullable(oldValueStruct.get(oldField)))
+ .ifPresent(oldValue -> newValueStruct.put(newField, oldValue))
+ );
+
+ // Execute the query
+
+ String primaryKeyStr = Optional
+ .ofNullable(oldRecord.key())
+ .map(Object::toString)
+ .map(String::trim)
+ .orElse("");
+
+ FilteredColumnToStructVisitor columnVisitor =
+ new FilteredColumnToStructVisitor(
+ hashCache,
+ tableInfo,
+ primaryKeyStr,
+ newValueStruct
+ );
+
+ JdbcQueryUtil.executeSingletonQuery(
+ dataSource,
+ tableInfo,
+ primaryKeyColumns,
+ columnsToQuery,
+ new StructToJdbcValueMapper(oldValueStruct),
+ columnVisitor,
+ primaryKeyStr
+ );
+
+ // Only write a record if there are changes in the columns (usually LOBs),
+ // based on whether the cached Hash of each column has changed or not.
+ // TODO: Make this optimization configurable, so it can be disabled from the config
+
+ if (!columnVisitor.hasChangedColumns()) {
+ return null;
+ }
+
+ // Make sure the newValueStruct is fully populated
+ newValueStruct.validate();
+
+ // Create the newly transformed SinkRecord
+ return oldRecord.newRecord(
+ oldRecord.topic(),
+ oldRecord.kafkaPartition(),
+ oldRecord.keySchema(),
+ oldRecord.key(),
+ newValueSchema,
+ newValueStruct,
+ oldRecord.timestamp()
+ );
+ }
+
+ private Map<String, Field> toFieldsMap(Schema schema) {
+ return schema
+ .fields()
+ .stream()
+ .collect(Collectors.toMap(
+ field -> Optional
+ .ofNullable(field.name())
+ .map(String::trim)
+ .filter(((Predicate<String>) String::isEmpty).negate())
+ // NOTE: Should be impossible to reach here!
+ .orElseThrow(() -> new DataException(
+ "Field ["
+ + field.name()
+ + "] is null or empty for Schema ["
+ + schema.name()
+ + "]"
+ )),
+ Function.identity()
+ ));
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcSchema.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcSchema.java
new file mode 100644
index 000000000..12a1e0907
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcSchema.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcSchema {
+ public static Schema createSchema(Set<String> configuredFieldNamesLower,
+ Schema oldSchema,
+ Collection<JdbcColumnInfo> primaryKeyColumns,
+ Collection<JdbcColumnInfo> columnsToQuery) {
+ SchemaBuilder newSchemaBuilder = SchemaBuilder.struct();
+
+ Set<String> newColumnNames =
+ Stream
+ .concat(
+ primaryKeyColumns.stream(),
+ columnsToQuery.stream()
+ )
+ // Unnecessary, as columnsToQuery already filtered out all primary keys
+ //.filter(distinctBy(JdbcColumn::getName))
+ .sorted(JdbcColumnInfo.byOrdinal)
+ .peek(column -> {
+ String columnName = column.getName();
+ Schema fieldSchema = Optional
+ .ofNullable(oldSchema.field(columnName))
+ .map(Field::schema)
+ .orElseGet(() -> toSchema(column));
+ newSchemaBuilder.field(columnName, fieldSchema);
+ })
+ .map(JdbcColumnInfo::getName)
+ .collect(Collectors.toSet());
+
+ oldSchema
+ .fields()
+ .forEach(field -> {
+ String fieldName = field.name().trim();
+ if (!newColumnNames.contains(fieldName)
+ && configuredFieldNamesLower.contains(fieldName.toLowerCase())) {
+ newSchemaBuilder.field(fieldName, field.schema());
+ }
+ });
+
+ return newSchemaBuilder.build();
+ }
+
+ private static Schema toSchema(JdbcColumnInfo column) {
+ switch (column.getJdbcType()) {
+ case BLOB:
+ return column.isNullable() ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
+ case CLOB:
+ case LONGVARCHAR:
+ case SQLXML:
+ case VARCHAR:
+ return column.isNullable() ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
+ default:
+ throw new DataException(
+ "Cannot convert Column ["
+ + column.getName()
+ + "] type ["
+ + column.getJdbcType()
+ + "] into a Value Schema"
+ );
+ }
+ }
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcTableInfo.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcTableInfo.java
new file mode 100644
index 000000000..51666a53c
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcTableInfo.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import org.apache.kafka.connect.header.Headers;
+
+import javax.annotation.Nonnull;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcTableInfo implements Comparable<JdbcTableInfo> {
+ private static final String HEADER_DB = "__source_db";
+ private static final String HEADER_SCHEMA = "__source_schema";
+ private static final String HEADER_TABLE = "__source_table";
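+ // Example (hypothetical) header values on an incoming record:
+ // __source_db=testdb, __source_schema=DB2INST1, __source_table=DV_TEST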
+
+ private final String db;
+ private final String schema;
+ private final String table;
+
+ public static Comparator<JdbcTableInfo> comparator =
+ Comparator
+ .comparing(
+ JdbcTableInfo::getDb,
+ Comparator.nullsFirst(Comparator.naturalOrder())
+ )
+ .thenComparing(
+ JdbcTableInfo::getSchema,
+ Comparator.nullsFirst(Comparator.naturalOrder())
+ )
+ .thenComparing(
+ JdbcTableInfo::getTable,
+ Comparator.nullsFirst(Comparator.naturalOrder())
+ );
+
+ public JdbcTableInfo(Headers headers) {
+ this(
+ // TODO: Validate not null or empty?
+ (String) headers.lastWithName(HEADER_DB).value(),
+ // TODO: Validate not null or empty?
+ (String) headers.lastWithName(HEADER_SCHEMA).value(),
+ // TODO: Validate not null or empty!
+ (String) headers.lastWithName(HEADER_TABLE).value()
+ );
+ }
+
+ public JdbcTableInfo(String db, String schema, String table) {
+ this.db = JdbcUtil
+ .trimToNone(db)
+ .map(String::toLowerCase)
+ .orElse(null);
+ this.schema = JdbcUtil
+ .trimToNone(schema)
+ .map(String::toLowerCase)
+ .orElse(null);
+ this.table = JdbcUtil
+ .trimToNone(table)
+ .map(String::toLowerCase)
+ .orElse(null);
+ }
+
+ public String getDb() {
+ return db;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String qualifiedName() {
+ // Same as toString, without being prefixed by the db
+ return JdbcUtil.trimToNull(
+ Stream
+ .of(schema, table)
+ .filter(Objects::nonNull)
+ .collect(Collectors.joining("."))
+ );
+ }
+
+ @Override
+ public int compareTo(@Nonnull JdbcTableInfo tableInfo) {
+ return comparator.compare(this, tableInfo);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JdbcTableInfo)) {
+ return false;
+ }
+ return compareTo((JdbcTableInfo) o) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getDb(), getSchema(), getTable());
+ }
+
+ @Override
+ public String toString() {
+ return Stream
+ .of(db, schema, table)
+ .filter(Objects::nonNull)
+ .collect(Collectors.joining(".", "JdbcTableInfo{", "}"));
+ }
+}
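
A minimal sketch of how the header-driven constructor above is expected to behave; the header values below are illustrative placeholders, not taken from this diff:

import io.confluent.connect.hdfs.jdbc.JdbcTableInfo;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;

public class JdbcTableInfoExample {
  public static void main(String[] args) {
    // Illustrative header values; real records would carry these from the upstream source.
    Headers headers = new ConnectHeaders()
        .addString("__source_db", "MYDB")
        .addString("__source_schema", "MySchema")
        .addString("__source_table", "Customers");

    JdbcTableInfo tableInfo = new JdbcTableInfo(headers);

    // The constructor trims and lower-cases each component:
    System.out.println(tableInfo.getDb());          // mydb
    System.out.println(tableInfo.qualifiedName());  // myschema.customers
    System.out.println(tableInfo);                  // JdbcTableInfo{mydb.myschema.customers}
  }
}
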
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcUtil.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcUtil.java
new file mode 100644
index 000000000..b94db3538
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import java.util.Optional;
+import java.util.function.Predicate;
+
+public class JdbcUtil {
+ public static Optional<String> trimToNone(String value) {
+ return Optional
+ .ofNullable(value)
+ .map(String::trim)
+ .filter(((Predicate<String>) String::isEmpty).negate());
+ }
+
+ public static String trimToNull(String value) {
+ return trimToNone(value)
+ .orElse(null);
+ }
+
+ public static String toUpperCase(String value) {
+ return Optional
+ .ofNullable(value)
+ .map(String::toUpperCase)
+ .orElse(null);
+ }
+}
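
A quick sketch of the trimming helpers above, for reference (standalone example, assuming the class is imported from the new jdbc package):

import io.confluent.connect.hdfs.jdbc.JdbcUtil;

public class JdbcUtilExample {
  public static void main(String[] args) {
    System.out.println(JdbcUtil.trimToNone("  foo  ")); // Optional[foo]
    System.out.println(JdbcUtil.trimToNone("   "));     // Optional.empty
    System.out.println(JdbcUtil.trimToNull(null));      // null
    System.out.println(JdbcUtil.toUpperCase("db2"));    // DB2
  }
}
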
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcValueMapper.java b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcValueMapper.java
new file mode 100644
index 000000000..e227284b3
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/JdbcValueMapper.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+public interface JdbcValueMapper {
+ Boolean getBoolean(String key);
+
+ Byte getByte(String key);
+
+ Double getDouble(String key);
+
+ Float getFloat(String key);
+
+ Integer getInteger(String key);
+
+ Long getLong(String key);
+
+ Short getShort(String key);
+
+ String getString(String key);
+}
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/SqlMetadataCache.java b/src/main/java/io/confluent/connect/hdfs/jdbc/SqlMetadataCache.java
new file mode 100644
index 000000000..33c5d1ec9
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/SqlMetadataCache.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SqlMetadataCache {
+ private final Map<JdbcTableInfo, List<JdbcColumnInfo>> allColumnsMap =
+ new HashMap<>();
+ private final Map<JdbcTableInfo, List<JdbcColumnInfo>> primaryKeyColumnsMap =
+ new HashMap<>();
+ private final DataSource dataSource;
+
+ public SqlMetadataCache(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public synchronized List<JdbcColumnInfo> fetchAllColumns(
+ JdbcTableInfo tableInfo
+ ) throws SQLException {
+ List<JdbcColumnInfo> allColumns = allColumnsMap.get(tableInfo);
+
+ if (allColumns == null) {
+ allColumns = JdbcQueryUtil.fetchAllColumns(dataSource, tableInfo);
+ allColumnsMap.put(tableInfo, allColumns);
+ }
+ return allColumns;
+ }
+
+ public synchronized List<JdbcColumnInfo> fetchPrimaryKeyColumns(
+ JdbcTableInfo tableInfo
+ ) throws SQLException {
+ List<JdbcColumnInfo> primaryKeyColumns = primaryKeyColumnsMap.get(tableInfo);
+
+ if (primaryKeyColumns == null) {
+ Collection<String> primaryKeyNames =
+ JdbcQueryUtil.fetchPrimaryKeyNames(dataSource, tableInfo);
+
+ // TODO: Do we need to verify the PK exists in the list of columns?
+ primaryKeyColumns =
+ fetchAllColumns(tableInfo)
+ .stream()
+ .filter(column -> primaryKeyNames.contains(column.getName()))
+ .collect(Collectors.toList());
+
+ primaryKeyColumnsMap.put(tableInfo, primaryKeyColumns);
+ }
+ return primaryKeyColumns;
+ }
+}
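
A usage sketch for the metadata cache above, assuming the DB2 jcc driver added to the pom; the connection settings, schema, and table names are placeholders and not part of this change. Only methods shown in this diff (fetchPrimaryKeyColumns, getName) are exercised:

import com.ibm.db2.jcc.DB2SimpleDataSource;
import io.confluent.connect.hdfs.jdbc.JdbcTableInfo;
import io.confluent.connect.hdfs.jdbc.SqlMetadataCache;
import javax.sql.DataSource;
import java.sql.SQLException;

public class SqlMetadataCacheExample {
  public static void main(String[] args) throws SQLException {
    // Placeholder connection settings for illustration only.
    DB2SimpleDataSource db2 = new DB2SimpleDataSource();
    db2.setServerName("localhost");
    db2.setPortNumber(50000);
    db2.setDatabaseName("SAMPLE");
    db2.setDriverType(4);
    db2.setUser("db2inst1");
    db2.setPassword("changeme");
    DataSource dataSource = db2;

    SqlMetadataCache cache = new SqlMetadataCache(dataSource);
    JdbcTableInfo tableInfo = new JdbcTableInfo("sample", "myschema", "customers");

    // First call hits JDBC metadata; repeated calls are served from the in-memory maps.
    cache.fetchPrimaryKeyColumns(tableInfo)
         .forEach(column -> System.out.println(column.getName()));
  }
}
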
diff --git a/src/main/java/io/confluent/connect/hdfs/jdbc/StructToJdbcValueMapper.java b/src/main/java/io/confluent/connect/hdfs/jdbc/StructToJdbcValueMapper.java
new file mode 100644
index 000000000..1066ba1ce
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/jdbc/StructToJdbcValueMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.jdbc;
+
+import org.apache.kafka.connect.data.Struct;
+
+public class StructToJdbcValueMapper implements JdbcValueMapper {
+ public final Struct struct;
+
+ public StructToJdbcValueMapper(Struct struct) {
+ this.struct = struct;
+ }
+
+ @Override
+ public Boolean getBoolean(String key) {
+ return struct.getBoolean(key);
+ }
+
+ @Override
+ public Byte getByte(String key) {
+ return struct.getInt8(key);
+ }
+
+ @Override
+ public Double getDouble(String key) {
+ return struct.getFloat64(key);
+ }
+
+ @Override
+ public Float getFloat(String key) {
+ return struct.getFloat32(key);
+ }
+
+ @Override
+ public Integer getInteger(String key) {
+ return struct.getInt32(key);
+ }
+
+ @Override
+ public Long getLong(String key) {
+ return struct.getInt64(key);
+ }
+
+ @Override
+ public Short getShort(String key) {
+ return struct.getInt16(key);
+ }
+
+ @Override
+ public String getString(String key) {
+ return struct.getString(key);
+ }
+}
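
A small sketch of the Struct-backed mapper above; the schema and field names are illustrative only:

import io.confluent.connect.hdfs.jdbc.JdbcValueMapper;
import io.confluent.connect.hdfs.jdbc.StructToJdbcValueMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructToJdbcValueMapperExample {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.struct()
        .field("id", Schema.INT64_SCHEMA)
        .field("name", Schema.OPTIONAL_STRING_SCHEMA)
        .build();

    Struct struct = new Struct(schema)
        .put("id", 42L)
        .put("name", "example");

    JdbcValueMapper mapper = new StructToJdbcValueMapper(struct);

    // Each getter simply delegates to the corresponding typed accessor on the Struct.
    System.out.println(mapper.getLong("id"));     // 42
    System.out.println(mapper.getString("name")); // example
  }
}
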
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
index aba47b9e5..3499986a5 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
@@ -16,9 +16,9 @@
package io.confluent.connect.hdfs.orc;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
+import io.confluent.connect.hdfs.schema.HiveSchemaConverterWithLogicalTypes;
import io.confluent.connect.storage.format.RecordWriter;
import io.confluent.connect.storage.format.RecordWriterProvider;
-import io.confluent.connect.storage.hive.HiveSchemaConverter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -70,7 +70,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
}
};
- typeInfo = HiveSchemaConverter.convert(schema);
+ typeInfo = HiveSchemaConverterWithLogicalTypes.convert(schema);
ObjectInspector objectInspector = OrcStruct.createObjectInspector(typeInfo);
log.info("Opening ORC record writer for: {}", filename);
@@ -90,7 +90,10 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
);
Struct struct = (Struct) record.value();
- OrcStruct row = OrcUtil.createOrcStruct(typeInfo, OrcUtil.convertStruct(struct));
+ OrcStruct row = OrcUtil.createOrcStruct(
+ typeInfo,
+ OrcUtil.convertStruct(typeInfo, struct)
+ );
writer.addRow(row);
} else {
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java
index 25d783cb9..70443e5d4 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcUtil.java
@@ -28,17 +28,21 @@
import static org.apache.kafka.connect.data.Schema.Type.STRING;
import static org.apache.kafka.connect.data.Schema.Type.STRUCT;
+import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
-import java.util.function.BiFunction;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.ArrayPrimitiveWritable;
import org.apache.hadoop.io.BooleanWritable;
@@ -50,6 +54,7 @@
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
@@ -59,10 +64,12 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.function.BiFunction;
public final class OrcUtil {
- private static Map<Type, BiFunction<Struct, Field, Object>> CONVERSION_MAP = new HashMap<>();
+ private static final Map<Type, BiFunction<Struct, Field, Object>> CONVERSION_MAP =
+ new HashMap<>();
static {
CONVERSION_MAP.put(ARRAY, OrcUtil::convertArray);
@@ -76,7 +83,6 @@ public final class OrcUtil {
CONVERSION_MAP.put(INT64, OrcUtil::convertInt64);
CONVERSION_MAP.put(MAP, OrcUtil::convertMap);
CONVERSION_MAP.put(STRING, OrcUtil::convertString);
- CONVERSION_MAP.put(STRUCT, OrcUtil::convertStruct);
}
/**
@@ -87,8 +93,8 @@ public final class OrcUtil {
* @return the struct object
*/
@SuppressWarnings("unchecked")
- public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
- SettableStructObjectInspector oi = (SettableStructObjectInspector)
+ public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object[] objs) {
+ SettableStructObjectInspector oi = (SettableStructObjectInspector)
OrcStruct.createObjectInspector(typeInfo);
List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
@@ -107,22 +113,31 @@ public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
* @param struct the struct to convert
* @return the struct as a writable array
*/
- public static Object[] convertStruct(Struct struct) {
+ public static Object[] convertStruct(TypeInfo typeInfo, Struct struct) {
List<Object> data = new LinkedList<>();
for (Field field : struct.schema().fields()) {
if (struct.get(field) == null) {
data.add(null);
} else {
Schema.Type schemaType = field.schema().type();
- data.add(CONVERSION_MAP.get(schemaType).apply(struct, field));
+ if (STRUCT.equals(schemaType)) {
+ data.add(convertStruct(typeInfo, struct, field));
+ } else {
+ data.add(CONVERSION_MAP.get(schemaType).apply(struct, field));
+ }
}
}
return data.toArray();
}
- private static Object convertStruct(Struct struct, Field field) {
- return convertStruct(struct.getStruct(field.name()));
+ private static Object convertStruct(TypeInfo typeInfo, Struct struct, Field field) {
+ TypeInfo fieldTypeInfo = ((StructTypeInfo) typeInfo).getStructFieldTypeInfo(field.name());
+
+ return createOrcStruct(
+ fieldTypeInfo,
+ convertStruct(fieldTypeInfo, struct.getStruct(field.name()))
+ );
}
private static Object convertArray(Struct struct, Field field) {
@@ -134,6 +149,12 @@ private static Object convertBoolean(Struct struct, Field field) {
}
private static Object convertBytes(Struct struct, Field field) {
+
+ if (Decimal.LOGICAL_NAME.equals(field.schema().name())) {
+ BigDecimal bigDecimal = (BigDecimal) struct.get(field.name());
+ return new HiveDecimalWritable(HiveDecimal.create(bigDecimal));
+ }
+
return new BytesWritable(struct.getBytes(field.name()));
}
@@ -162,7 +183,7 @@ private static Object convertInt32(Struct struct, Field field) {
if (Time.LOGICAL_NAME.equals(field.schema().name())) {
java.util.Date date = (java.util.Date) struct.get(field);
- return new TimestampWritable(new java.sql.Timestamp(date.getTime()));
+ return new IntWritable((int) date.getTime());
}
return new IntWritable(struct.getInt32(field.name()));
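
A sketch of the Decimal handling added to convertBytes above; the schema and value are illustrative, and the snippet only repeats the BigDecimal-to-HiveDecimalWritable conversion shown in the diff:

import java.math.BigDecimal;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class DecimalConversionExample {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.struct()
        .field("price", Decimal.schema(2)) // logical BYTES carrying a BigDecimal
        .build();

    Struct struct = new Struct(schema).put("price", new BigDecimal("19.99"));

    // Same conversion the new convertBytes branch performs for Decimal fields:
    BigDecimal value = (BigDecimal) struct.get("price");
    HiveDecimalWritable writable = new HiveDecimalWritable(HiveDecimal.create(value));
    System.out.println(writable); // 19.99
  }
}
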
diff --git a/src/main/java/io/confluent/connect/hdfs/schema/HiveSchemaConverterWithLogicalTypes.java b/src/main/java/io/confluent/connect/hdfs/schema/HiveSchemaConverterWithLogicalTypes.java
new file mode 100644
index 000000000..c7eacfe7d
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/schema/HiveSchemaConverterWithLogicalTypes.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.schema;
+
+import io.confluent.connect.storage.hive.HiveSchemaConverter;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Timestamp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HiveSchemaConverterWithLogicalTypes {
+
+ public static TypeInfo convert(Schema schema) {
+ // TODO: throw an error on recursive types
+ switch (schema.type()) {
+ case STRUCT:
+ return convertStruct(schema);
+ case ARRAY:
+ return convertArray(schema);
+ case MAP:
+ return convertMap(schema);
+ default:
+ return convertPrimitive(schema);
+ }
+ }
+
+ public static TypeInfo convertStruct(Schema schema) {
+ final List<Field> fields = schema.fields();
+ final List<String> names = new ArrayList<>(fields.size());
+ final List<TypeInfo> types = new ArrayList<>(fields.size());
+ for (Field field : fields) {
+ names.add(field.name());
+ types.add(convert(field.schema()));
+ }
+ return TypeInfoFactory.getStructTypeInfo(names, types);
+ }
+
+ public static TypeInfo convertArray(Schema schema) {
+ return TypeInfoFactory.getListTypeInfo(convert(schema.valueSchema()));
+ }
+
+ public static TypeInfo convertMap(Schema schema) {
+ return TypeInfoFactory.getMapTypeInfo(
+ convert(schema.keySchema()),
+ convert(schema.valueSchema())
+ );
+ }
+
+ public static TypeInfo convertPrimitive(Schema schema) {
+ if (schema.name() != null) {
+ switch (schema.name()) {
+ case Date.LOGICAL_NAME:
+ return TypeInfoFactory.dateTypeInfo;
+ case Timestamp.LOGICAL_NAME:
+ return TypeInfoFactory.timestampTypeInfo;
+ // NOTE: We currently leave TIME values as INT32 (the default).
+ // Converting to a STRING would be ok too.
+ // Sadly, writing as INTERVAL is unsupported in the kafka-connect library.
+ // See: org.apache.hadoop.hive.ql.io.orc.WriterImpl - INTERVAL is missing
+ //case Time.LOGICAL_NAME:
+ // return TypeInfoFactory.intervalDayTimeTypeInfo;
+ default:
+ break;
+ }
+ }
+
+ // HiveSchemaConverter converts primitives just fine, just not all logical-types.
+ return HiveSchemaConverter.convertPrimitiveMaybeLogical(schema);
+ }
+}
\ No newline at end of file
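
A sketch of what the converter above produces for the logical types it special-cases; the field names are illustrative, and the printed type string is an expectation, not output captured from this change:

import io.confluent.connect.hdfs.schema.HiveSchemaConverterWithLogicalTypes;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class HiveSchemaConverterWithLogicalTypesExample {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.struct()
        .field("created_at", Timestamp.SCHEMA) // logical type -> Hive TIMESTAMP
        .field("birth_date", Date.SCHEMA)      // logical type -> Hive DATE
        .field("note", Schema.OPTIONAL_STRING_SCHEMA)
        .build();

    TypeInfo typeInfo = HiveSchemaConverterWithLogicalTypes.convert(schema);

    // Expected to print something like:
    // struct<created_at:timestamp,birth_date:date,note:string>
    System.out.println(typeInfo.getTypeName());
  }
}
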
diff --git a/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java b/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java
index 778a0b9d8..a65115949 100644
--- a/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/orc/DataWriterOrcTest.java
@@ -19,7 +19,7 @@
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
-import io.confluent.connect.storage.hive.HiveSchemaConverter;
+import io.confluent.connect.hdfs.schema.HiveSchemaConverterWithLogicalTypes;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -80,7 +80,7 @@ protected void verifyContents(List<SinkRecord> expectedRecords, int startIndex,
expectedRecords.get(startIndex++).value(),
expectedSchema);
- TypeInfo typeInfo = HiveSchemaConverter.convert(expectedSchema);
+ TypeInfo typeInfo = HiveSchemaConverterWithLogicalTypes.convert(expectedSchema);
ArrayList<Object> objs = new ArrayList<>();
for (Field field : expectedSchema.fields()) {