-
Notifications
You must be signed in to change notification settings - Fork 5.1k
CAMEL-22898: Fix memory leak in Google PubSub consumer #21209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🤖 CI automation will test this PR automatically. 🐫 Apache Camel Committers, please review the following items:
|
When a subscription doesn't exist, the subscriber.startAsync().awaitRunning() call throws a NotFoundException. However, the subscriber was being added to the subscribers list BEFORE this call, causing it to accumulate in the list on repeated failures. This fix: 1. Moves the subscribers.add() call to AFTER successful startup 2. Adds proper cleanup in catch block to remove subscribers on failure 3. Adds a 5-second backoff delay to prevent tight loop on persistent failures 4. Ensures cleanup in finally block Applied to both GooglePubsubConsumer and GooglePubsubLiteConsumer.
b79c7bd to
f7efc21
Compare
...ite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
Outdated
Show resolved
Hide resolved
…rors Enhanced error handling to distinguish between recoverable and non-recoverable errors using ApiException.isRetryable(): - Recoverable errors (e.g., ABORTED, UNAVAILABLE, UNKNOWN): Continue retrying with 5-second backoff delay - Non-recoverable errors (e.g., NotFoundException, permission errors): Exit the subscriber loop immediately without retrying This prevents infinite retry loops for errors that cannot be recovered, while still allowing automatic recovery from transient failures. Applied to both GooglePubsubConsumer and GooglePubsubLiteConsumer.
… delays Replaced Thread.sleep() with Tasks.foregroundTask() using IterationBudget to implement the 5-second backoff delay for recoverable errors. This integrates better with Camel's task management framework. Applied to both GooglePubsubConsumer and GooglePubsubLiteConsumer.
2c35419 to
9fda7c8
Compare
The ForegroundTask only sleeps between iterations, not before the first iteration. For a one-time delay with maxIterations(1), we must use withInitialDelay() to ensure the delay happens before the first iteration. This fixes the backoff delay in both GooglePubsubConsumer and GooglePubsubLiteConsumer.
Resolved conflict in GooglePubsubConsumer.java by keeping both the new UnitOfWorkHelper import from main and our Task API imports.
Description
Fixes memory leak in Google PubSub consumer when subscription doesn't exist.
Problem
When a subscription no longer exists, starting a subscriber throws a
NotFoundException. However, the subscriber was being added to thesubscriberslist BEFORE thestartAsync().awaitRunning()call. This caused subscribers to accumulate in the list on repeated failures, leading to unbounded memory growth until JVM runs out of memory.Additionally, there was no backoff delay, causing a tight loop that consumed CPU resources.
Solution
This fix addresses the issue in both
GooglePubsubConsumerandGooglePubsubLiteConsumer:Testing
Related Issue
https://issues.apache.org/jira/browse/CAMEL-22898
Pull Request opened by Augment Code with guidance from the PR author