Skip to content
35 changes: 27 additions & 8 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -2051,11 +2051,13 @@ public CompletableFuture<CommandResponse> execute(String sql) {
* <p>Create an instance of {@link ClickHouseBinaryFormatReader} based on response. Table schema is option and only
* required for {@link ClickHouseFormat#RowBinaryWithNames}, {@link ClickHouseFormat#RowBinary}.
* Format {@link ClickHouseFormat#RowBinaryWithDefaults} is not supported for output (read operations).</p>
* @param response
* @param schema
* @return
* @param response - not closed query response object
* @param schema - schema of the response. Can be null.
* @param customTypeMapping - type hint map
* @return Reader object for the format
* @throws IllegalArgumentException if there is no supported reader for the type
*/
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema, Map<ClickHouseDataType, Class<?>> customTypeMapping) {
ClickHouseBinaryFormatReader reader = null;
// Using caching buffer allocator is risky so this parameter is not exposed to the user
boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers", false);
Expand All @@ -2065,24 +2067,41 @@ public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response
switch (response.getFormat()) {
case Native:
reader = new NativeFormatReader(response.getInputStream(), response.getSettings(),
byteBufferPool, typeHintMapping);
byteBufferPool, customTypeMapping);
break;
case RowBinaryWithNamesAndTypes:
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(), byteBufferPool, typeHintMapping);
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(), byteBufferPool, customTypeMapping);
break;
case RowBinaryWithNames:
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema, byteBufferPool, typeHintMapping);
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema, byteBufferPool, customTypeMapping);
break;
case RowBinary:
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema,
byteBufferPool, typeHintMapping);
byteBufferPool, customTypeMapping);
break;
default:
throw new IllegalArgumentException("Binary readers doesn't support format: " + response.getFormat());
}
return reader;
}

/**
* See {@link Client#newBinaryFormatReader(QueryResponse, TableSchema, Map)}
* @param response - not closed query response object
* @param schema - schema of the response. Can be null.
* @return Reader object for the format
* @throws IllegalArgumentException if there is no supported reader for the type
*/
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
return newBinaryFormatReader(response, schema, typeHintMapping);
}

