Skip to content

Commit a14dd5c

Browse files
authored
Merge pull request #2645 from ClickHouse/fix_http_compression
[V1, V2] Fix http compression
2 parents 98ab5cf + 6f6b2ca commit a14dd5c

File tree

12 files changed

+312
-62
lines changed

12 files changed

+312
-62
lines changed

clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public enum ClickHouseDataType implements SQLType {
130130
Dynamic(Object.class, true, true, false, 0, 0, 0, 0, 0, true, 0x2B),
131131
Time(LocalDateTime.class, true, false, false, 4, 9, 0, 0, 9, false, 0x32), // 0x33 for Time(Timezone)
132132
Time64(LocalDateTime.class, true, false, false, 8, 9, 0, 0, 0, false, 0x34), // 0x35 for Time64(P, Timezone)
133-
QBit(Double.class, true, true, false, 0, 0, 0, 0, 0, true, 0x36),
133+
QBit(Double.class, true, true, false, 0, 0, 0, 0, 0, false, 0x36),
134134
;
135135

136136
public static final List<ClickHouseDataType> ORDERED_BY_RANGE_INT_TYPES =

clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseColumnTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ public boolean isWidenUnsignedTypes() {
423423
if (type.isNested() || type == ClickHouseDataType.AggregateFunction
424424
|| type == ClickHouseDataType.SimpleAggregateFunction || type == ClickHouseDataType.Enum
425425
|| type == ClickHouseDataType.Nullable || type == ClickHouseDataType.BFloat16 ||
426-
type == ClickHouseDataType.Time || type == ClickHouseDataType.Time64) {
426+
type == ClickHouseDataType.Time || type == ClickHouseDataType.Time64 || type == ClickHouseDataType.QBit) {
427427
continue;
428428
}
429429

clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,11 +275,10 @@ protected static Map<String, String> createDefaultHeaders(ClickHouseConfig confi
275275
}
276276
// Also, you can use the ‘default_format’ URL parameter
277277
map.put("x-clickhouse-format", config.getFormat().name());
278-
if (config.isResponseCompressed()) {
278+
if (config.isResponseCompressed() && config.getResponseCompressAlgorithm() != ClickHouseCompression.LZ4) {
279279
map.put("accept-encoding", config.getResponseCompressAlgorithm().encoding());
280280
}
281-
if (config.isRequestCompressed()
282-
&& config.getRequestCompressAlgorithm() != ClickHouseCompression.LZ4) {
281+
if (config.isRequestCompressed() && config.getRequestCompressAlgorithm() != ClickHouseCompression.LZ4) {
283282
map.put("content-encoding", config.getRequestCompressAlgorithm().encoding());
284283
}
285284
return map;

client-v2/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,12 @@
141141
<version>5.19.0</version>
142142
<scope>test</scope>
143143
</dependency>
144+
<dependency>
145+
<groupId>com.github.luben</groupId>
146+
<artifactId>zstd-jni</artifactId>
147+
<version>1.5.7-6</version>
148+
<scope>test</scope>
149+
</dependency>
144150
</dependencies>
145151

146152
<build>
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import org.apache.commons.compress.compressors.CompressorException;
4+
import org.apache.commons.compress.compressors.CompressorStreamFactory;
5+
import org.apache.hc.core5.function.Supplier;
6+
import org.apache.hc.core5.http.Header;
7+
import org.apache.hc.core5.http.HttpEntity;
8+
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.OutputStream;
12+
import java.util.List;
13+
import java.util.Set;
14+
15+
public class CompressedEntity implements HttpEntity {
16+
17+
private HttpEntity httpEntity;
18+
private final boolean isResponse;
19+
private final CompressorStreamFactory compressorStreamFactory;
20+
private final String compressionAlgo;
21+
22+
CompressedEntity(HttpEntity httpEntity, boolean isResponse, CompressorStreamFactory compressorStreamFactory) {
23+
this.httpEntity = httpEntity;
24+
this.isResponse = isResponse;
25+
this.compressorStreamFactory = compressorStreamFactory;
26+
this.compressionAlgo = getCompressionAlgoName(httpEntity.getContentEncoding());
27+
}
28+
29+
@Override
30+
public boolean isRepeatable() {
31+
return httpEntity.isRepeatable();
32+
}
33+
34+
@Override
35+
public InputStream getContent() throws IOException, UnsupportedOperationException {
36+
if (!isResponse) {
37+
throw new UnsupportedOperationException("Unsupported: getting compressed content of request");
38+
}
39+
40+
try {
41+
return compressorStreamFactory.createCompressorInputStream(compressionAlgo, httpEntity.getContent());
42+
} catch (CompressorException e) {
43+
throw new IOException("Failed to create decompressing input stream", e);
44+
}
45+
}
46+
47+
@Override
48+
public void writeTo(OutputStream outStream) throws IOException {
49+
if (isResponse) {
50+
// called by us to get compressed response
51+
throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere");
52+
}
53+
54+
try {
55+
httpEntity.writeTo(compressorStreamFactory.createCompressorOutputStream(compressionAlgo, outStream));
56+
} catch (CompressorException e) {
57+
throw new IOException("Failed to create compressing output stream", e);
58+
}
59+
}
60+
61+
@Override
62+
public boolean isStreaming() {
63+
return httpEntity.isStreaming();
64+
}
65+
66+
@Override
67+
public Supplier<List<? extends Header>> getTrailers() {
68+
return httpEntity.getTrailers();
69+
}
70+
71+
@Override
72+
public void close() throws IOException {
73+
httpEntity.close();
74+
}
75+
76+
@Override
77+
public long getContentLength() {
78+
return httpEntity.getContentLength();
79+
}
80+
81+
@Override
82+
public String getContentType() {
83+
return httpEntity.getContentType();
84+
}
85+
86+
@Override
87+
public String getContentEncoding() {
88+
return httpEntity.getContentEncoding();
89+
}
90+
91+
@Override
92+
public boolean isChunked() {
93+
return httpEntity.isChunked();
94+
}
95+
96+
@Override
97+
public Set<String> getTrailerNames() {
98+
return httpEntity.getTrailerNames();
99+
}
100+
101+
private String getCompressionAlgoName(String contentEncoding) {
102+
String algo = contentEncoding;
103+
if (algo.equalsIgnoreCase("gzip")) {
104+
algo = CompressorStreamFactory.GZIP;
105+
} else if (algo.equalsIgnoreCase("lz4")) {
106+
algo = CompressorStreamFactory.LZ4_FRAMED;
107+
}
108+
return algo;
109+
}
110+
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 58 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.clickhouse.client.api.transport.Endpoint;
1717
import com.clickhouse.data.ClickHouseFormat;
1818
import net.jpountz.lz4.LZ4Factory;
19+
import org.apache.commons.compress.compressors.CompressorStreamFactory;
1920
import org.apache.hc.client5.http.ConnectTimeoutException;
2021
import org.apache.hc.client5.http.classic.methods.HttpPost;
2122
import org.apache.hc.client5.http.config.ConnectionConfig;
@@ -105,6 +106,8 @@ public class HttpAPIClientHelper {
105106

106107
private static final int ERROR_BODY_BUFFER_SIZE = 1024; // Error messages are usually small
107108

109+
// Encoding token sent in the Accept-Encoding / Content-Encoding headers when HTTP compression is enabled.
private static final String DEFAULT_HTTP_COMPRESSION_ALGO = "lz4";
110+
108111
private static final Pattern PATTERN_HEADER_VALUE_ASCII = Pattern.compile(
109112
"\\p{Graph}+(?:[ ]\\p{Graph}+)*");
110113

@@ -322,6 +325,8 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map<String,
322325
clientBuilder.setKeepAliveStrategy((response, context) -> TimeValue.ofMilliseconds(keepAliveTimeout));
323326
}
324327

328+
clientBuilder.disableContentCompression(); // will handle ourselves
329+
325330
return clientBuilder.build();
326331
}
327332

@@ -427,14 +432,12 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
427432
// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
428433
addHeaders(req, requestConfig);
429434

430-
boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig);
431-
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
432-
boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig);
433-
434435

435436
// setting entity. wrapping if compression is enabled
436-
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
437-
clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig));
437+
String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null;
438+
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback),
439+
lz4Factory,
440+
requestConfig));
438441

