Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,9 @@ public void testQueryProgress() throws Throwable {
"set hive.support.concurrency = false;\n"
+ "set hive.server2.logging.operation.level=execution;\n"
+ "select count(*) from " + tableName + ";\n";
// Check for part of log message as well as part of progress information
final String EXPECTED_PATTERN = "ELAPSED TIME";
// HS2 may log "ELAPSED TIME" (legacy); Beeline may print row timing or Driver logs "Time taken:" on stderr.
final String EXPECTED_PATTERN =
"(ELAPSED TIME|row selected \\([0-9]+\\.[0-9]+ seconds\\)|Time taken:)";
final String UNEXPECTED_PATTERN = "(?=Reducer 2\\:).*(?=Map 1\\:)";
testScriptFile(SCRIPT_TEXT, getBaseArgs(miniHS2.getBaseJdbcURL()), OutStream.ERR,
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hive.service.cli.operation.ClassicTableTypeMapping.ClassicTableTypes;
import org.apache.hive.service.cli.operation.HiveTableTypeMapping;
import org.apache.hive.service.cli.operation.TableTypeMappingFactory.TableTypeMappings;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand Down Expand Up @@ -128,9 +129,46 @@ public class TestJdbcDriver2 {
private static Connection con;
private static final float floatCompareDelta = 0.0001f;

/**
* Required prefix of {@link SQLTimeoutException#getMessage()} for a 1s limit. HS2 may append
* {@code ; Query ID: ...} after the base text from {@code HiveSQLException}.
*/
private static final String QUERY_TIMED_OUT_AFTER_1_SECONDS = "Query timed out after 1 seconds";

@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public final TestName testName = new TestName();

/**
* {@code SET hive.query.timeout.seconds} applies to the whole HS2 session. Tests such as
* {@link #testQueryTimeoutMessageUsesHiveConf()} must not leave a short limit on the shared
* {@link #con}, or unrelated tests will see {@link SQLTimeoutException}.
*/
@After
public void resetHiveSessionQueryTimeout() {
try {
if (con == null || con.isClosed()) {
return;
}
try (Statement st = con.createStatement()) {
st.execute("set hive.query.timeout.seconds=0s");
}
} catch (SQLException e) {
LOG.warn("Could not reset hive.query.timeout.seconds after {}", testName.getMethodName(), e);
}
}

private static void assertTimeoutMessageShowsOneSecond(String context, SQLTimeoutException e) {
String msg = e.getMessage();
assertNotNull(context + ": message should not be null", msg);
assertTrue(
context + ": should start with " + QUERY_TIMED_OUT_AFTER_1_SECONDS
+ " (HS2 may append ; Query ID: ...); actual=" + msg,
msg.startsWith(QUERY_TIMED_OUT_AFTER_1_SECONDS));
assertFalse(
"HIVE-28265: message should not claim 0 seconds: " + msg,
msg.contains("after 0 seconds"));
}

private static Connection getConnection(String prefix, String postfix) throws SQLException {
Connection con1;
String connString = "jdbc:hive2:///" + prefix + "?" + conf.getOverlayOptionsAsQueryString()
Expand Down Expand Up @@ -2661,7 +2699,8 @@ public void testQueryTimeout() throws Exception {
+ " t2 on t1.under_col = t2.under_col");
fail("Expecting SQLTimeoutException");
} catch (SQLTimeoutException e) {
assertNotNull(e);
assertTimeoutMessageShowsOneSecond(
"JDBC query timeout (1s)", e);
System.err.println(e.toString());
} catch (SQLException e) {
fail("Expecting SQLTimeoutException, but got SQLException: " + e);
Expand All @@ -2680,6 +2719,36 @@ public void testQueryTimeout() throws Exception {
stmt.close();
}

/**
* When only {@code hive.query.timeout.seconds} applies (no {@link Statement#setQueryTimeout(int)}),
* the client must still report the real limit in {@link SQLTimeoutException#getMessage()} (before
* HIVE-28265 some paths wrongly showed "after 0 seconds"). Message must begin with
* {@link #QUERY_TIMED_OUT_AFTER_1_SECONDS}; HS2 may append {@code ; Query ID: ...}.
*/
@Test
public void testQueryTimeoutMessageUsesHiveConf() throws Exception {
String udfName = SleepMsUDF.class.getName();
Statement stmt1 = con.createStatement();
stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
stmt1.close();

Statement stmt = con.createStatement();
stmt.execute("set hive.query.timeout.seconds=1s");

try {
stmt.executeQuery("select sleepMsUDF(t1.under_col, 5) as u0, t1.under_col as u1, "
+ "t2.under_col as u2 from " + tableName + " t1 join " + tableName
+ " t2 on t1.under_col = t2.under_col");
fail("Expecting SQLTimeoutException");
} catch (SQLTimeoutException e) {
assertTimeoutMessageShowsOneSecond(
"Session query timeout (1s)", e);
} catch (SQLException e) {
fail("Expecting SQLTimeoutException, but got SQLException: " + e);
}
stmt.close();
}

/**
* Test the non-null value of the Yarn ATS GUID.
* We spawn 2 threads - one running the query and
Expand Down
28 changes: 28 additions & 0 deletions jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
Expand All @@ -163,6 +164,18 @@
*/
public class HiveConnection implements java.sql.Connection {
private static final Logger LOG = LoggerFactory.getLogger(HiveConnection.class);

/**
* Sentinel: no {@code SET hive.query.timeout.seconds} has been observed on this connection yet.
*/
static final long SESSION_QUERY_TIMEOUT_NOT_TRACKED = -1L;
/**
* Last effective {@code hive.query.timeout.seconds} from a client {@code SET} (seconds), or
* {@link #SESSION_QUERY_TIMEOUT_NOT_TRACKED}. A JDBC {@code Connection} may be shared across threads
* with concurrent {@link org.apache.hive.jdbc.HiveStatement}s on one HS2 session; this field uses an
* {@link AtomicLong} so updates remain well-defined (last SET wins).
*/
private final AtomicLong sessionQueryTimeoutSeconds = new AtomicLong(SESSION_QUERY_TIMEOUT_NOT_TRACKED);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud: I wonder if a connection can have concurrency issue: I mean, you can have multiple individual connections to Hive, but inside a connection itself, can we have multiple hive statements in parallel?
I have no such use case in my mind, but let me ping Ayush about this question.

@ayushtkn , what do you think?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a single JDBC Connection can be shared across multiple threads, and it is entirely possible to have multiple HiveStatement objects executing concurrently on the same connection (which maps to a single session on the HS2 side).

via Beeline or so maybe not but In Hive Server 2 (HS2), a single JDBC Connection corresponds to a single HS2 Session. You can absolutely execute multiple queries concurrently within the same session by spawning multiple threads on the client side, each using a different HiveStatement created from that single HiveConnection.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx

private String jdbcUriString;
private String host;
private int port;
Expand Down Expand Up @@ -190,6 +203,21 @@ public class HiveConnection implements java.sql.Connection {

public TCLIService.Iface getClient() { return client; }

/**
* Sets the effective {@code hive.query.timeout.seconds} (in seconds) after a successful
* {@code SET hive.query.timeout.seconds=...} on this connection. Used for JDBC timeout messages.
*/
void setSessionQueryTimeoutSeconds(long seconds) {
sessionQueryTimeoutSeconds.set(seconds);
}

/**
* @return seconds from the last client-tracked SET, or {@link #SESSION_QUERY_TIMEOUT_NOT_TRACKED} if none
*/
long getSessionQueryTimeoutSeconds() {
return sessionQueryTimeoutSeconds.get();
}

/**
* Get all direct HiveServer2 URLs from a ZooKeeper based HiveServer2 URL
* @param zookeeperBasedHS2Url
Expand Down
152 changes: 115 additions & 37 deletions jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hive.jdbc;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
import org.apache.hive.service.cli.RowSet;
Expand Down Expand Up @@ -57,6 +58,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.hadoop.hive.ql.ErrorMsg.CLIENT_POLLING_OPSTATUS_INTERRUPTED;

Expand All @@ -70,6 +74,10 @@ public class HiveStatement implements java.sql.Statement {

public static final String QUERY_CANCELLED_MESSAGE = "Query was cancelled.";

/** Last assignment wins if multiple appear (e.g. multi-line script). Uses find(), not full-string match. */
private static final Pattern SET_HIVE_QUERY_TIMEOUT_SECONDS = Pattern.compile(
"(?i)set\\s+hive\\.query\\.timeout\\.seconds\\s*=\\s*([^;\\n]+)");

private final HiveConnection connection;
private TCLIService.Iface client;
private Optional<TOperationHandle> stmtHandle;
Expand Down Expand Up @@ -298,6 +306,7 @@ public void closeOnCompletion() throws SQLException {
public boolean execute(String sql) throws SQLException {
runAsyncOnServer(sql);
TGetOperationStatusResp status = waitForOperationToComplete();
trackSessionQueryTimeoutIfSet(sql);

// The query should be completed by now
if (!status.isHasResultSet() && stmtHandle.isPresent() && !stmtHandle.get().isHasResultSet()) {
Expand Down Expand Up @@ -398,20 +407,120 @@ private TGetOperationStatusResp waitForResultSetStatus() throws SQLException {
return statusResp;
}

/**
* When {@code SET hive.query.timeout.seconds=...} succeeds, remember the effective value on the
* connection so {@code TIMEDOUT_STATE} can report it if the server omits {@code errorMessage}
* (HIVE-28265).
*/
private void trackSessionQueryTimeoutIfSet(String sql) {
if (sql == null) {
return;
}
Matcher m = SET_HIVE_QUERY_TIMEOUT_SECONDS.matcher(sql);
Long lastSec = null;
while (m.find()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this PR interesting.

Unfortunately, I have no time to finish this review as I go for a long vacation.

But this part made me suspicious as I'm pretty sure we usually don't read Hive this way.
So, I read the Jira ticket itself: https://issues.apache.org/jira/browse/HIVE-28265

My fault, I should start with this one.
Actually, the ticket says the feature itself is already working but we get a wrong error message.

Just thinking out loud:

As I see HiveStatement doesn't contain any reference to Hive Configuration. Creating a hiveConf object is not a top of my mind but I affraid with this way you ignore the actual HiveConf loaded in the Hive Server session. I'm sad that I have no time to debug it out but for me, it looks suspicious.
I bet Hive already has it's method to read the SET ... commands out. As you can see, we have no such (or similar) parsing logic to read the Hive settings but still, if you run a set command, you can easily read the value from HiveConf. It would worth doing a debug session and figuring out how Hive exactly handles session level configurations.
Based on that I would say maybe there is a place where HiveStatement.setQueryTimeout should be called but it is not.

Anyway, good luck with the PR. If you have still have open questions at the end of the next week, I would be happy to help and learn this part of the code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @InvisibleProgrammer This was reproducable.

Could you please let me know any setting or runtime configs, if you feel can fix the issue?

Could you also please let me know, how to refactor? As I am new to the community

I'm sad that I have no time to debug it out but for me, it looks suspicious.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again for pointing at HIVE-28265 first — we stayed focused on the wrong error message, not changing how timeout itself works.

What we changed in this update

  1. HiveStatement — We no longer strip the ; Query ID: … suffix from the timeout string. When the server returns a usable message, the JDBC client passes it through as HS2/HiveSQLException format it (same as you see in Beeline). The HIVE-28265 logic is unchanged for the broken paths: empty error text or “after 0 seconds”, where we still derive the text from Statement#setQueryTimeout and the last SET hive.query.timeout.seconds tracked on the connection.

  2. TestJdbcDriver2 — Assertions now require the message to start with Query timed out after 1 seconds and not contain after 0 seconds, instead of matching the entire string exactly. That matches real HS2 output (…; Query ID: …) and still guards the original bug.

try {
HiveConf conf = new HiveConf();
conf.set(HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS.varname, m.group(1).trim());
long sec = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
lastSec = sec;
} catch (Exception e) {
LOG.debug("Could not parse session query timeout fragment: {}", m.group(0), e);
}
}
if (lastSec != null) {
connection.setSessionQueryTimeoutSeconds(lastSec);
}
}

/**
* HIVE-28265: Prefer server error text unless it is empty or the known-broken "0 seconds" case;
* otherwise derive seconds from JDBC {@link #setQueryTimeout(int)} or last session SET.
*/
private String sqlTimeoutMessageForTimedOutState(String serverMessage) {
if (!needsLocalTimeoutMessageForTimedOut(serverMessage)) {
return serverMessage;
}
long effectiveSec = resolveEffectiveTimeoutSecondsForMessage();
if (effectiveSec > 0) {
return "Query timed out after " + effectiveSec + " seconds";
}
return "Query timed out";
}

private boolean needsLocalTimeoutMessageForTimedOut(String timeoutMsg) {
return StringUtils.isBlank(timeoutMsg)
|| StringUtils.containsIgnoreCase(timeoutMsg, "after 0 seconds");
}

private long resolveEffectiveTimeoutSecondsForMessage() {
if (queryTimeout > 0) {
return queryTimeout;
}
long tracked = connection.getSessionQueryTimeoutSeconds();
if (tracked > 0) {
return tracked;
}
return 0L;
}

private SQLException sqlExceptionForCanceledState(TGetOperationStatusResp statusResp) {
final String errMsg = statusResp.getErrorMessage();
final String fullErrMsg;
if (errMsg == null || errMsg.isEmpty()) {
fullErrMsg = QUERY_CANCELLED_MESSAGE;
} else {
fullErrMsg = QUERY_CANCELLED_MESSAGE + " " + errMsg;
}
return new SQLException(fullErrMsg, "01000");
}

/**
* One GetOperationStatus response: progress update, Thrift status check, then terminal states.
* Extracted to keep {@link #waitForOperationToComplete()} smaller for static analysis (Sonar).
*/
private void processOperationStatusResponse(TGetOperationStatusResp statusResp) throws SQLException {
if (!isOperationComplete && inPlaceUpdateStream.isPresent()) {
inPlaceUpdateStream.get().update(statusResp.getProgressUpdateResponse());
}
Utils.verifySuccessWithInfo(statusResp.getStatus());
if (!statusResp.isSetOperationState()) {
return;
}
switch (statusResp.getOperationState()) {
case CLOSED_STATE:
case FINISHED_STATE:
isOperationComplete = true;
isLogBeingGenerated = false;
break;
case CANCELED_STATE:
throw sqlExceptionForCanceledState(statusResp);
case TIMEDOUT_STATE:
throw new SQLTimeoutException(sqlTimeoutMessageForTimedOutState(statusResp.getErrorMessage()));
case ERROR_STATE:
throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), statusResp.getErrorCode());
case UKNOWN_STATE:
throw new SQLException("Unknown query", "HY000");
case INITIALIZED_STATE:
case PENDING_STATE:
case RUNNING_STATE:
break;
}
}

TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
TGetOperationStatusResp statusResp = null;

final TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle.get());
statusReq.setGetProgressUpdate(inPlaceUpdateStream.isPresent());
boolean progressUpdates = inPlaceUpdateStream.isPresent();
statusReq.setGetProgressUpdate(progressUpdates);

// Progress bar is completed if there is nothing to request
if (inPlaceUpdateStream.isPresent()) {
if (progressUpdates) {
inPlaceUpdateStream.get().getEventNotifier().progressBarCompleted();
}

LOG.debug("Waiting on operation to complete: Polling operation status");

// Poll on the operation status, till the operation is complete
do {
try {
if (Thread.currentThread().isInterrupted()) {
Expand All @@ -424,37 +533,7 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
*/
statusResp = client.GetOperationStatus(statusReq);
LOG.debug("Status response: {}", statusResp);
if (!isOperationComplete && inPlaceUpdateStream.isPresent()) {
inPlaceUpdateStream.get().update(statusResp.getProgressUpdateResponse());
}
Utils.verifySuccessWithInfo(statusResp.getStatus());
if (statusResp.isSetOperationState()) {
switch (statusResp.getOperationState()) {
case CLOSED_STATE:
case FINISHED_STATE:
isOperationComplete = true;
isLogBeingGenerated = false;
break;
case CANCELED_STATE:
// 01000 -> warning
final String errMsg = statusResp.getErrorMessage();
final String fullErrMsg =
(errMsg == null || errMsg.isEmpty()) ? QUERY_CANCELLED_MESSAGE : QUERY_CANCELLED_MESSAGE + " " + errMsg;
throw new SQLException(fullErrMsg, "01000");
case TIMEDOUT_STATE:
throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds");
case ERROR_STATE:
// Get the error details from the underlying exception
throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(),
statusResp.getErrorCode());
case UKNOWN_STATE:
throw new SQLException("Unknown query", "HY000");
case INITIALIZED_STATE:
case PENDING_STATE:
case RUNNING_STATE:
break;
}
}
processOperationStatusResponse(statusResp);
} catch (SQLException e) {
isLogBeingGenerated = false;
throw e;
Expand All @@ -464,8 +543,7 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
}
} while (!isOperationComplete);

// set progress bar to be completed when hive query execution has completed
if (inPlaceUpdateStream.isPresent()) {
if (progressUpdates) {
inPlaceUpdateStream.get().getEventNotifier().progressBarCompleted();
}
return statusResp;
Expand Down
2 changes: 1 addition & 1 deletion ql/src/test/queries/clientpositive/llap_io_cache.q
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ TBLPROPERTIES (
INSERT INTO TABLE tbl_parq
SELECT
1 AS id,
RPAD('x', 16777177, 'x') AS payload;
RPAD('x', 8388608, 'x') AS payload;

SELECT LENGTH(payload) FROM tbl_parq;

Expand Down
Loading
Loading