/**
* See {@link Client#newBinaryFormatReader(QueryResponse, TableSchema, Map)}
* @param response - not closed query response object
* @return Reader object for the format
* @throws IllegalArgumentException if there is no supported reader for the type
*/
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
return newBinaryFormatReader(response, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -70,14 +71,19 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm

private TableSchema schema;
private ClickHouseColumn[] columns;
private Class<?>[] columnTypeHints;
private Map[] convertions;
private Map<ClickHouseDataType, Class<?>> defaultTypeHintMap;
private boolean hasNext = true;
private boolean initialState = true; // reader is in initial state, no records have been read yet
private long row = -1; // before first row
private long lastNextCallTs; // for exception to detect slow reader

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,BinaryStreamReader.ByteBufferAllocator byteBufferAllocator, Map<ClickHouseDataType, Class<?>> defaultTypeHintMap) {
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator,
Map<ClickHouseDataType, Class<?>> defaultTypeHintMap) {
this.input = inputStream;
this.defaultTypeHintMap = defaultTypeHintMap == null ? Collections.emptyMap() : defaultTypeHintMap;
Map<String, Object> settings = querySettings == null ? Collections.emptyMap() : querySettings.getAllSettings();
Boolean useServerTimeZone = (Boolean) settings.get(ClientConfigProperties.USE_SERVER_TIMEZONE.getKey());
TimeZone timeZone = (useServerTimeZone == Boolean.TRUE && querySettings != null) ?
Expand All @@ -89,7 +95,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
boolean jsonAsString = MapUtils.getFlag(settings,
ClientConfigProperties.serverSetting(ServerSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING), false);
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString,
defaultTypeHintMap);
defaultTypeHintMap, ByteBuffer::allocate);
if (schema != null) {
setSchema(schema);
}
Expand Down Expand Up @@ -188,7 +194,7 @@ protected boolean readRecord(Object[] record) throws IOException {
boolean firstColumn = true;
for (int i = 0; i < columns.length; i++) {
try {
Object val = binaryStreamReader.readValue(columns[i]);
Object val = binaryStreamReader.readValue(columns[i], columnTypeHints[i]);
if (val != null) {
record[i] = val;
} else {
Expand All @@ -212,13 +218,18 @@ public <T> T readValue(int colIndex) {
if (colIndex < 1 || colIndex > getSchema().getColumns().size()) {
throw new ClientException("Column index out of bounds: " + colIndex);
}
return (T) currentRecord[colIndex - 1];

T value = (T) currentRecord[colIndex - 1];
if (value instanceof BinaryString) {
return (T) ((BinaryString) value).asString();
}
return value;
}

@SuppressWarnings("unchecked")
@Override
public <T> T readValue(String colName) {
return (T) currentRecord[getSchema().nameToIndex(colName)];
return readValue(getSchema().nameToColumnIndex(colName));
}

@Override
Expand Down Expand Up @@ -300,16 +311,21 @@ protected void setSchema(TableSchema schema) {
this.schema = schema;
this.columns = schema.getColumns().toArray(ClickHouseColumn.EMPTY_ARRAY);
this.convertions = new Map[columns.length];

this.columnTypeHints = new Class[columns.length];
this.currentRecord = new Object[columns.length];
this.nextRecord = new Object[columns.length];

Class<?> stringTypeHint = defaultTypeHintMap.get(ClickHouseDataType.String);

for (int i = 0; i < columns.length; i++) {
ClickHouseColumn column = columns[i];
ClickHouseDataType columnDataType = column.getDataType();
if (columnDataType.equals(ClickHouseDataType.SimpleAggregateFunction)){
columnDataType = column.getNestedColumns().get(0).getDataType();
}
if (columnDataType.equals(ClickHouseDataType.String)) {
columnTypeHints[i] = stringTypeHint;
}
switch (columnDataType) {
case Int8:
case Int16:
Expand Down Expand Up @@ -530,9 +546,11 @@ private <T> T getPrimitiveArray(int index, Class<?> componentType) {
for (int i = 0; i < list.size(); i++) {
Array.set(array, i, list.get(i));
}
return (T)array;
return (T) array;
} else if (componentType == byte.class) {
if (value instanceof String) {
if (value instanceof BinaryString) {
return (T) ((BinaryString)value).asBytes();
} else if(value instanceof String) {
return (T) ((String) value).getBytes(StandardCharsets.UTF_8);
} else if (value instanceof InetAddress) {
return (T) ((InetAddress) value).getAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.math.BigInteger;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -33,6 +35,7 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.function.Function;

/**
* This class is not thread safe and should not be shared between multiple threads.
Expand All @@ -51,6 +54,8 @@

private final ByteBufferAllocator bufferAllocator;

private final StringBufferAllocator stringBufferAllocator;

private final boolean jsonAsString;

private final Class<?> arrayDefaultTypeHint;
Expand All @@ -69,11 +74,17 @@
* @param jsonAsString - use string to serialize/deserialize JSON columns
* @param typeHintMapping - what type use as hint if hint is not set or may not be known.
*/
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator, boolean jsonAsString, Map<ClickHouseDataType, Class<?>> typeHintMapping) {
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log,
ByteBufferAllocator bufferAllocator,
boolean jsonAsString,
Map<ClickHouseDataType,
Class<?>> typeHintMapping,
StringBufferAllocator stringBufferAllocator) {
this.log = log == null ? NOPLogger.NOP_LOGGER : log;
this.timeZone = timeZone;
this.input = input;
this.bufferAllocator = bufferAllocator;
this.stringBufferAllocator = stringBufferAllocator;
this.jsonAsString = jsonAsString;

this.arrayDefaultTypeHint = typeHintMapping == null ||
Expand Down Expand Up @@ -121,13 +132,19 @@
switch (dataType) {
// Primitives
case FixedString: {
byte[] bytes = precision > STRING_BUFF.length ?
new byte[precision] : STRING_BUFF;
readNBytes(input, bytes, 0, precision);
return (T) new String(bytes, 0, precision, StandardCharsets.UTF_8);
if (typeHint == BinaryString.class) {
return (T) readBinaryString(precision, stringBufferAllocator::allocate);
} else {
return (T) readString(input, precision);
}
}
case String: {
return (T) readString();
if (typeHint == BinaryString.class) {
int len = readVarInt(input);
return (T) readBinaryString(len, stringBufferAllocator::allocate);
} else {
return (T) readString(input);
}
}
case Int8:
return (T) Byte.valueOf(readByte());
Expand Down Expand Up @@ -627,10 +644,10 @@
if (itemTypeColumn.isNullable() || itemTypeColumn.getDataType() == ClickHouseDataType.Variant) {
array = new ArrayValue(Object.class, len);
for (int i = 0; i < len; i++) {
array.set(i, readValue(itemTypeColumn));
array.set(i, readArrayItemValue(itemTypeColumn));
}
} else {
Object firstValue = readValue(itemTypeColumn);
Object firstValue = readArrayItemValue(itemTypeColumn);
Class<?> itemClass = firstValue.getClass();
if (firstValue instanceof Byte) {
itemClass = byte.class;
Expand All @@ -657,12 +674,17 @@
array = new ArrayValue(itemClass, len);
array.set(0, firstValue);
for (int i = 1; i < len; i++) {
array.set(i, readValue(itemTypeColumn));
array.set(i, readArrayItemValue(itemTypeColumn));
}
}
return array;
}

private Object readArrayItemValue(ClickHouseColumn itemTypeColumn) throws IOException {
Class<?> typeHint = itemTypeColumn.getDataType() == ClickHouseDataType.String ? String.class : null;
return readValue(itemTypeColumn, typeHint);
}

public void skipValue(ClickHouseColumn column) throws IOException {
readValue(column, null);
}
Expand Down Expand Up @@ -835,13 +857,18 @@
ClickHouseColumn valueType = column.getValueInfo();
LinkedHashMap<Object, Object> map = new LinkedHashMap<>(len);
for (int i = 0; i < len; i++) {
Object key = readValue(keyType);
Object value = readValue(valueType);
Object key = readMapKeyOrValue(keyType);
Object value = readMapKeyOrValue(valueType);
map.put(key, value);
}
return map;
}

private Object readMapKeyOrValue(ClickHouseColumn c) throws IOException {
Class<?> typeHint = c.getDataType() == ClickHouseDataType.String ? String.class : null;
return readValue(c, typeHint);
}

/**
* Reads a tuple.
* @param column - column information
Expand Down Expand Up @@ -1114,6 +1141,26 @@
return new String(dest, 0, len, StandardCharsets.UTF_8);
}

public BinaryString readBinaryString(int len, Function<Integer, ByteBuffer> bufferAllocator) throws IOException {

Check warning on line 1144 in client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this code to use the more specialised Functional Interface 'IntFunction<ByteBuffer>'

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ1znqHzfrTUmNjb20JI&open=AZ1znqHzfrTUmNjb20JI&pullRequest=2813
ByteBuffer buffer = bufferAllocator.apply(len);
if (buffer == null) {
throw new IOException("bufferAllocator returned `null`");
}
if (buffer.hasArray()) {
readNBytes(input, buffer.array(), 0, len);
} else {
int left = len;
while (left > 0) {
int chunkSize = Math.min(STRING_BUFF.length, left);
readNBytes(input, STRING_BUFF, 0, chunkSize);
buffer.put(STRING_BUFF, 0, chunkSize);
left -= chunkSize;
}
}

return new BinaryStringImpl(buffer);
}

/**
* Reads a decimal value from input stream.
* @param input - source of bytes
Expand All @@ -1122,6 +1169,10 @@
*/
public static String readString(InputStream input) throws IOException {
int len = readVarInt(input);
return readString(input, len);
}

public static String readString(InputStream input, int len) throws IOException {
if (len == 0) {
return "";
}
Expand All @@ -1140,6 +1191,10 @@
byte[] allocate(int size);
}

public interface StringBufferAllocator {
ByteBuffer allocate(int size);
}

/**
* Byte allocator that creates a new byte array for each request.
*/
Expand Down Expand Up @@ -1394,4 +1449,56 @@
}
return obj;
}

static final class BinaryStringImpl implements BinaryString {

private final ByteBuffer buffer;
private final int len;
private CharBuffer charBuffer = null;
private String strValue = null;

BinaryStringImpl(ByteBuffer buffer) {
this.buffer = buffer;
this.len = buffer.limit();
}

@Override
public String asString() {
if (strValue == null) {
if (buffer.hasArray()) {
strValue = new String(buffer.array(), StandardCharsets.UTF_8);
} else {
ensureCharBuffer();
strValue = charBuffer.toString();
}
}
return strValue;
}

@Override
public byte[] asBytes() {
if (buffer.hasArray()) {
return buffer.array();
}

throw new UnsupportedOperationException("String is stored out of the heap and has no byte buffer easily accessible");
}

@Override
public int length() {
return len;
}

private void ensureCharBuffer() {
if (charBuffer == null) {
buffer.rewind();
charBuffer = StandardCharsets.UTF_8.decode(buffer);
}
}

@Override
public int compareTo(String o) {
return asString().compareTo(o);
}
}
}
Loading
Loading