439442
HttpClientContext context = HttpClientContext.create();
440443
Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig);
@@ -448,8 +451,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
448451
ClassicHttpResponse httpResponse = null;
449452
try {
450453
httpResponse = httpClient.executeOpen(null, req, context);
451-
boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig);
452-
httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), httpResponse.getCode(), serverCompression, useHttpCompression, lz4Factory, requestConfig));
454+
455+
httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(),
456+
httpResponse.getCode(),
457+
lz4Factory,
458+
requestConfig));
453459

454460
if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) {
455461
throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
@@ -493,30 +499,30 @@ public static void closeQuietly(ClassicHttpResponse httpResponse) {
493499
private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8");
494500

495501
private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
496-
addHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType());
502+
setHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType());
497503
if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) {
498-
addHeader(
504+
setHeader(
499505
req,
500506
ClickHouseHttpProto.HEADER_FORMAT,
501507
((ClickHouseFormat) requestConfig.get(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())).name());
502508
}
503509
if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) {
504-
addHeader(
510+
setHeader(
505511
req,
506512
ClickHouseHttpProto.HEADER_QUERY_ID,
507513
(String) requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()));
508514
}
509-
addHeader(
515+
setHeader(
510516
req,
511517
ClickHouseHttpProto.HEADER_DATABASE,
512518
ClientConfigProperties.DATABASE.getOrDefault(requestConfig));
513519

514520
if (ClientConfigProperties.SSL_AUTH.<Boolean>getOrDefault(requestConfig).booleanValue()) {
515-
addHeader(
521+
setHeader(
516522
req,
517523
ClickHouseHttpProto.HEADER_DB_USER,
518524
ClientConfigProperties.USER.getOrDefault(requestConfig));
519-
addHeader(
525+
setHeader(
520526
req,
521527
ClickHouseHttpProto.HEADER_SSL_CERT_AUTH,
522528
"on");
@@ -529,11 +535,11 @@ private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
529535
"Basic " + Base64.getEncoder().encodeToString(
530536
(user + ":" + password).getBytes(StandardCharsets.UTF_8)));
531537
} else {
532-
addHeader(
538+
setHeader(
533539
req,
534540
ClickHouseHttpProto.HEADER_DB_USER,
535541
ClientConfigProperties.USER.getOrDefault(requestConfig));
536-
addHeader(
542+
setHeader(
537543
req,
538544
ClickHouseHttpProto.HEADER_DB_PASSWORD,
539545
ClientConfigProperties.PASSWORD.getOrDefault(requestConfig));
@@ -551,18 +557,19 @@ private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
551557

552558
if (useHttpCompression) {
553559
if (serverCompression) {
554-
addHeader(req, HttpHeaders.ACCEPT_ENCODING, "lz4");
560+
setHeader(req, HttpHeaders.ACCEPT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO);
555561
}
562+
556563
if (clientCompression && !appCompressedData) {
557-
addHeader(req, HttpHeaders.CONTENT_ENCODING, "lz4");
564+
setHeader(req, HttpHeaders.CONTENT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO);
558565
}
559566
}
560567

561568
for (String key : requestConfig.keySet()) {
562569
if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) {
563570
Object val = requestConfig.get(key);
564571
if (val != null) {
565-
addHeader(
572+
setHeader(
566573
req,
567574
key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()),
568575
String.valueOf(val));
@@ -626,11 +633,19 @@ private void addQueryParams(URIBuilder req, Map<String, Object> requestConfig) {
626633
}
627634
}
628635

629-
private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompression, boolean useHttpCompression,
630-
boolean appControlledCompression, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
631-
LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}", clientCompression, useHttpCompression);
636+
private HttpEntity wrapRequestEntity(HttpEntity httpEntity, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
632637

633-
if (clientCompression && !appControlledCompression) {
638+
boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig);
639+
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
640+
boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig);
641+
642+
LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}, content encoding: {}",
643+
clientCompression, useHttpCompression, httpEntity.getContentEncoding());
644+
645+
if (httpEntity.getContentEncoding() != null && !appCompressedData) {
646+
// http header is set and data is not compressed
647+
return new CompressedEntity(httpEntity, false, CompressorStreamFactory.getSingleton());
648+
} else if (clientCompression && !appCompressedData) {
634649
int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig);
635650
return new LZ4Entity(httpEntity, useHttpCompression, false, true,
636651
buffSize, false, lz4Factory);
@@ -639,25 +654,22 @@ private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompre
639654
}
640655
}
641656

642-
private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boolean serverCompression, boolean useHttpCompression, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
643-
LOG.debug("wrapResponseEntity: server compression: {}, http compression: {}", serverCompression, useHttpCompression);
644-
645-
if (serverCompression) {
646-
// Server doesn't compress certain errors like 403
647-
switch (httpStatus) {
648-
case HttpStatus.SC_OK:
649-
case HttpStatus.SC_CREATED:
650-
case HttpStatus.SC_ACCEPTED:
651-
case HttpStatus.SC_NO_CONTENT:
652-
case HttpStatus.SC_PARTIAL_CONTENT:
653-
case HttpStatus.SC_RESET_CONTENT:
654-
case HttpStatus.SC_NOT_MODIFIED:
655-
case HttpStatus.SC_BAD_REQUEST:
656-
case HttpStatus.SC_INTERNAL_SERVER_ERROR:
657-
case HttpStatus.SC_NOT_FOUND:
658-
int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig);
659-
return new LZ4Entity(httpEntity, useHttpCompression, true, false, buffSize, true, lz4Factory);
660-
}
657+
private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
658+
boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig);
659+
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
660+
661+
LOG.debug("wrapResponseEntity: server compression: {}, http compression: {}, content encoding: {}",
662+
serverCompression, useHttpCompression, httpEntity.getContentEncoding());
663+
664+
if (httpEntity.getContentEncoding() != null) {
665+
// http compressed response
666+
return new CompressedEntity(httpEntity, true, CompressorStreamFactory.getSingleton());
667+
}
668+
669+
// data compression
670+
if (serverCompression && !(httpStatus == HttpStatus.SC_FORBIDDEN || httpStatus == HttpStatus.SC_UNAUTHORIZED)) {
671+
int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig);
672+
return new LZ4Entity(httpEntity, useHttpCompression, true, false, buffSize, true, lz4Factory);
661673
}
662674

663675
return httpEntity;
@@ -803,8 +815,8 @@ public void close() {
803815
httpClient.close(CloseMode.IMMEDIATE);
804816
}
805817

806-
private static <T> void addHeader(HttpRequest req, String headerName,
807-
String value)
818+
private static <T> void setHeader(HttpRequest req, String headerName,
819+
String value)
808820
{
809821
if (value == null) {
810822
return;
@@ -814,10 +826,10 @@ private static <T> void addHeader(HttpRequest req, String headerName,
814826
return;
815827
}
816828
if (PATTERN_HEADER_VALUE_ASCII.matcher(value).matches()) {
817-
req.addHeader(headerName, value);
829+
req.setHeader(headerName, value);
818830
} else {
819831
try {
820-
req.addHeader(
832+
req.setHeader(
821833
headerName + "*",
822834
"UTF-8''" + URLEncoder.encode(value, StandardCharsets.UTF_8.name()));
823835
} catch (UnsupportedEncodingException e) {

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
class LZ4Entity implements HttpEntity {
1717

18-
private HttpEntity httpEntity;
18+
private final HttpEntity httpEntity;
1919

2020
private final boolean useHttpCompression;
2121

0 commit comments

Comments
 (0)