Skip to content

Commit 0156671

Browse files
authored
Merge pull request #220 from embulk/add_statement_timeout_to_postgre
Add statement timeout for PostgreSQL
2 parents 1b9b8ba + 6645834 commit 0156671

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

embulk-input-postgresql/src/main/java/org/embulk/input/PostgreSQLInputPlugin.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public interface PostgreSQLPluginTask
5353
@Config("application_name")
5454
@ConfigDefault("\"embulk-input-postgresql\"")
5555
public String getApplicationName();
56+
57+
@Config("statement_timeout_millis")
58+
@ConfigDefault("null")
59+
public Optional<Integer> getStatementTimeoutMillis();
5660
}
5761

5862
@Override
@@ -96,7 +100,7 @@ protected PostgreSQLInputConnection newConnection(PluginTask task) throws SQLExc
96100

97101
Connection con = DriverManager.getConnection(url, props);
98102
try {
99-
PostgreSQLInputConnection c = new PostgreSQLInputConnection(con, t.getSchema());
103+
PostgreSQLInputConnection c = new PostgreSQLInputConnection(con, t.getSchema(), t.getStatementTimeoutMillis());
100104
con = null;
101105
return c;
102106
} finally {

embulk-input-postgresql/src/main/java/org/embulk/input/postgresql/PostgreSQLInputConnection.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.embulk.input.postgresql;
22

3+
import java.sql.Statement;
34
import java.util.List;
45
import java.sql.Connection;
56
import java.sql.PreparedStatement;
67
import java.sql.ResultSet;
78
import java.sql.SQLException;
9+
import java.util.Optional;
10+
811
import org.slf4j.Logger;
912
import org.slf4j.LoggerFactory;
1013
import org.embulk.input.jdbc.JdbcInputConnection;
@@ -16,10 +19,11 @@ public class PostgreSQLInputConnection
1619
{
1720
private static final Logger logger = LoggerFactory.getLogger(PostgreSQLInputConnection.class);
1821

19-
public PostgreSQLInputConnection(Connection connection, String schemaName)
22+
public PostgreSQLInputConnection(Connection connection, String schemaName, Optional<Integer> statementTimeoutMillis)
2023
throws SQLException
2124
{
2225
super(connection, schemaName);
26+
setStatementTimeoutIfSpecified(statementTimeoutMillis);
2327
}
2428

2529
@Override
@@ -76,4 +80,19 @@ public void close() throws SQLException
7680
// TODO close?
7781
}
7882
}
83+
84+
private void setStatementTimeoutIfSpecified(Optional<Integer> statementTimeoutMillis)
85+
throws SQLException
86+
{
87+
if (statementTimeoutMillis.isPresent() && statementTimeoutMillis.get() > 0) {
88+
Statement stmt = connection.createStatement();
89+
try {
90+
String sql = "SET statement_timeout TO " + quoteIdentifierString(String.valueOf(statementTimeoutMillis.get()));
91+
executeUpdate(sql);
92+
}
93+
finally {
94+
stmt.close();
95+
}
96+
}
97+
}
7998
}

embulk-input-redshift/src/main/java/org/embulk/input/RedshiftInputPlugin.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.embulk.input;
22

3+
import java.util.Optional;
34
import java.util.Properties;
45
import java.sql.Connection;
56
import java.sql.Driver;
@@ -46,6 +47,10 @@ public interface RedshiftPluginTask
4647
@Config("ssl")
4748
@ConfigDefault("false")
4849
public boolean getSsl();
50+
51+
@Config("statement_timeout_millis")
52+
@ConfigDefault("null")
53+
public Optional<Integer> getStatementTimeoutMillis();
4954
}
5055

5156
@Override
@@ -85,7 +90,7 @@ protected PostgreSQLInputConnection newConnection(PluginTask task) throws SQLExc
8590

8691
Connection con = driver.connect(url, props);
8792
try {
88-
PostgreSQLInputConnection c = new PostgreSQLInputConnection(con, t.getSchema());
93+
PostgreSQLInputConnection c = new PostgreSQLInputConnection(con, t.getSchema(), t.getStatementTimeoutMillis());
8994
con = null;
9095
return c;
9196
} finally {

0 commit comments

Comments
 (0)