Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,14 +384,16 @@ public void submitQuery(
long initialCredit,
int bindCount,
long bindPayloadPtr,
long bindPayloadLen
long bindPayloadLen,
long queryFlags
) throws InterruptedException {
pendingRequest.sql = sql;
pendingRequest.requestId = requestId;
pendingRequest.initialCredit = initialCredit;
pendingRequest.bindCount = bindCount;
pendingRequest.bindPayloadPtr = bindPayloadPtr;
pendingRequest.bindPayloadLen = bindPayloadLen;
pendingRequest.queryFlags = queryFlags;
requests.put(pendingRequest);
}

Expand Down Expand Up @@ -715,6 +717,11 @@ private void sendQueryRequest(QueryRequest req) {
if (req.bindCount > 0 && req.bindPayloadLen > 0) {
sendScratch.putBlockOfBytes(req.bindPayloadPtr, req.bindPayloadLen);
}
// Optional query_flags trailer; omitted when zero so a baseline frame
// stays byte-identical and the server defaults the flags to 0.
if (req.queryFlags != 0) {
sendScratch.putVarint(req.queryFlags);
}
wsClient.sendBinary(sendScratch.getBufferPtr(), sendScratch.getPosition());
sendScratch.reset();
}
Expand Down Expand Up @@ -773,6 +780,7 @@ private static final class QueryRequest {
long bindPayloadLen;
long bindPayloadPtr;
long initialCredit;
long queryFlags;
long requestId;
CharSequence sql;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public final class QwpEgressMsgKind {
*/
public static final byte CACHE_RESET = 0x17;
public static final byte CANCEL = 0x14;
/**
* {@code SERVER_INFO.capabilities} bit: the server parses the optional
* {@code query_flags:varint} trailer on {@code QUERY_REQUEST}. The client
* appends the trailer only when this bit is set. Mirrors the server-side
* constant {@code io.questdb.cutlass.qwp.codec.QwpEgressMsgKind#CAP_QUERY_FLAGS}.
*/
public static final int CAP_QUERY_FLAGS = 0x00000002;
/**
* {@code SERVER_INFO.capabilities} bit advertising that the frame ends with
* an additional {@code zone_id:u16_len+utf8} field after {@code node_id}.
Expand All @@ -54,6 +61,12 @@ public final class QwpEgressMsgKind {
*/
public static final byte EXEC_DONE = 0x16;
public static final byte QUERY_ERROR = 0x13;
/**
* {@code QUERY_REQUEST.query_flags} bit: reset the connection-scoped SYMBOL
* dict before this query, scoping it to the query. Sent only when the server
* advertised {@link #CAP_QUERY_FLAGS}.
*/
public static final byte QUERY_FLAG_RESET_DICT = 0x01;
public static final byte QUERY_REQUEST = 0x10;
/**
* Reset mask bit: clear the connection-scoped SYMBOL dict.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,18 @@ public synchronized void connect() {
* before the reset should be discarded by the handler.
*/
public void execute(CharSequence sql, QwpColumnBatchHandler handler) {
execute(sql, null, handler);
execute(sql, null, handler, false);
}

/**
* As {@link #execute(CharSequence, QwpColumnBatchHandler)}, but when
* {@code resetSymbolDict} is set the server resets the connection-scoped
* SYMBOL dict before this query, scoping the dict to this query's symbols.
* The flag reaches the wire only when the server advertised
* {@link QwpEgressMsgKind#CAP_QUERY_FLAGS}; otherwise it is silently ignored.
*/
public void execute(CharSequence sql, QwpColumnBatchHandler handler, boolean resetSymbolDict) {
execute(sql, null, handler, resetSymbolDict);
}

/**
Expand All @@ -804,6 +815,17 @@ public void execute(CharSequence sql, QwpColumnBatchHandler handler) {
* defeats this reuse.
*/
public void execute(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler) {
execute(sql, binds, handler, false);
}

/**
* As {@link #execute(CharSequence, QwpBindSetter, QwpColumnBatchHandler)},
* but when {@code resetSymbolDict} is set the server resets the
* connection-scoped SYMBOL dict before this query, scoping the dict to this
* query's symbols. The flag reaches the wire only when the server advertised
* {@link QwpEgressMsgKind#CAP_QUERY_FLAGS}; otherwise it is silently ignored.
*/
public void execute(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler, boolean resetSymbolDict) {
if (!executing.compareAndSet(false, true)) {
throw new IllegalStateException(
"QwpQueryClient.execute called while another execute is in flight; one query at a time per client");
Expand All @@ -815,7 +837,7 @@ public void execute(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler
// is intentionally NOT cleared inside executeOnce().
pendingCancel = false;
try {
executeImpl(sql, binds, handler);
executeImpl(sql, binds, handler, resetSymbolDict);
} finally {
executing.set(false);
}
Expand Down Expand Up @@ -1392,7 +1414,7 @@ private void connectToEndpoint(Endpoint ep) {
}
}

private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler) {
private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHandler handler, boolean resetSymbolDict) {
if (closedFlag.get()) {
throw new IllegalStateException("QwpQueryClient is closed");
}
Expand All @@ -1416,7 +1438,7 @@ private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHa
while (true) {
attempt++;
FailoverProbeHandler probe = new FailoverProbeHandler(handler);
executeOnce(sql, binds, probe);
executeOnce(sql, binds, probe, resetSymbolDict);
if (!probe.transportFailureIntercepted) {
return;
}
Expand Down Expand Up @@ -1509,7 +1531,7 @@ private void executeImpl(CharSequence sql, QwpBindSetter binds, QwpColumnBatchHa
* the user's handler in a {@link FailoverProbeHandler} so that the outer
* loop can intercept transport failures before they reach the user.
*/
private void executeOnce(CharSequence sql, QwpBindSetter binds, FailoverProbeHandler probe) {
private void executeOnce(CharSequence sql, QwpBindSetter binds, FailoverProbeHandler probe, boolean resetSymbolDict) {
// Cache the I/O thread reference at entry: close() may null the field while
// we are inside this loop, so reading the field per-iteration would NPE
// exactly when the user is mid-execute() and close() races. The queue and
Expand Down Expand Up @@ -1553,7 +1575,8 @@ private void executeOnce(CharSequence sql, QwpBindSetter binds, FailoverProbeHan
io.requestCancel(requestId);
}
try {
io.submitQuery(sql, requestId, initialCreditBytes, bindValues.count(), bindValues.bufferPtr(), bindValues.bufferLen());
io.submitQuery(sql, requestId, initialCreditBytes, bindValues.count(), bindValues.bufferPtr(), bindValues.bufferLen(),
resolveQueryFlags(resetSymbolDict));
while (true) {
QueryEvent ev = io.takeEvent();
try {
Expand Down Expand Up @@ -1744,6 +1767,16 @@ private void reconnectViaTracker() {
+ ", lastError=" + (lastError == null ? "<none>" : lastError.getMessage()) + ']');
}

private long resolveQueryFlags(boolean resetSymbolDict) {
if (!resetSymbolDict) {
return 0L;
}
QwpServerInfo info = serverInfo;
return info != null && (info.getCapabilities() & QwpEgressMsgKind.CAP_QUERY_FLAGS) != 0
? QwpEgressMsgKind.QUERY_FLAG_RESET_DICT
: 0L;
}

private void runUpgradeWithTimeout(Endpoint ep) {
int timeoutMs = (int) Math.min(authTimeoutMs, Integer.MAX_VALUE);
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*+*****************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2026 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package io.questdb.client.test.cutlass.qwp.client;

import io.questdb.client.cutlass.qwp.client.QwpColumnBatch;
import io.questdb.client.cutlass.qwp.client.QwpColumnBatchHandler;
import io.questdb.client.cutlass.qwp.client.QwpEgressMsgKind;
import io.questdb.client.cutlass.qwp.client.QwpQueryClient;
import io.questdb.client.cutlass.qwp.protocol.QwpConstants;
import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* Wire coverage for the per-query {@code query_flags} trailer that
* {@link QwpQueryClient#execute(CharSequence, QwpColumnBatchHandler, boolean)}
* adds. The mock server captures the raw {@code QUERY_REQUEST} the client emits
* and replies with an {@code EXEC_DONE} so {@code execute} returns; the test
* then inspects the captured bytes. Pins:
* <ul>
* <li>{@code resetSymbolDict=true} against a server advertising
* {@link QwpEgressMsgKind#CAP_QUERY_FLAGS} appends the
* {@link QwpEgressMsgKind#QUERY_FLAG_RESET_DICT} trailer;</li>
* <li>the trailer is the only difference from the flag-off baseline, which
* stays byte-identical;</li>
* <li>without the capability the flag is dropped (no trailer);</li>
* <li>the flag-less overloads never append a trailer.</li>
* </ul>
* The decode -&gt; dict-reset effect on the server, and the end-to-end honouring
* of the flag, are covered against a live server in the questdb repo
* (QwpEgressQueryFlagsResetTest, QwpEgressCacheResetWireTest).
*/
public class QwpQueryClientQueryFlagsTest {

private static final QwpColumnBatchHandler NOOP_HANDLER = new QwpColumnBatchHandler() {
@Override
public void onBatch(QwpColumnBatch batch) {
}

@Override
public void onEnd(long totalRows) {
}

@Override
public void onError(byte status, String message) {
}
};

@Test
public void testFlagDroppedWhenServerLacksCapability() throws Exception {
byte[] frame = runAndCapture(0, c -> c.execute("SELECT 1", NOOP_HANDLER, true));
Assert.assertEquals("flag must be dropped when the server does not advertise CAP_QUERY_FLAGS",
-1L, queryFlagsTrailer(frame));
}

@Test
public void testLegacyThreeArgOverloadSendsNoTrailer() throws Exception {
byte[] frame = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", null, NOOP_HANDLER));
Assert.assertEquals(-1L, queryFlagsTrailer(frame));
}

@Test
public void testLegacyTwoArgOverloadSendsNoTrailer() throws Exception {
byte[] frame = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER));
Assert.assertEquals(-1L, queryFlagsTrailer(frame));
}

@Test
public void testResetFlagAppendsTrailerWhenCapabilityAdvertised() throws Exception {
byte[] frame = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER, true));
Assert.assertEquals((long) QwpEgressMsgKind.QUERY_FLAG_RESET_DICT, queryFlagsTrailer(frame));
}

@Test
public void testResetTrailerIsTheOnlyDifferenceFromBaseline() throws Exception {
byte[] off = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER, false));
byte[] on = runAndCapture(QwpEgressMsgKind.CAP_QUERY_FLAGS, c -> c.execute("SELECT 1", NOOP_HANDLER, true));
// request_id increments per execute; both run on a fresh client so they
// already match, but zero it to keep the comparison about the trailer.
zeroRequestId(off);
zeroRequestId(on);
Assert.assertEquals("reset frame must be the baseline plus a one-byte trailer", off.length + 1, on.length);
Assert.assertArrayEquals("baseline bytes must be untouched", off, Arrays.copyOf(on, off.length));
Assert.assertEquals(QwpEgressMsgKind.QUERY_FLAG_RESET_DICT, on[on.length - 1]);
}

private static byte[] buildExecDone(byte[] queryRequest) {
int bodyLen = 1 + 8 + 1 + 1; // msg_kind + request_id + op_type + rows_affected varint
byte[] frame = new byte[QwpConstants.HEADER_SIZE + bodyLen];
ByteBuffer bb = ByteBuffer.wrap(frame).order(ByteOrder.LITTLE_ENDIAN);
bb.put((byte) 'Q').put((byte) 'W').put((byte) 'P').put((byte) '1');
bb.put((byte) 1); // version
bb.put((byte) 0); // flags
bb.putShort((short) 0); // table_count
bb.putInt(bodyLen); // payload_length
bb.put(QwpEgressMsgKind.EXEC_DONE);
bb.put(queryRequest, 1, 8); // echo request_id verbatim
bb.put((byte) 0); // op_type
bb.put((byte) 0); // rows_affected = 0
return frame;
}

/**
* Parses the {@code QUERY_REQUEST} the client emitted and returns the
* decoded {@code query_flags} trailer, or {@code -1} when no trailer was
* appended. Assumes no binds (the tests use none).
*/
private static long queryFlagsTrailer(byte[] f) {
Assert.assertTrue("captured frame is too short", f.length > 1 + 8);
Assert.assertEquals("captured frame must be a QUERY_REQUEST", QwpEgressMsgKind.QUERY_REQUEST, f[0]);
int[] p = {1 + 8}; // skip msg_kind + request_id
long sqlLen = readVarint(f, p);
p[0] += (int) sqlLen;
readVarint(f, p); // initial_credit
long bindCount = readVarint(f, p);
Assert.assertEquals("test frames carry no binds", 0L, bindCount);
if (p[0] >= f.length) {
return -1L;
}
return readVarint(f, p);
}

private static long readVarint(byte[] buf, int[] pos) {
long value = 0;
int shift = 0;
while (true) {
byte b = buf[pos[0]++];
value |= (long) (b & 0x7F) << shift;
if ((b & 0x80) == 0) {
return value;
}
shift += 7;
}
}

private static byte[] runAndCapture(int serverCapabilities, ExecuteAction action) throws Exception {
CapturingQueryServer handler = new CapturingQueryServer();
try (TestWebSocketServer server = new TestWebSocketServer(handler)) {
server.setSendServerInfo(true);
server.setCapabilities(serverCapabilities);
int port = server.getPort();
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));
try (QwpQueryClient client = QwpQueryClient.fromConfig(
"ws::addr=localhost:" + port + ";auth_timeout_ms=2000;")) {
client.connect();
Assert.assertTrue("client must connect", client.isConnected());
action.run(client);
}
}
byte[] frame = handler.captured.get();
Assert.assertNotNull("server never received a QUERY_REQUEST", frame);
return frame;
}

private static void zeroRequestId(byte[] frame) {
Arrays.fill(frame, 1, 1 + 8, (byte) 0);
}

@FunctionalInterface
private interface ExecuteAction {
void run(QwpQueryClient client);
}

private static final class CapturingQueryServer implements TestWebSocketServer.WebSocketServerHandler {
final AtomicReference<byte[]> captured = new AtomicReference<>();

@Override
public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
if (data.length == 0 || data[0] != QwpEgressMsgKind.QUERY_REQUEST) {
return;
}
captured.compareAndSet(null, data);
try {
client.sendBinary(buildExecDone(data));
} catch (IOException e) {
// best-effort: a failed reply surfaces to the client as a transport error
}
}
}
}
Loading
Loading