diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java index 228360c7..700e219e 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java @@ -384,7 +384,8 @@ public void submitQuery( long initialCredit, int bindCount, long bindPayloadPtr, - long bindPayloadLen + long bindPayloadLen, + long queryFlags ) throws InterruptedException { pendingRequest.sql = sql; pendingRequest.requestId = requestId; @@ -392,6 +393,7 @@ public void submitQuery( pendingRequest.bindCount = bindCount; pendingRequest.bindPayloadPtr = bindPayloadPtr; pendingRequest.bindPayloadLen = bindPayloadLen; + pendingRequest.queryFlags = queryFlags; requests.put(pendingRequest); } @@ -715,6 +717,11 @@ private void sendQueryRequest(QueryRequest req) { if (req.bindCount > 0 && req.bindPayloadLen > 0) { sendScratch.putBlockOfBytes(req.bindPayloadPtr, req.bindPayloadLen); } + // Optional query_flags trailer; omitted when zero so a baseline frame + // stays byte-identical and the server defaults the flags to 0. + if (req.queryFlags != 0) { + sendScratch.putVarint(req.queryFlags); + } wsClient.sendBinary(sendScratch.getBufferPtr(), sendScratch.getPosition()); sendScratch.reset(); } @@ -773,6 +780,7 @@ private static final class QueryRequest { long bindPayloadLen; long bindPayloadPtr; long initialCredit; + long queryFlags; long requestId; CharSequence sql; } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java index 63d1fd43..eddd08dc 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java @@ -38,6 +38,13 @@ public final class QwpEgressMsgKind { */ public static final byte CACHE_RESET = 0x17; public static final byte CANCEL = 0x14; + /** + * {@code SERVER_INFO.capabilities} bit: the server parses the optional + * {@code query_flags:varint} trailer on {@code QUERY_REQUEST}. The client + * appends the trailer only when this bit is set. Mirrors the server-side + * constant {@code io.questdb.cutlass.qwp.codec.QwpEgressMsgKind#CAP_QUERY_FLAGS}. + */ + public static final int CAP_QUERY_FLAGS = 0x00000002; /** * {@code SERVER_INFO.capabilities} bit advertising that the frame ends with * an additional {@code zone_id:u16_len+utf8} field after {@code node_id}. @@ -54,6 +61,12 @@ public final class QwpEgressMsgKind { */ public static final byte EXEC_DONE = 0x16; public static final byte QUERY_ERROR = 0x13; + /** + * {@code QUERY_REQUEST.query_flags} bit: reset the connection-scoped SYMBOL + * dict before this query, scoping it to the query. Sent only when the server + * advertised {@link #CAP_QUERY_FLAGS}. + */ + public static final byte QUERY_FLAG_RESET_DICT = 0x01; public static final byte QUERY_REQUEST = 0x10; /** * Reset mask bit: clear the connection-scoped SYMBOL dict. diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java index 1706401e..d7e82982 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java @@ -789,7 +789,18 @@ public synchronized void connect() { * before the reset should be discarded by the handler. */ public void execute(CharSequence sql, QwpColumnBatchHandler handler) { - execute(sql, null, handler); + execute(sql, null, handler, false); + } + + /** + * As {@link #execute(CharSequence, QwpColumnBatchHandler)}, but when + * {@code resetSymbolDict} is set the server resets the connection-scoped + * SYMBOL dict before this query, scoping the dict to this query's symbols. + * The flag reaches the wire only when the server advertised + * {@link QwpEgressMsgKind#CAP_QUERY_FLAGS}; otherwise it is silently ignored. + */ + public void execute(CharSequence sql, QwpColumnBatchHandler handler, boolean resetSymbolDict) { + execute(sql, null, handler, resetSymbolDict); } /** @@ -804,6 +815,17 @@ public void execute(CharSequence sql, QwpColumnBatchHandler handler) { * defeats this reuse. */ public void execute(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler) { + execute(sql, binds, handler, false); + } + + /** + * As {@link #execute(CharSequence, QwpBindSetter, QwpColumnBatchHandler)}, + * but when {@code resetSymbolDict} is set the server resets the + * connection-scoped SYMBOL dict before this query, scoping the dict to this + * query's symbols. The flag reaches the wire only when the server advertised + * {@link QwpEgressMsgKind#CAP_QUERY_FLAGS}; otherwise it is silently ignored. + */ + public void execute(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler, boolean resetSymbolDict) { if (!executing.compareAndSet(false, true)) { throw new IllegalStateException( "QwpQueryClient.execute called while another execute is in flight; one query at a time per client"); @@ -815,7 +837,7 @@ public void execute(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler // is intentionally NOT cleared inside executeOnce(). pendingCancel = false; try { - executeImpl(sql, binds, handler); + executeImpl(sql, binds, handler, resetSymbolDict); } finally { executing.set(false); } @@ -1392,7 +1414,7 @@ private void connectToEndpoint(Endpoint ep) { } } - private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler) { + private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler, boolean resetSymbolDict) { if (closedFlag.get()) { throw new IllegalStateException("QwpQueryClient is closed"); } @@ -1416,7 +1438,7 @@ private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHa while (true) { attempt++; FailoverProbeHandler probe = new FailoverProbeHandler(handler); - executeOnce(sql, binds, probe); + executeOnce(sql, binds, probe, resetSymbolDict); if (!probe.transportFailureIntercepted) { return; } @@ -1509,7 +1531,7 @@ private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHa * the user's handler in a {@link FailoverProbeHandler} so that the outer * loop can intercept transport failures before they reach the user. */ - private void executeOnce(CharSequence sql, QwpBindSetter binds, FailoverProbeHandler probe) { + private void executeOnce(CharSequence sql, QwpBindSetter binds, FailoverProbeHandler probe, boolean resetSymbolDict) { // Cache the I/O thread reference at entry: close() may null the field while // we are inside this loop, so reading the field per-iteration would NPE // exactly when the user is mid-execute() and close() races. The queue and @@ -1553,7 +1575,8 @@ private void executeOnce(CharSequence sql, QwpBindSetter binds, FailoverProbeHan io.requestCancel(requestId); } try { - io.submitQuery(sql, requestId, initialCreditBytes, bindValues.count(), bindValues.bufferPtr(), bindValues.bufferLen()); + io.submitQuery(sql, requestId, initialCreditBytes, bindValues.count(), bindValues.bufferPtr(), bindValues.bufferLen(), + resolveQueryFlags(resetSymbolDict)); while (true) { QueryEvent ev = io.takeEvent(); try { @@ -1744,6 +1767,16 @@ private void reconnectViaTracker() { + ", lastError=" + (lastError == null ? "" : lastError.getMessage()) + ']'); } + private long resolveQueryFlags(boolean resetSymbolDict) { + if (!resetSymbolDict) { + return 0L; + } + QwpServerInfo info = serverInfo; + return info != null && (info.getCapabilities() & QwpEgressMsgKind.CAP_QUERY_FLAGS) != 0 + ? QwpEgressMsgKind.QUERY_FLAG_RESET_DICT + : 0L; + } + private void runUpgradeWithTimeout(Endpoint ep) { int timeoutMs = (int) Math.min(authTimeoutMs, Integer.MAX_VALUE); try { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientQueryFlagsTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientQueryFlagsTest.java new file mode 100644 index 00000000..07a1643c --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpQueryClientQueryFlagsTest.java @@ -0,0 +1,210 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client; + +import io.questdb.client.cutlass.qwp.client.QwpColumnBatch; +import io.questdb.client.cutlass.qwp.client.QwpColumnBatchHandler; +import io.questdb.client.cutlass.qwp.client.QwpEgressMsgKind; +import io.questdb.client.cutlass.qwp.client.QwpQueryClient; +import io.questdb.client.cutlass.qwp.protocol.QwpConstants; +import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Wire coverage for the per-query {@code query_flags} trailer that + * {@link QwpQueryClient#execute(CharSequence, QwpColumnBatchHandler, boolean)} + * adds. The mock server captures the raw {@code QUERY_REQUEST} the client emits + * and replies with an {@code EXEC_DONE} so {@code execute} returns; the test + * then inspects the captured bytes. Pins: + * + * The decode -> dict-reset effect on the server, and the end-to-end honouring + * of the flag, are covered against a live server in the questdb repo + * (QwpEgressQueryFlagsResetTest, QwpEgressCacheResetWireTest). + */ +public class QwpQueryClientQueryFlagsTest { + + private static final QwpColumnBatchHandler NOOP_HANDLER = new QwpColumnBatchHandler() { + @Override + public void onBatch(QwpColumnBatch batch) { + } + + @Override + public void onEnd(long totalRows) { + } + + @Override + public void onError(byte status, String message) { + } + }; + + @Test + public void testFlagDroppedWhenServerLacksCapability() throws Exception { + byte[] frame = runAndCapture(0, c -> c.execute("SELECT 1", NOOP_HANDLER, true)); + Assert.assertEquals("flag must be dropped when the server does not advertise CAP_QUERY_FLAGS", + -1L, queryFlagsTrailer(frame)); + } + + @Test + public void testLegacyThreeArgOverloadSendsNoTrailer() throws Exception { + byte[] frame = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", null, NOOP_HANDLER)); + Assert.assertEquals(-1L, queryFlagsTrailer(frame)); + } + + @Test + public void testLegacyTwoArgOverloadSendsNoTrailer() throws Exception { + byte[] frame = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER)); + Assert.assertEquals(-1L, queryFlagsTrailer(frame)); + } + + @Test + public void testResetFlagAppendsTrailerWhenCapabilityAdvertised() throws Exception { + byte[] frame = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER, true)); + Assert.assertEquals((long) QwpEgressMsgKind.QUERY_FLAG_RESET_DICT, queryFlagsTrailer(frame)); + } + + @Test + public void testResetTrailerIsTheOnlyDifferenceFromBaseline() throws Exception { + byte[] off = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER, false)); + byte[] on = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER, true)); + // request_id increments per execute; both run on a fresh client so they + // already match, but zero it to keep the comparison about the trailer. + zeroRequestId(off); + zeroRequestId(on); + Assert.assertEquals("reset frame must be the baseline plus a one-byte trailer", off.length + 1, on.length); + Assert.assertArrayEquals("baseline bytes must be untouched", off, Arrays.copyOf(on, off.length)); + Assert.assertEquals(QwpEgressMsgKind.QUERY_FLAG_RESET_DICT, on[on.length - 1]); + } + + private static byte[] buildExecDone(byte[] queryRequest) { + int bodyLen = 1 + 8 + 1 + 1; // msg_kind + request_id + op_type + rows_affected varint + byte[] frame = new byte[QwpConstants.HEADER_SIZE + bodyLen]; + ByteBuffer bb = ByteBuffer.wrap(frame).order(ByteOrder.LITTLE_ENDIAN); + bb.put((byte) 'Q').put((byte) 'W').put((byte) 'P').put((byte) '1'); + bb.put((byte) 1); // version + bb.put((byte) 0); // flags + bb.putShort((short) 0); // table_count + bb.putInt(bodyLen); // payload_length + bb.put(QwpEgressMsgKind.EXEC_DONE); + bb.put(queryRequest, 1, 8); // echo request_id verbatim + bb.put((byte) 0); // op_type + bb.put((byte) 0); // rows_affected = 0 + return frame; + } + + /** + * Parses the {@code QUERY_REQUEST} the client emitted and returns the + * decoded {@code query_flags} trailer, or {@code -1} when no trailer was + * appended. Assumes no binds (the tests use none). + */ + private static long queryFlagsTrailer(byte[] f) { + Assert.assertTrue("captured frame is too short", f.length > 1 + 8); + Assert.assertEquals("captured frame must be a QUERY_REQUEST", QwpEgressMsgKind.QUERY_REQUEST, f[0]); + int[] p = {1 + 8}; // skip msg_kind + request_id + long sqlLen = readVarint(f, p); + p[0] += (int) sqlLen; + readVarint(f, p); // initial_credit + long bindCount = readVarint(f, p); + Assert.assertEquals("test frames carry no binds", 0L, bindCount); + if (p[0] >= f.length) { + return -1L; + } + return readVarint(f, p); + } + + private static long readVarint(byte[] buf, int[] pos) { + long value = 0; + int shift = 0; + while (true) { + byte b = buf[pos[0]++]; + value |= (long) (b & 0x7F) << shift; + if ((b & 0x80) == 0) { + return value; + } + shift += 7; + } + } + + private static byte[] runAndCapture(int serverCapabilities, ExecuteAction action) throws Exception { + CapturingQueryServer handler = new CapturingQueryServer(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + server.setSendServerInfo(true); + server.setCapabilities(serverCapabilities); + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + try (QwpQueryClient client = QwpQueryClient.fromConfig( + "ws::addr=localhost:" + port + ";auth_timeout_ms=2000;")) { + client.connect(); + Assert.assertTrue("client must connect", client.isConnected()); + action.run(client); + } + } + byte[] frame = handler.captured.get(); + Assert.assertNotNull("server never received a QUERY_REQUEST", frame); + return frame; + } + + private static void zeroRequestId(byte[] frame) { + Arrays.fill(frame, 1, 1 + 8, (byte) 0); + } + + @FunctionalInterface + private interface ExecuteAction { + void run(QwpQueryClient client); + } + + private static final class CapturingQueryServer implements TestWebSocketServer.WebSocketServerHandler { + final AtomicReference captured = new AtomicReference<>(); + + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + if (data.length == 0 || data[0] != QwpEgressMsgKind.QUERY_REQUEST) { + return; + } + captured.compareAndSet(null, data); + try { + client.sendBinary(buildExecDone(data)); + } catch (IOException e) { + // best-effort: a failed reply surfaces to the client as a transport error + } + } + } +} diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java index 806d3750..417f6055 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java @@ -83,6 +83,7 @@ public class TestWebSocketServer implements Closeable { // QwpQueryClient tests enable this; ingress sender tests leave it off so their // connections carry only ACK frames. private volatile boolean sendServerInfo; + private volatile int capabilities; // When non-null the next handshake responds with HTTP 421 Misdirected // Request + X-QuestDB-Role: , mimicking a server whose // QwpServerInfoProvider reports REPLICA / PRIMARY_CATCHUP. Set after @@ -229,7 +230,13 @@ public void setSendServerInfo(boolean sendServerInfo) { this.sendServerInfo = sendServerInfo; } - private static byte[] buildServerInfoFrame(byte role) { + // Advertised SERVER_INFO capabilities. CAP_ZONE is unsupported here: the + // frame builder emits no zone_id trailer. + public void setCapabilities(int capabilities) { + this.capabilities = capabilities; + } + + private static byte[] buildServerInfoFrame(byte role, int capabilities) { byte[] clusterId = "questdb".getBytes(StandardCharsets.UTF_8); byte[] nodeId = "test-node".getBytes(StandardCharsets.UTF_8); int bodyLen = 1 + 1 + 8 + 4 + 8 + 2 + clusterId.length + 2 + nodeId.length; @@ -242,7 +249,7 @@ private static byte[] buildServerInfoFrame(byte role) { bb.put((byte) 0x18); // SERVER_INFO msg_kind bb.put(role); bb.putLong(0L); // epoch - bb.putInt(0); // capabilities (no CAP_ZONE -> no zone_id trailer) + bb.putInt(capabilities); // CAP_ZONE unsupported here -> no zone_id trailer bb.putLong(1L); // server_wall_ns (positive) bb.putShort((short) clusterId.length); bb.put(clusterId); @@ -567,7 +574,7 @@ void start() { try { if (sendServerInfo) { - sendBinary(buildServerInfoFrame(roleByte(advertisedRole))); + sendBinary(buildServerInfoFrame(roleByte(advertisedRole), capabilities)); } byte[] readBuf = new byte[8192];