-
Notifications
You must be signed in to change notification settings - Fork 261
feat(java): Add TCP transport support for Flink connector #2560
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(java): Add TCP transport support for Flink connector #2560
Conversation
|
@GaneshPatil7517 Hey, I haven't reviewed the code fully yet, but seems like the redshift connector are unrelated to this change. Please clean the changes and let me know once this is ready to be reviewed. |
bee34ee to
964d07c
Compare
|
Hi @mmodzelewski, thanks for the quick review You're right - the Redshift connector commits were accidentally included. I've cleaned up the branch and force-pushed. The PR now contains only the TCP transport support changes: Single commit: Changes summary:
All 137 tests pass locally. Ready for review! |
@GaneshPatil7517 your PR title is invalid, see https://github.com/apache/iggy/actions/runs/20995847599/job/60353117214?pr=2560 use java or connectors Also fix other lints. Dont ping maintainers if CI is not passing for your PR. |
ok ill fix it.... |
- Add TransportType enum with HTTP (default) and TCP options - Create IggyClientFactory for transport-agnostic client creation - Update IggyConnectionConfig with transport type and port settings - Modify IggySink/IggySinkWriter to use IggyBaseClient interface - Add comprehensive unit tests for new components - Update README with TCP configuration examples - Fix checkstyle NPath complexity in IggySinkWriter - Fix markdown table formatting in README Resolves apache#2484
f3e3991 to
6b46df4
Compare
|
|
||
| | Transport | Default Port | Use Case | | ||
| | -------------------- | ------------ | ------------------------------------------------------------ | | ||
| | **HTTP** (default) | 3000 | Development, debugging, environments with HTTP proxies | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we should ask users to leverage production settings as much as possible? Thus making TCP the default makes more sense?
| * | ||
| * @return the port for the current transport type | ||
| */ | ||
| public int getEffectivePort() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see it's used anywhere except the test, can we remove it or restrict its access to tests only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds TCP transport protocol support to the Apache Iggy Flink connector, which previously only supported HTTP transport. The change provides users with a more performant option for high-throughput streaming workloads.
Changes:
- Introduced
TransportTypeenum to define HTTP and TCP transport options with default ports - Created
IggyClientFactoryto abstract client creation based on transport configuration - Extended
IggyConnectionConfigwith transport type, HTTP port, and TCP port configuration fields
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| TransportType.java | Defines transport protocol enum with HTTP (port 3000) and TCP (port 8090) options |
| IggyClientFactory.java | Factory class that creates appropriate client (HTTP/TCP) based on configuration |
| IggyConnectionConfig.java | Extended with transport type and port configuration options, including validation |
| IggySink.java | Updated to use IggyClientFactory instead of inline HTTP client creation |
| IggySinkWriter.java | Modified to use IggyBaseClient interface for transport flexibility and refactored validation logic |
| README.md | Added documentation for TCP transport configuration and usage examples |
| TransportTypeTest.java | Unit tests for transport type enum parsing and validation |
| IggyClientFactoryTest.java | Unit tests for host extraction from various server address formats |
| IggyConnectionConfigTest.java | Extended tests to cover new transport configuration options |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Close TCP client if applicable (TCP clients need explicit close) | ||
| if (iggyClient instanceof IggyTcpClient) { | ||
| log.debug("Closing TCP client connection"); | ||
| // TCP client connections are managed internally and closed when JVM exits | ||
| // or when connection pool is explicitly closed | ||
| } | ||
| // HTTP client doesn't need explicit close - connections managed by Java HttpClient pool |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The close() method checks if the client is a TCP client but doesn't actually call any close method on it. If TCP clients don't need explicit cleanup, this instanceof check and the empty block add unnecessary complexity. Consider removing the instanceof check entirely, or if TCP clients will need cleanup in the future, add a TODO comment explaining what needs to be done.
| // Close TCP client if applicable (TCP clients need explicit close) | |
| if (iggyClient instanceof IggyTcpClient) { | |
| log.debug("Closing TCP client connection"); | |
| // TCP client connections are managed internally and closed when JVM exits | |
| // or when connection pool is explicitly closed | |
| } | |
| // HTTP client doesn't need explicit close - connections managed by Java HttpClient pool | |
| // Underlying Iggy clients manage their own connections; no explicit close required here. |
|
|
||
| private static void validatePositiveDuration(Duration value, String name) { | ||
| if (value == null || value.isNegative()) { | ||
| throw new IllegalArgumentException(name + " must be positive"); |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The validation allows zero duration (Duration.ZERO), but the error message says 'must be positive'. A zero flush interval would mean immediate flushing which may not be intended. Either allow zero explicitly by adjusting the error message to 'must be non-negative', or check for zero and negative values with value.isNegative() || value.isZero() to enforce truly positive durations.
| throw new IllegalArgumentException(name + " must be positive"); | |
| throw new IllegalArgumentException(name + " must be non-negative"); |
| if (address.contains(":")) { | ||
| String[] parts = address.split(":"); | ||
| if (parts.length >= 1 && !parts[0].isBlank()) { | ||
| return parts[0]; | ||
| } | ||
| throw new IllegalArgumentException("Cannot extract host from address: " + serverAddress); | ||
| } |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The host extraction logic for addresses with colons can fail for IPv6 addresses (e.g., '[::1]:8090' or '2001:db8::1'). IPv6 addresses contain multiple colons which would cause incorrect splitting. Consider handling IPv6 addresses explicitly, either by checking for brackets or by using more sophisticated URI parsing for all cases.
…nstanceof check, fix error message, handle IPv6 addresses
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution! |
|
This pull request was automatically closed because it has been inactive for 14 days. Feel free to reopen it if you'd like to continue working on it. |

Summary
Adds TCP transport protocol support to the Apache Iggy Flink connector, addressing the limitation where the sink only supported HTTP transport.
Resolves #2484
Changes
New Files
TransportType.java- Enum defining HTTP (port 3000) and TCP (port 8090) transport protocolsIggyClientFactory.java- Factory class for creating HTTP or TCP clients based on configurationModified Files
IggyConnectionConfig.java- AddedtransportType,httpPort, andtcpPortconfiguration optionsIggySink.java- Updated to useIggyClientFactoryinstead of hardcoded HTTP clientIggySinkWriter.java- Changed to useIggyBaseClientinterface, refactored validationREADME.md- Added documentation for TCP transport configurationTests
TransportTypeTest.java- Unit tests for transport type enumIggyClientFactoryTest.java- Unit tests for client factoryIggyConnectionConfigTest.java- Extended with transport configuration testsUsage