-
Notifications
You must be signed in to change notification settings - Fork 6
refactor(inkless): consolidate executor management in SharedState #468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
2691b9d to
1764bf3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR refactors executor management by consolidating all ExecutorService creation and lifecycle management into SharedState, eliminating the Single Responsibility Principle violations where Reader, Writer, and FileCommitter each managed their own thread pools. The refactoring also removes the unnecessary CacheStoreJob and moves cache updates inline within FileCommitter's commit completion callback.
Key Changes:
- Centralized executor creation in SharedState for fetch operations (metadata/data), file operations (upload/commit), and commit ticking
- Removed CacheStoreJob class and its dedicated executor; cache updates now happen inline after successful commits
- Updated Reader, Writer, and FileCommitter constructors to accept externally-managed executors instead of creating their own
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| SharedState.java | Creates and manages all broker-scoped executors; added shutdown logic and thread pool monitoring |
| FileCommitter.java | Removed cache store executor parameter; added inline cache update logic and single-thread validation |
| Writer.java | Removed internal scheduler creation; now accepts externally-managed ScheduledExecutorService |
| Reader.java | Removed internal executor creation; now accepts externally-managed metadata and data executors |
| CacheStoreJob.java | Deleted; functionality moved inline to FileCommitter's commit completion handler |
| AppendHandler.java | Updated to wire FileCommitter with executors from SharedState |
| FetchHandler.java | Updated to pass executors from SharedState to Reader constructor |
| WriterIntegrationTest.java | Manually creates and shuts down executors since SharedState is not used |
| WriterMockedTest.java | Updated assertions to verify executors are NOT shutdown by Writer.close() |
| FileCommitterTest.java | Added tests for single-threaded executor validation; removed cache store executor |
| ReaderTest.java | Updated assertions to verify executors are NOT shutdown by Reader.close() |
| WriterPropertyTest.java | Removed CacheStoreHandler since cache updates are now inline |
| AppendHandlerTest.java | Added helper method to create SharedState via initialize() |
| DeleteRecordsInterceptorTest.java | Added helper method to create SharedState via initialize() |
| FileMergerMockedTest.java | Updated to use SharedState.initialize() with required config parameters |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
Show resolved
Hide resolved
d20fe7c to
95608a8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
cd7fb3e to
190a9e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
f96aa70 to
f9a30b2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/FileCommitterTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java
Outdated
Show resolved
Hide resolved
52a1699 to
bed54fa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java:193
- The scheduledTick future should be cancelled in the close() method before closing resources. If a tick is scheduled but hasn't executed yet, it may fire after the Writer is closed, potentially causing issues. Consider cancelling the scheduledTick before setting closed to true.
public void close() throws IOException {
lock.lock();
try {
if (closed) {
return;
}
closed = true;
// Rotate file before closing the uploader so the file gets into the queue first.
rotateFile(true);
fileCommitter.close();
writerMetrics.close();
} finally {
lock.unlock();
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
9dc496e to
a729102
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java
Outdated
Show resolved
Hide resolved
a729102 to
8cc9fb4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (1)
storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java:200
- The SharedState instance created via getSharedState() is not closed in this test. Since SharedState.initialize() creates several executor services that need to be shut down, failing to close SharedState will leak these resources. The test should use try-with-resources to ensure SharedState is properly closed.
public void close() throws IOException {
final AppendHandler interceptor = new AppendHandler(getSharedState(), writer);
interceptor.close();
verify(writer).close();
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/produce/WriterIntegrationTest.java
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/produce/FileCommitter.java
Show resolved
Hide resolved
Moves all ExecutorService creation and lifecycle management from Reader, Writer, and FileCommitter to SharedState, establishing a centralized pattern for managing broker-scoped thread pools. This refactoring also eliminates an unnecessary executor (cache store) and fixes a resource leak bug. Previously, Reader, Writer, and FileCommitter each created and managed their own thread pools, violating the Single Responsibility Principle: - **Reader**: Created 2 fixed thread pools for fetch operations - **Writer**: Created 1 scheduled thread pool for commit ticking - **FileCommitter**: Created 3 thread pools (upload, commit, cache store) Instead of having the CacheStoreThreadPool, reuse the existing upload thread pool for caching.
8cc9fb4 to
3ae78ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| this.storageMetrics = new Metrics( | ||
| new MetricConfig(), List.of(reporter), Time.SYSTEM, | ||
| new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) | ||
| ); | ||
| this.fetchStorage = config.storage(storageMetrics); | ||
| this.produceStorage = config.storage(storageMetrics); | ||
| this.backgroundStorage = config.storage(storageMetrics); | ||
| } | ||
|
|
Copilot
AI
Dec 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Storage backend instances are created in the constructor but are not cleaned up in the exception handling path of the initialize method. If ThreadPoolMonitor creation fails after line 133-135 execute but before the SharedState constructor completes, these storage backends will leak. The exception handling in initialize() only cleans up executors, monitors, and caches, but does not close the storage backends created in the constructor.
| this.storageMetrics = new Metrics( | |
| new MetricConfig(), List.of(reporter), Time.SYSTEM, | |
| new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) | |
| ); | |
| this.fetchStorage = config.storage(storageMetrics); | |
| this.produceStorage = config.storage(storageMetrics); | |
| this.backgroundStorage = config.storage(storageMetrics); | |
| } | |
| Metrics storageMetricsLocal = null; | |
| StorageBackend fetchStorageLocal = null; | |
| StorageBackend produceStorageLocal = null; | |
| StorageBackend backgroundStorageLocal = null; | |
| try { | |
| storageMetricsLocal = new Metrics( | |
| new MetricConfig(), List.of(reporter), Time.SYSTEM, | |
| new KafkaMetricsContext(STORAGE_METRIC_CONTEXT) | |
| ); | |
| fetchStorageLocal = config.storage(storageMetricsLocal); | |
| produceStorageLocal = config.storage(storageMetricsLocal); | |
| backgroundStorageLocal = config.storage(storageMetricsLocal); | |
| } catch (final RuntimeException | final Error e) { | |
| closeStorageBackendQuietly(backgroundStorageLocal); | |
| closeStorageBackendQuietly(produceStorageLocal); | |
| closeStorageBackendQuietly(fetchStorageLocal); | |
| if (storageMetricsLocal != null) { | |
| try { | |
| storageMetricsLocal.close(); | |
| } catch (final Exception closeException) { | |
| LOGGER.warn("Failed to close storage metrics after constructor failure", closeException); | |
| } | |
| } | |
| throw e; | |
| } | |
| this.storageMetrics = storageMetricsLocal; | |
| this.fetchStorage = fetchStorageLocal; | |
| this.produceStorage = produceStorageLocal; | |
| this.backgroundStorage = backgroundStorageLocal; | |
| } | |
| private static void closeStorageBackendQuietly(final StorageBackend backend) { | |
| if (backend == null) { | |
| return; | |
| } | |
| try { | |
| backend.close(); | |
| } catch (final IOException e) { | |
| LOGGER.warn("Failed to close storage backend after constructor failure", e); | |
| } | |
| } |
...ge/inkless/src/test/java/io/aiven/inkless/storage_backend/in_memory/InMemoryStorageTest.java
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java
Show resolved
Hide resolved
| // Caching is not strictly IO-bound, but may be slowed by memory pressure. | ||
| commitFuture.thenRunAsync( | ||
| new CacheStoreJob( | ||
| time, | ||
| objectCache, | ||
| keyAlignmentStrategy, | ||
| file.data(), | ||
| uploadFuture, | ||
| metrics::cacheStoreFinished | ||
| ), | ||
| // Reuse the upload executor for caching as well. | ||
| // Caching is substantially faster than uploading, | ||
| // so this should be fine to allocating some upload executor's capacity to caching. | ||
| executorServiceUpload | ||
| ); |
Copilot
AI
Dec 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states that caching is reused with the upload thread pool because "caching is substantially faster than uploading". However, the actual implementation shows caching happens after commit completes via commitFuture.thenRunAsync(). This means cache operations could be delayed if the commit queue is backed up, and they compete for upload executor threads that may be busy with actual uploads. Consider whether this design meets the performance expectations stated in the comment, especially under heavy load when the upload executor may be saturated.
| // (3) validation prevents the real bug (multithreaded executor causing ordering violations) | ||
| // compared to a theoretical misconfiguration that would be immediately obvious. |
Copilot
AI
Dec 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment explains using newFixedThreadPool(1) instead of newSingleThreadExecutor() for validation purposes, but the tradeoff section mentions three reasons why this is acceptable. However, the third reason states "validation prevents the real bug (multithreaded executor causing ordering violations) compared to a theoretical misconfiguration that would be immediately obvious" - this sentence is unclear and seems incomplete. The comparison between what validation prevents and what is "immediately obvious" is confusing. Consider rephrasing for clarity.
| // (3) validation prevents the real bug (multithreaded executor causing ordering violations) | |
| // compared to a theoretical misconfiguration that would be immediately obvious. | |
| // (3) the validation ensures we never accidentally use a multi-threaded executor that would | |
| // break commit ordering, whereas any change to the pool size would be a deliberate and | |
| // immediately noticeable code modification. |
Moves all ExecutorService creation and lifecycle management from Reader, Writer, and FileCommitter to SharedState, establishing a centralized pattern for managing broker-scoped thread pools.
This refactoring also eliminates an unnecessary executor (cache store) and fixes a resource leak bug.
Previously, Reader, Writer, and FileCommitter each created and managed their own thread pools, violating the Single Responsibility Principle:
Instead of having the CacheStoreThreadPool, reuse the existing upload thread pool for caching.