From 5dc6e173e4ae2c48196e2c7341713d60871129a5 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 13 Sep 2024 17:39:58 -0300 Subject: [PATCH 1/6] maven generate minus the test/ folder --- http/get_compressed/java/server/pom.xml | 90 +++++++++++++++++++ .../server/src/main/java/com/example/App.java | 10 +++ .../src/test/java/com/example/AppTest.java | 19 ++++ 3 files changed, 119 insertions(+) create mode 100644 http/get_compressed/java/server/pom.xml create mode 100644 http/get_compressed/java/server/src/main/java/com/example/App.java create mode 100644 http/get_compressed/java/server/src/test/java/com/example/AppTest.java diff --git a/http/get_compressed/java/server/pom.xml b/http/get_compressed/java/server/pom.xml new file mode 100644 index 0000000..c50865c --- /dev/null +++ b/http/get_compressed/java/server/pom.xml @@ -0,0 +1,90 @@ + + + 4.0.0 + + com.example + arrow-http-server + 1.0-SNAPSHOT + + arrow-http-server + + http://www.example.com + + + UTF-8 + 17 + + + + + + org.junit + junit-bom + 5.11.0 + pom + import + + + + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-params + test + + + + + + + + + maven-clean-plugin + 3.4.0 + + + + maven-resources-plugin + 3.3.1 + + + maven-compiler-plugin + 3.13.0 + + + maven-surefire-plugin + 3.3.0 + + + maven-jar-plugin + 3.4.2 + + + maven-install-plugin + 3.1.2 + + + maven-deploy-plugin + 3.1.2 + + + + maven-site-plugin + 3.12.1 + + + maven-project-info-reports-plugin + 3.6.1 + + + + + diff --git a/http/get_compressed/java/server/src/main/java/com/example/App.java b/http/get_compressed/java/server/src/main/java/com/example/App.java new file mode 100644 index 0000000..daa6d08 --- /dev/null +++ b/http/get_compressed/java/server/src/main/java/com/example/App.java @@ -0,0 +1,10 @@ +package com.example; + +/** + * Hello world! + */ +public class App { + public static void main(String[] args) { + System.out.println("Hello World!"); + } +} diff --git a/http/get_compressed/java/server/src/test/java/com/example/AppTest.java b/http/get_compressed/java/server/src/test/java/com/example/AppTest.java new file mode 100644 index 0000000..e4db7b5 --- /dev/null +++ b/http/get_compressed/java/server/src/test/java/com/example/AppTest.java @@ -0,0 +1,19 @@ +package com.example; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +/** + * Unit test for simple App. + */ +public class AppTest { + + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() { + assertTrue(true); + } +} From 8682da9e73b151fabbaee4b7e4bb22f1c92ed5dd Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 13 Sep 2024 18:01:07 -0300 Subject: [PATCH 2/6] change pom.xml to use Arrow and Jetty dependencies --- http/get_compressed/java/server/pom.xml | 96 ++++++++++--------------- 1 file changed, 38 insertions(+), 58 deletions(-) diff --git a/http/get_compressed/java/server/pom.xml b/http/get_compressed/java/server/pom.xml index c50865c..a64094c 100644 --- a/http/get_compressed/java/server/pom.xml +++ b/http/get_compressed/java/server/pom.xml @@ -1,4 +1,14 @@ + 4.0.0 @@ -8,10 +18,11 @@ 1.0-SNAPSHOT arrow-http-server - - http://www.example.com + https://github.com/apache/arrow-experiments/tree/main/http/get_compressed + 17.0.0 + 11.0.24 UTF-8 17 @@ -19,9 +30,9 @@ - org.junit - junit-bom - 5.11.0 + org.apache.arrow + arrow-bom + ${arrow.version} pom import @@ -30,61 +41,30 @@ - org.junit.jupiter - junit-jupiter-api - test + org.apache.arrow + arrow-memory-core - + - org.junit.jupiter - junit-jupiter-params - test + org.apache.arrow + arrow-memory-netty + + + + org.apache.arrow + arrow-vector - - - - - - - maven-clean-plugin - 3.4.0 - - - - maven-resources-plugin - 3.3.1 - - - maven-compiler-plugin - 3.13.0 - - - maven-surefire-plugin - 3.3.0 - - - maven-jar-plugin - 3.4.2 - - - maven-install-plugin - 3.1.2 - - - maven-deploy-plugin - 3.1.2 - - - - maven-site-plugin - 3.12.1 - - - maven-project-info-reports-plugin - 3.6.1 - - - - + + org.eclipse.jetty + jetty-server + ${jetty.version} + + + + org.eclipse.jetty + jetty-servlet + ${jetty.version} + + From 8154a1152acb55377e801240783a8c354e9de8b2 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Tue, 17 Sep 2024 22:53:27 -0300 Subject: [PATCH 3/6] Almost complete Java compression example with full negotiation of codecs and codings --- .gitignore | 7 + http/get_compressed/java/server/pom.xml | 25 + .../server/src/main/java/com/example/App.java | 10 - .../java/com/example/ArrowHttpServer.java | 487 ++++++++++++++++++ .../src/test/java/com/example/AppTest.java | 19 - 5 files changed, 519 insertions(+), 29 deletions(-) delete mode 100644 http/get_compressed/java/server/src/main/java/com/example/App.java create mode 100644 http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java delete mode 100644 http/get_compressed/java/server/src/test/java/com/example/AppTest.java diff --git a/.gitignore b/.gitignore index d997483..ac500cd 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,10 @@ vendored build .vscode cufile.log + +# Java +.classpath +.mvn +.project +.settings +target diff --git a/http/get_compressed/java/server/pom.xml b/http/get_compressed/java/server/pom.xml index a64094c..94f8c0e 100644 --- a/http/get_compressed/java/server/pom.xml +++ b/http/get_compressed/java/server/pom.xml @@ -27,6 +27,19 @@ 17 + + + + org.codehaus.mojo + exec-maven-plugin + 3.4.1 + + com.example.ArrowHttpServer + + + + + @@ -36,6 +49,11 @@ pom import + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + @@ -50,6 +68,13 @@ arrow-memory-netty + + + org.apache.arrow + arrow-compression + ${arrow.version} + + org.apache.arrow arrow-vector diff --git a/http/get_compressed/java/server/src/main/java/com/example/App.java b/http/get_compressed/java/server/src/main/java/com/example/App.java deleted file mode 100644 index daa6d08..0000000 --- a/http/get_compressed/java/server/src/main/java/com/example/App.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.example; - -/** - * Hello world! - */ -public class App { - public static void main(String[] args) { - System.out.println("Hello World!"); - } -} diff --git a/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java new file mode 100644 index 0000000..6d01482 --- /dev/null +++ b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.Random; + +import org.apache.arrow.compression.CommonsCompressionFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.CompressionUtil.CodecType; +import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.compression.ZstdCompressionCodec; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.QuotedCSV; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; + +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +public class ArrowHttpServer { + static class DataGenerator { + static final String ascii_lowercase = "abcdefghijklmnopqrstuvwxyz"; + static final String ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + final Random random; + final BufferAllocator allocator; + + DataGenerator(Random random, BufferAllocator allocator) { + this.random = random; + this.allocator = allocator; + } + + public String randomString(String alphabet, int length) { + StringBuilder result = new StringBuilder(); + for (int i = 0; i < length; i++) { + int index = random.nextInt(alphabet.length()); + result.append(alphabet.charAt(index)); + } + return result.toString(); + } + + public String randomName(char initial) { + int length = random.nextInt(5) + 3; // from 3 to 7 + return initial + randomString(ascii_lowercase, length); + } + + public List exampleTickers(int numTickers) { + List tickers = new ArrayList<>(); + while (tickers.size() < numTickers) { + int length = random.nextInt(2) + 3; // from 3 to 4 + String randomTicker = randomString(ascii_uppercase, length); + if (!tickers.contains(randomTicker)) { + tickers.add(randomTicker); + } + } + return tickers; + } + + public Schema theSchema(boolean useDictionaryEncoding) { + ArrowType.Int int32 = new ArrowType.Int(32, true); + ArrowType.Int int64 = new ArrowType.Int(64, true); + ArrowType utf8 = new ArrowType.Utf8(); + DictionaryEncoding dictionary = useDictionaryEncoding ? new DictionaryEncoding(0, /* isOrdered= */false, int32) + : null; + FieldType tickerType = new FieldType(/* nullable= */false, utf8, dictionary, null); + FieldType priceType = FieldType.notNullable(int64); + FieldType volumeType = FieldType.notNullable(int64); + return new Schema( + List.of( + new Field("ticker", tickerType, null), + new Field("price", priceType, null), + new Field("volume", volumeType, null))); + } + + public VectorSchemaRoot exampleBatch(List tickers, Schema schema, int length) { + VarCharVector ticker = new VarCharVector("ticker", allocator); + BigIntVector price = new BigIntVector("price", allocator); + BigIntVector volume = new BigIntVector("volume", allocator); + ticker.allocateNew(length); + price.allocateNew(length); + volume.allocateNew(length); + Text randomTicker = new Text(); // reusable UTF-8 data holder + for (int i = 0; i < length; i++) { + randomTicker.set(tickers.get(random.nextInt(tickers.size()))); + ticker.setSafe(i, randomTicker); + price.set(i, (random.nextInt(1000) + 1) * 100); + volume.set(i, random.nextInt(10000) + 1); + } + ticker.setValueCount(length); + price.setValueCount(length); + volume.setValueCount(length); + ticker.close(); + price.close(); + volume.close(); + + VectorSchemaRoot root = new VectorSchemaRoot(schema, Arrays.asList(ticker, price, volume), length); + root.setRowCount(length); + return root; + } + + public List exampleBatches(List tickers) { + Schema schema = theSchema(USE_DICTIONARY_ENCODING); + int totalRecords = 42000000; + int batchLen = 6 * 1024; + // All the batches sent are random slices of the larger base batch. + VectorSchemaRoot baseBatch = exampleBatch(tickers, schema, 8 * batchLen); + List batches = new ArrayList<>(); + int records = 0; + while (records < totalRecords) { + int length = Math.min(batchLen, totalRecords - records); + int offset = random.nextInt(baseBatch.getRowCount() - length); + VectorSchemaRoot batch = baseBatch.slice(offset, length); + batches.add(batch); + records += length; + } + return batches; + // root.setRowCount(length); + // VectorUnloader unloader = new VectorUnloader(root); + // return unloader.getRecordBatch(); + } + } + + static class Handler extends AbstractHandler { + final static String ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"; + + /** + * Pick the IPC stream codec according to the Accept header. + * + * This is used when deciding which codec to use for compression of IPC buffer + * streams. This is a feature of the Arrow IPC stream format and is different + * from the HTTP content-coding used to compress the entire HTTP response. + * + * This is how a client may specify the IPC buffer compression codecs it + * accepts: + * + * Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4" + * + * @param request The HTTP request object that may contain an Accept header + * @param available The list of codecs that the server can provide in the + * order preferred by the server. Example: [ZSTD, + * LZ4_FRAME]. + * @param defaultCodec The codec to use if the client does not specify the + * ";codecs" parameter in the Accept header. Example: + * Optional.of(CodecType.NO_COMPRESSION). + * @return The codec that the server should use to compress the IPC buffer + * stream. Optional.empty() if the client does not accept any of the + * available codecs explicitly listed. ;codecs="" means no codecs are + * accepted. If the client does not specify the codecs parameter, then + * defaultCodec is returned. + */ + static Optional pickIpcCodec( + Request request, List available, Optional defaultCodec) { + var accept = request.getHttpFields().getField(HttpHeader.ACCEPT); + + boolean didSpecifyCodecs = false; + ArrayList acceptedCodecs = new ArrayList<>(); + if (accept != null) { + QuotedCSV mime_types = new QuotedCSV(accept.getValue()); + for (String mime_type : mime_types.getValues()) { + HashMap params = new HashMap<>(); + String media_range = HttpField.getValueParameters(mime_type, params); + boolean exactMatch = media_range.equals(ARROW_STREAM_FORMAT); + if (exactMatch || media_range.equals("*/*") || media_range.equals("application/*")) { + if (exactMatch) { + // Wildcards should only match when the format isn't specified + // explicitly. So when we find an exact match, we reset the + // accepted codecs state. + didSpecifyCodecs = false; + acceptedCodecs.clear(); + } + var codecs_str = params.get("codecs"); + if (codecs_str == null) { + continue; + } + didSpecifyCodecs = true; + QuotedCSV codecs = new QuotedCSV(codecs_str); + for (String codec : codecs.getValues()) { + if (codec.equals("zstd")) { + acceptedCodecs.add(CodecType.ZSTD); + } else if (codec.equals("lz4")) { + acceptedCodecs.add(CodecType.LZ4_FRAME); + } + } + if (exactMatch) { + // Wildcards shouldn't match after an exact match, + // so we break the loop here. + break; + } + } + } + } + for (CompressionUtil.CodecType codec : available) { + if (acceptedCodecs.contains(codec)) { + return Optional.of(codec); + } + } + return didSpecifyCodecs ? Optional.empty() : defaultCodec; + } + + /** + * Pick the content-coding according to the Accept-Encoding header. + * + * This is used when using HTTP response compression instead of IPC buffer + * compression. + * + * @param request The HTTP request object that may contain an Accept-Encoding + * header + * @param available The content-codings that the server can provide in the order + * preferred by the server. Example: ["zstd", "br", "gzip"] + * @return The content-coding that the server should use to compress the + * response. "identity" is returned if no acceptable content-coding is + * found in the list of available codings. null if the client does not + * accept any of the available content-codings and doesn't accept + * "identity" (uncompressed) either. In this case, a "406 Not + * Acceptable" response should be sent. + */ + static String pickCoding(Request request, List available) { + if (!available.contains("identity")) { + available = new ArrayList<>(available); + available.add("identity"); + } + HttpField acceptEncodingField = request.getHttpFields().getField(HttpHeader.ACCEPT_ENCODING); + if (acceptEncodingField == null) { + return "identity"; + } + for (String value : acceptEncodingField.getValues()) { + if (available.contains(value)) { + return value; + } + } + // TODO: handle Accept-Encoding header + // QuotedQualityCSV quality_csv = new QuotedQualityCSV(); + // quality_csv.addValue(acceptEncoding); + return "identity"; + } + + static class CompressionStrategy { + public CompressionCodec ipcCodec; + public String httpCoding; + + /** + * No compression at all. + */ + CompressionStrategy() { + ipcCodec = NoCompressionCodec.INSTANCE; + httpCoding = "identity"; + } + + /** + * IPC buffer compression without HTTP compression. + */ + CompressionStrategy(CodecType codecType) { + this.ipcCodec = CommonsCompressionFactory.INSTANCE.createCodec(codecType); + this.httpCoding = "identity"; + } + + /** + * HTTP compression without IPC buffer compression. + */ + CompressionStrategy(String coding) { + this.ipcCodec = NoCompressionCodec.INSTANCE; + this.httpCoding = coding; + } + + /** + * IPC buffer compression codec name to be used in HTTP headers. + */ + Optional ipcCodecName() { + switch (ipcCodec.getCodecType()) { + case ZSTD: + return Optional.of("zstd"); + case LZ4_FRAME: + return Optional.of("lz4"); + default: + throw new AssertionError("Unexpected codec type: " + ipcCodec.getCodecType()); + } + } + } + + /** + * Pick the compression strategy based on the Accept and Accept-Encoding + * headers. + * + * @param request The HTTP request object that may contain Accept + * and Accept-Encoding headers. + * @param availableIpcCodecs The codecs that the server can provide for IPC + * buffer compression. + * @param availableCodings The content-codings that the server can provide + * for HTTP response compression. + * @param defaultCompression The default compression strategy to use if the + * client does explicitly choose. + * @return The compression strategy to use or null. + * null means a "406 Not Acceptable" response should be sent. + */ + static CompressionStrategy pickCompression(Request request, List availableIpcCodecs, + List availableCodings, + CompressionStrategy defaultCompression) { + // Here we decide to fallback to HTTP compression when the client doesn't + // explicity opt-in for IPC buffer compression. So we pass defaultCodec + // as Optional.empty() to pickIpcCodec. + Optional defaultCodec = Optional.empty(); + Optional ipcCodecType = pickIpcCodec(request, availableIpcCodecs, defaultCodec); + if (ipcCodecType.isEmpty()) { + if (!request.getHttpFields().contains(HttpHeader.ACCEPT_ENCODING)) { + return defaultCompression; + } + String coding = pickCoding(request, availableCodings); + if (coding == null) { + return null; // 406 Not Acceptable + } + return new CompressionStrategy(coding); // HTTP compression only + } + return new CompressionStrategy(ipcCodecType.get()); // IPC buffer compression + } + + /** + * The list of IPC buffer compression codecs that this server can provide in the + * order preferred by the server. + */ + final List availableCodecs; + /** + * The list of content-codings for HTTP compression that this server can provide + * in the order preferred by the server. + */ + final List availableCodings; + + final BufferAllocator rootAllocator; + final Schema schema; + final List allBatches; + + Handler(BufferAllocator rootAllocator, Schema schema, List allBatches) { + availableCodecs = Arrays.asList(CodecType.LZ4_FRAME, CodecType.ZSTD); + availableCodings = Arrays.asList("zstd", "br", "gzip"); + this.rootAllocator = rootAllocator; + this.schema = schema; + this.allBatches = allBatches; + } + + List resolveBatches() { + return allBatches; + } + + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + // Create one allocator per request + BufferAllocator allocator = rootAllocator.newChildAllocator("request", 0, Long.MAX_VALUE); + + HttpVersion version = baseRequest.getHttpVersion(); + // If client's intent cannot be derived from the headers, return + // uncompressed data for HTTP/1.0 requests and compressed data for + // HTTP/1.1 requests with the safest compression format choice: "gzip". + CompressionStrategy defaultCompression = new CompressionStrategy( + (version == HttpVersion.HTTP_1_0 || !availableCodings.contains("gzip")) ? "identity" + : "gzip"); + CompressionStrategy compression = pickCompression(baseRequest, availableCodecs, availableCodings, + defaultCompression); + if (compression == null) { + this.replyNotAcceptable(baseRequest, response); + baseRequest.setHandled(true); + return; + } + System.out.println("Compression strategy: " + compression); + + // In a real application the data would be resolved from a database or + // another source like a file and error handling would be done here + // before the 200 OK response starts being sent to the client. + var batches = resolveBatches(); + + response.setStatus(HttpServletResponse.SC_OK); + //// set these headers if testing with a local browser-based client: + // response.setHeader("Access-Control-Allow-Origin", "http://localhost:8008"); + // response.setHeader("Access-Control-Allow-Methods", "GET"); + // response.setHeader("Access-Control-Allow-Headers", "Content-Type"); + var codecName = compression.ipcCodecName(); + if (codecName.isPresent()) { + String contentType = String.format("%s; codec=%s", ARROW_STREAM_FORMAT, codecName); + response.setContentType(contentType); + } else { + response.setContentType(ARROW_STREAM_FORMAT); + } + // Suggest a default filename in case this response is saved by the user> + response.setHeader("Content-Disposition", "attachment; filename=\"output.arrows\""); + if (version == HttpVersion.HTTP_1_0) { + response.setHeader(HttpHeader.CONNECTION.asString(), "close"); + } + response.flushBuffer(); + + // TODO: handle HTTP stream compression + + // When Jetty sees that no Content-Length is set, it will automatically + // enable chunked transfer encoding for HTTP/1.1 responses. + try ( + OutputStream stream = response.getOutputStream(); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + ArrowStreamWriter writer = new ArrowStreamWriter(root, /* DictionaryProvider= */null, stream);) { + VectorLoader loader = new VectorLoader(root); + writer.start(); + for (VectorSchemaRoot batchRoot : batches) { + VectorUnloader unloader = new VectorUnloader(batchRoot, true, compression.ipcCodec, true); + ArrowRecordBatch batch = unloader.getRecordBatch(); + loader.load(batch); + writer.writeBatch(); + stream.flush(); + } + writer.end(); + } + baseRequest.setHandled(true); + } + + void replyNotAcceptable(Request baseRequest, HttpServletResponse response) throws IOException { + response.setStatus(HttpServletResponse.SC_NOT_ACCEPTABLE); + response.setContentType("text/plain"); + response.setHeader("Connection", "close"); + PrintWriter writer = response.getWriter(); + writer.println("None of the available codings are accepted by this client."); + String accept = baseRequest.getHeader(HttpHeader.ACCEPT.asString()); + if (accept != null) { + writer.printf("`Accept` header was %s.\n", accept); + } + String acceptEncoding = baseRequest.getHeader(HttpHeader.ACCEPT_ENCODING.asString()); + if (acceptEncoding != null) { + writer.printf("`Accept-Encoding` header was %s.\n", acceptEncoding); + } + } + } + + static final BufferAllocator ROOT_ALLOCATOR = new RootAllocator(); + static final Boolean USE_DICTIONARY_ENCODING = false; + + public static void main(String[] args) throws Exception { + DataGenerator generator = new DataGenerator(new Random(), ROOT_ALLOCATOR); + + System.out.println("Generating example data..."); + Schema schema = generator.theSchema(USE_DICTIONARY_ENCODING); + List allTickers = generator.exampleTickers(60); + List allBatches = generator.exampleBatches(allTickers); + + Handler handler = new Handler(ROOT_ALLOCATOR, schema, allBatches); + + Server server = new Server(8008); + server.setHandler(handler); + server.start(); + System.out.println("Serving on localhost:8008..."); + server.join(); + } +} diff --git a/http/get_compressed/java/server/src/test/java/com/example/AppTest.java b/http/get_compressed/java/server/src/test/java/com/example/AppTest.java deleted file mode 100644 index e4db7b5..0000000 --- a/http/get_compressed/java/server/src/test/java/com/example/AppTest.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.example; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -import org.junit.jupiter.api.Test; - -/** - * Unit test for simple App. - */ -public class AppTest { - - /** - * Rigorous Test :-) - */ - @Test - public void shouldAnswerWithTrue() { - assertTrue(true); - } -} From ae23afed6c80747a689f7ebbf388f8e4deae79b0 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Tue, 17 Sep 2024 22:59:10 -0300 Subject: [PATCH 4/6] handle NO_COMPRESSION --- .../java/server/src/main/java/com/example/ArrowHttpServer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java index 6d01482..c0cadea 100644 --- a/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java +++ b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java @@ -312,6 +312,8 @@ Optional ipcCodecName() { return Optional.of("zstd"); case LZ4_FRAME: return Optional.of("lz4"); + case NO_COMPRESSION: + return Optional.empty(); default: throw new AssertionError("Unexpected codec type: " + ipcCodec.getCodecType()); } From 1e9edf2dd228b766d6cac41d43a471ca28aa6287 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Tue, 17 Sep 2024 23:08:22 -0300 Subject: [PATCH 5/6] add CompressionStrategy.toString() --- .../java/com/example/ArrowHttpServer.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java index c0cadea..be7ea80 100644 --- a/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java +++ b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java @@ -189,7 +189,7 @@ static class Handler extends AbstractHandler { * accepted. If the client does not specify the codecs parameter, then * defaultCodec is returned. */ - static Optional pickIpcCodec( + static public Optional pickIpcCodec( Request request, List available, Optional defaultCodec) { var accept = request.getHttpFields().getField(HttpHeader.ACCEPT); @@ -255,7 +255,7 @@ static Optional pickIpcCodec( * "identity" (uncompressed) either. In this case, a "406 Not * Acceptable" response should be sent. */ - static String pickCoding(Request request, List available) { + public static String pickCoding(Request request, List available) { if (!available.contains("identity")) { available = new ArrayList<>(available); available.add("identity"); @@ -282,7 +282,7 @@ static class CompressionStrategy { /** * No compression at all. */ - CompressionStrategy() { + public CompressionStrategy() { ipcCodec = NoCompressionCodec.INSTANCE; httpCoding = "identity"; } @@ -290,7 +290,7 @@ static class CompressionStrategy { /** * IPC buffer compression without HTTP compression. */ - CompressionStrategy(CodecType codecType) { + public CompressionStrategy(CodecType codecType) { this.ipcCodec = CommonsCompressionFactory.INSTANCE.createCodec(codecType); this.httpCoding = "identity"; } @@ -298,7 +298,7 @@ static class CompressionStrategy { /** * HTTP compression without IPC buffer compression. */ - CompressionStrategy(String coding) { + public CompressionStrategy(String coding) { this.ipcCodec = NoCompressionCodec.INSTANCE; this.httpCoding = coding; } @@ -306,7 +306,7 @@ static class CompressionStrategy { /** * IPC buffer compression codec name to be used in HTTP headers. */ - Optional ipcCodecName() { + public Optional ipcCodecName() { switch (ipcCodec.getCodecType()) { case ZSTD: return Optional.of("zstd"); @@ -318,6 +318,11 @@ Optional ipcCodecName() { throw new AssertionError("Unexpected codec type: " + ipcCodec.getCodecType()); } } + + @Override + public String toString() { + return ipcCodecName().map(name -> "identity+" + name).orElse(httpCoding); + } } /** @@ -402,7 +407,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques baseRequest.setHandled(true); return; } - System.out.println("Compression strategy: " + compression); + System.out.printf("Compression strategy: %s\n", compression); // In a real application the data would be resolved from a database or // another source like a file and error handling would be done here From 1267ee5776228c6464210d039a4e53a6163fc2c5 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Tue, 17 Sep 2024 23:10:49 -0300 Subject: [PATCH 6/6] mark class fields final --- .../server/src/main/java/com/example/ArrowHttpServer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java index be7ea80..e9beed1 100644 --- a/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java +++ b/http/get_compressed/java/server/src/main/java/com/example/ArrowHttpServer.java @@ -276,8 +276,8 @@ public static String pickCoding(Request request, List available) { } static class CompressionStrategy { - public CompressionCodec ipcCodec; - public String httpCoding; + final public CompressionCodec ipcCodec; + final public String httpCoding; /** * No compression at all.