Refactor to add GZIP Compression #449
base: feature/IngestV2
Conversation
ag-ramachandran commented on Dec 17, 2025
- Add a simple streaming sample
- Fix issue with compression on Streaming data ingest
Test Results: 529 tests, 520 ✅, 3m 3s ⏱️. Results for commit c5b04b6. ♻️ This comment has been updated with latest results.
Pull request overview
This PR refactors the ingest-v2 API to add GZIP compression support for streaming data ingestion and simplifies the API by embedding database and table information into IngestRequestProperties. The changes include a new Java sample demonstrating streaming ingestion patterns and comprehensive test coverage for compressed file ingestion.
Key changes:
- API Refactoring: Removed explicit database/table parameters from ingest methods; these are now stored within IngestRequestProperties via IngestRequestPropertiesBuilder
- GZIP Compression: Added automatic compression for non-binary formats during upload, with proper Content-Encoding header support for streaming ingestion
- Java Interoperability: Added Java-friendly async methods returning CompletableFuture for all ingest clients
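To make the refactor concrete, here is a minimal sketch of the new call pattern, assembled from the builder and sample snippets quoted in the review threads below (client construction, imports, and the `csvInputStream` value are assumptions, not part of this PR's diff):

```kotlin
// Database and table now travel inside the properties object,
// built via the new factory method, instead of being passed to
// each ingest call.
val properties = IngestRequestPropertiesBuilder
    .create("MyDatabase", "MyTable")
    .withEnableTracking(true)
    .build()

// Format is taken from the source; non-binary formats are gzipped
// automatically during upload.
val source = StreamSource(
    csvInputStream, CompressionType.NONE, Format.csv,
    UUID.randomUUID(), "csv-sample-source", false,
)

// Java callers see this as ingestAsync(...) returning a CompletableFuture.
val response = streamingIngestClient.ingestAsync(source, properties).get()
```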
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 13 comments.
Summary per file:
| File | Description |
|---|---|
| samples/src/main/resources/create-table.kql | Adds KQL script for table creation and ingestion mapping setup for the sample dataset |
| samples/src/main/java/ingestv2/StreamingIngestV2.java | New Java sample demonstrating streaming ingestion with various formats and compression types |
| samples/pom.xml | Adds dependency on kusto-ingest-v2 module for sample code |
| ingest-v2/pom.xml | Updates artifact name and adds kotlinx-coroutines-jdk8 dependency for Java interop |
| ingest-v2/src/main/resources/openapi.yaml | Corrects Host header example value in API specification |
| ingest-v2/src/main/kotlin/.../ContainerUploaderBase.kt | Implements automatic GZIP compression for uploadable sources |
| ingest-v2/src/main/kotlin/.../StreamingIngestionErrorResponse.kt | Adds error response model for streaming ingestion failures |
| ingest-v2/src/main/kotlin/.../IngestRequestPropertiesBuilder.kt | Refactors builder to require database/table via factory method; adds extension properties for accessing these values |
| ingest-v2/src/main/kotlin/.../StreamingIngestClient.kt | Updates API to extract database/table from properties; adds Java interop methods and Content-Encoding support |
| ingest-v2/src/main/kotlin/.../QueuedIngestClient.kt | Refactors to use new API pattern with database/table in properties; adds Java interop methods |
| ingest-v2/src/main/kotlin/.../ManagedStreamingIngestClient.kt | Updates to use new API pattern; adds Java interop methods |
| ingest-v2/src/main/kotlin/.../IngestClient.kt | Updates interface signatures to remove explicit database/table parameters |
| ingest-v2/src/test/kotlin/.../StreamingIngestClientTest.kt | Adds test for compressed and uncompressed file ingestion |
| ingest-v2/src/test/kotlin/.../QueuedIngestClientTest.kt | Updates tests to use new builder pattern |
| ingest-v2/src/test/kotlin/.../ManagedStreamingIngestClientTest.kt | Updates tests to use new builder pattern |
```kotlin
/**
 * Sets the data format for ingestion.
 *
 * @param value The data format (e.g., Format.json, Format.csv)
 * @deprecated Format is automatically extracted from the IngestionSource. This method is no longer needed.
 */
@Deprecated("Format is automatically extracted from the IngestionSource. This method is no longer needed.")
fun withFormat(value: com.microsoft.azure.kusto.ingest.v2.models.Format) =
    apply {
        this.format = value
    }
```
Copilot AI commented on Dec 17, 2025
The withFormat method is deprecated but the deprecation message states "Format is automatically extracted from the IngestionSource." However, if users call this deprecated method and explicitly set a format, it will be silently overridden by withFormatFromSource() during ingestion. This could lead to confusion where users set a format but it gets ignored. Consider adding a warning log or validation to detect when an explicitly-set format conflicts with the source format.
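A sketch of the kind of guard the comment suggests — the `logger` field, the `formatExplicitlySet` flag, and `warnIfFormatConflicts` are hypothetical additions, not code from this PR:

```kotlin
private val logger = org.slf4j.LoggerFactory.getLogger(IngestRequestPropertiesBuilder::class.java)
private var formatExplicitlySet = false

@Deprecated("Format is automatically extracted from the IngestionSource. This method is no longer needed.")
fun withFormat(value: com.microsoft.azure.kusto.ingest.v2.models.Format) =
    apply {
        formatExplicitlySet = true
        this.format = value
    }

// Called where the source format is applied; warns instead of
// silently dropping the explicitly requested format.
internal fun warnIfFormatConflicts(sourceFormat: com.microsoft.azure.kusto.ingest.v2.models.Format) {
    if (formatExplicitlySet && format != sourceFormat) {
        logger.warn(
            "Format {} was set via the deprecated withFormat() but will be overridden by source format {}.",
            format, sourceFormat,
        )
    }
}
```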
```kotlin
// Store database and table in the HashMap for retrieval
properties.put(DATABASE_KEY, database)
properties.put(TABLE_KEY, table)
```
Copilot AI commented on Dec 17, 2025
The database and table values are stored in the HashMap using internal keys (_database and _table) which could potentially conflict with user-provided properties if they happen to use the same keys. Consider using a more unique prefix or namespace for internal keys to prevent potential collisions, such as "__internal_database" or using a separate internal properties map.
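For illustration, the prefixing the comment proposes could look like this (the key names are hypothetical):

```kotlin
companion object {
    // Namespaced so user-supplied property names cannot collide.
    const val DATABASE_KEY = "__internal_database"
    const val TABLE_KEY = "__internal_table"
}
```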
```kotlin
@JvmName("ingestAsync")
fun ingestAsyncJava(
    sources: List<IngestionSource>,
    ingestRequestProperties: IngestRequestProperties,
): CompletableFuture<ExtendedIngestResponse> =
    CoroutineScope(Dispatchers.IO).future {
        ingestAsyncInternal(sources, ingestRequestProperties)
    }

/**
 * Ingests data from a single source with the given properties. This is the
 * Java-friendly version that returns a CompletableFuture.
 */
@JvmName("ingestAsync")
fun ingestAsyncJava(
    source: IngestionSource,
    ingestRequestProperties: IngestRequestProperties,
): CompletableFuture<ExtendedIngestResponse> =
    CoroutineScope(Dispatchers.IO).future {
        ingestAsyncSingleInternal(source, ingestRequestProperties)
    }
```
Copilot AI commented on Dec 17, 2025
Both ingestAsyncJava methods (for a single source and for multiple sources) are annotated with the same @JvmName("ingestAsync"). Because their parameter lists differ (IngestionSource vs. List&lt;IngestionSource&gt;), these compile as ordinary JVM overloads rather than causing a declaration clash, but exposing both behind one Java-visible name can confuse callers. Consider distinct names, such as "ingestAsync" for the single source and "ingestAsyncBatch" for multiple sources.
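If distinct Java-visible names are preferred, only the batch overload's annotation needs to change — a sketch based on the snippet above:

```kotlin
// Batch overload gets its own JVM name; the single-source overload
// keeps @JvmName("ingestAsync") as the common case.
@JvmName("ingestAsyncBatch")
fun ingestAsyncJava(
    sources: List<IngestionSource>,
    ingestRequestProperties: IngestRequestProperties,
): CompletableFuture<ExtendedIngestResponse> =
    CoroutineScope(Dispatchers.IO).future {
        ingestAsyncInternal(sources, ingestRequestProperties)
    }
```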
```kotlin
/**
 * Extension property to extract the database name from IngestRequestProperties.
 */
val IngestRequestProperties.database: String
    get() =
        this.get(IngestRequestPropertiesBuilder.DATABASE_KEY) as? String
            ?: throw IllegalStateException(
                "Database not set in IngestRequestProperties",
            )

/**
 * Extension property to extract the table name from IngestRequestProperties.
 */
val IngestRequestProperties.table: String
    get() =
        this.get(IngestRequestPropertiesBuilder.TABLE_KEY) as? String
            ?: throw IllegalStateException(
                "Table not set in IngestRequestProperties",
            )
```
Copilot AI commented on Dec 17, 2025
The extension properties 'database' and 'table' throw IllegalStateException if the keys are not found. However, if old code creates IngestRequestProperties without using the builder (e.g., through direct constructor), these extension properties will fail at runtime. Consider providing a more descriptive error message that guides users to use IngestRequestPropertiesBuilder.create() instead, or handle backward compatibility more gracefully.
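For example, the getter could name the supported entry point in its message (the wording is illustrative, not from the PR):

```kotlin
val IngestRequestProperties.database: String
    get() =
        this.get(IngestRequestPropertiesBuilder.DATABASE_KEY) as? String
            ?: throw IllegalStateException(
                "Database not set in IngestRequestProperties. Build the properties " +
                    "via IngestRequestPropertiesBuilder.create(database, table) so the " +
                    "internal database/table keys are populated.",
            )
```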
samples/pom.xml (outdated)
```xml
    </dependency>
  </dependencies>

</project>
```
Copilot AI commented on Dec 17, 2025
The trailing blank line at the end of the file has been removed. While this is a minor formatting change, it's considered best practice to maintain a trailing newline in text files according to POSIX standards. This could cause unnecessary noise in future diffs.
```kotlin
val compression =
    when (compressionType) {
        "NONE" -> CompressionType.NONE
        "GZIP" -> CompressionType.GZIP
        "ZIP" -> CompressionType.ZIP
        else ->
            throw IllegalArgumentException(
                "Unknown compression type: $compressionType",
            )
    }
```
Copilot AI commented on Dec 17, 2025
The test case includes a "ZIP" compression type in the when expression, but the @CsvSource annotation only provides test cases for "NONE" and "GZIP". The ZIP branch is never executed, making it dead code. Either add a test case for ZIP compression or remove the unused branch.
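A sketch of the first option, narrowed to the mapping itself so the ZIP branch is exercised (the class and test names are hypothetical, the CompressionType import is assumed, and the real test also performs the ingestion):

```kotlin
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

class CompressionTypeMappingTest {
    @ParameterizedTest
    @CsvSource("NONE", "GZIP", "ZIP") // ZIP added so the third branch is covered
    fun `maps compression type names`(compressionType: String) {
        val compression =
            when (compressionType) {
                "NONE" -> CompressionType.NONE
                "GZIP" -> CompressionType.GZIP
                "ZIP" -> CompressionType.ZIP
                else -> throw IllegalArgumentException("Unknown compression type: $compressionType")
            }
        // Assumes CompressionType is an enum whose constants match the names above.
        check(compression.name == compressionType)
    }
}
```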
```kotlin
val newProperties = this.copy(format = source.format)
// Copy over HashMap entries (database and table)
this.forEach { (key, value) -> newProperties[key] = value }
return newProperties
}
```
Copilot AI commented on Dec 17, 2025
The withFormatFromSource method copies HashMap entries by iterating and setting them individually. However, this doesn't preserve the original HashMap type or capacity settings, and it could be inefficient for large maps. Consider using putAll() or constructing a new properties instance that properly clones the internal state.
Suggested change:

```kotlin
val newProperties = this.copy(format = source.format).apply {
    // Copy over HashMap entries (database and table)
    putAll(this@withFormatFromSource)
}
return newProperties
}
```
```kotlin
 * Note: The format field will be automatically extracted from the IngestionSource
 * by the client implementation during ingestion using withFormatFromSource().
 * A placeholder value (Format.csv) is used during build and will be overridden
 * with the actual source format.
 *
```
Copilot AI commented on Dec 17, 2025
The documentation states "A placeholder value (Format.csv) is used during build and will be overridden with the actual source format" but doesn't explain what happens if users never call an ingest method (e.g., they just build the properties and serialize them). This could lead to confusion about why csv appears in serialized properties when no format was specified.
Suggested change:

```kotlin
 * Note: If {@code format} was not explicitly set via [withFormat], a placeholder
 * value ([com.microsoft.azure.kusto.ingest.v2.models.Format.csv]) is used here.
 * When the properties are passed to an ingest API, the client implementation is
 * expected to call {@code withFormatFromSource()} and override this placeholder
 * with the actual source format.
 *
 * If you build and serialize these properties without going through an ingest
 * method (i.e., {@code withFormatFromSource()} is never called), the placeholder
 * {@code csv} format will remain in the serialized output. In that scenario,
 * explicitly set the desired format via [withFormat] before calling [build].
 *
```
```java
InputStream csvInputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(csvData).array());

StreamSource csvStreamSource = new StreamSource(
        csvInputStream, CompressionType.NONE, Format.csv,
        UUID.randomUUID(), "csv-test-src", false);

IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder
        .create(database, table)
        .withEnableTracking(true)
        .build();

System.out.println("Ingesting CSV data from string...");
ExtendedIngestResponse ingestResponse = streamingIngestClient.ingestAsync(csvStreamSource, csvProperties).get();
System.out.println("CSV ingestion completed. Operation ID: " + ingestResponse.getIngestResponse().getIngestionOperationId());
```
Copilot AI commented on Dec 17, 2025
The stream is not closed after ingestion completes. This could lead to resource leaks. The csvInputStream should be closed, or the code should use try-with-resources pattern.
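In the Java sample this is a try-with-resources change; sketched here in Kotlin to stay consistent with the other snippets, where `use` closes the stream whether ingestion succeeds or throws. Closing a ByteArrayInputStream is technically a no-op, but the sample models the pattern callers will copy for file and network streams:

```kotlin
ByteArrayInputStream(csvData.toByteArray(Charsets.UTF_8)).use { csvInputStream ->
    val csvStreamSource = StreamSource(
        csvInputStream, CompressionType.NONE, Format.csv,
        UUID.randomUUID(), "csv-test-src", false,
    )
    // The stream is closed automatically when this block exits.
    streamingIngestClient.ingestAsync(csvStreamSource, csvProperties).get()
}
```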
```kotlin
private suspend fun compressStream(
    inputStream: InputStream,
): ByteArrayInputStream =
    withContext(Dispatchers.IO) {
        val byteArrayOutputStream = ByteArrayOutputStream()
        GZIPOutputStream(byteArrayOutputStream).use { gzipStream ->
            inputStream.copyTo(gzipStream)
        }
        ByteArrayInputStream(byteArrayOutputStream.toByteArray())
    }
```
Copilot AI commented on Dec 17, 2025
The compressStream method loads the entire stream into memory, which could cause OutOfMemoryError for large files. While there's a size check before this point, the check uses the original uncompressed size. If the max size limit is close to available heap memory, compression could still cause memory issues. Consider documenting this limitation or adding additional memory safety checks.
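One way to bound the heap cost, sketched with a hypothetical helper rather than the PR's code: spill the compressed bytes to a temporary file so peak memory stays near the copy-buffer size instead of the payload size:

```kotlin
import java.io.InputStream
import java.nio.file.Files
import java.util.zip.GZIPOutputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// Hypothetical alternative to compressStream: gzip into a temp file
// instead of a ByteArrayOutputStream, trading heap for disk.
private suspend fun compressToTempFile(inputStream: InputStream): InputStream =
    withContext(Dispatchers.IO) {
        val tempFile = Files.createTempFile("ingest-", ".gz")
        tempFile.toFile().deleteOnExit()
        GZIPOutputStream(Files.newOutputStream(tempFile)).use { gzipStream ->
            inputStream.copyTo(gzipStream)
        }
        Files.newInputStream(tempFile)
    }
```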