NIFI-15883: Fix Wait/Notify race condition with CAS-aware replace and complete #11275

Open
rakesh-rsky wants to merge 1 commit into apache:main from rakesh-rsky:fix/NIFI-15883-wait-notify-race-condition

Conversation

@rakesh-rsky rakesh-rsky commented May 22, 2026

NIFI-15883: Fix Wait/Notify race condition with CAS-aware replace and complete

  • WaitNotifyProtocol.replace(Signal): Changed from returning boolean to void.
    Throws ConcurrentModificationException when the underlying cache.replace()
    returns false (CAS failure due to concurrent Notify), making the API
    consistent with complete(Signal) which also throws on concurrent modification.

  • WaitNotifyProtocol.notify(): Updated retry loop to catch
    ConcurrentModificationException from replace() instead of checking
    the boolean return value.

  • WaitNotifyProtocol.complete(Signal): Replaces raw cache.remove() with a
    version-aware approach that re-fetches the cache entry and compares
    revisions before removing. Throws ConcurrentModificationException if
    a concurrent Notify updated the signal between Wait's initial read
    and the remove call, preventing silent data loss.

  • Wait.java: Simplified call sites - protocol.replace(signal) and
    protocol.complete(signal) both now throw ConcurrentModificationException
    on concurrent modification. The catch block handles both IOException and
    ConcurrentModificationException with session rollback and ProcessException.

  • TestWaitNotifyProtocol: Add three tests for complete(Signal):
    happy-path removal, concurrent-modification detection, and no-op
    when the signal is already absent.

  • TestWait: Add two integration tests that verify session rollback and
    ProcessException wrapping when replace() throws on CAS failure
    (waitProgressed path) and when complete() detects a stale revision
    (waitCompleted path).
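
The behaviour described in the bullets above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `RevisionedCache`, its `Entry` type, and `ProtocolSketch` are hypothetical in-memory stand-ins, and the real protocol talks to a distributed cache client whose API is not reproduced here.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a revision-tracking cache (not the NiFi client API). */
class RevisionedCache {
    /** A value plus the revision at which it was read. */
    static final class Entry {
        final String key;
        final String value;
        final long revision;

        Entry(String key, String value, long revision) {
            this.key = key;
            this.value = value;
            this.revision = revision;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    synchronized Entry fetch(String key) {
        return store.get(key);
    }

    /** Unconditional write that bumps the revision; seeds data or mimics a concurrent Notify. */
    synchronized void put(String key, String value) {
        Entry old = store.get(key);
        store.put(key, new Entry(key, value, old == null ? 0L : old.revision + 1));
    }

    /** Compare-and-swap: writes only if the stored revision matches the one the caller read. */
    synchronized boolean replace(Entry read, String newValue) {
        Entry current = store.get(read.key);
        if (current == null || current.revision != read.revision) {
            return false;
        }
        store.put(read.key, new Entry(read.key, newValue, current.revision + 1));
        return true;
    }

    synchronized void remove(String key) {
        store.remove(key);
    }
}

/** Sketch of the two protocol operations as the description characterizes them. */
class ProtocolSketch {
    private final RevisionedCache cache;

    ProtocolSketch(RevisionedCache cache) {
        this.cache = cache;
    }

    /** Turns a failed CAS into an exception instead of a boolean the caller could ignore. */
    void replace(RevisionedCache.Entry read, String newValue) {
        if (!cache.replace(read, newValue)) {
            throw new ConcurrentModificationException(
                    "Failed to update signal [" + read.key + "] due to concurrent modification.");
        }
    }

    /** Re-fetches the entry and removes it only if the revision is unchanged. */
    void complete(RevisionedCache.Entry read) {
        RevisionedCache.Entry current = cache.fetch(read.key);
        if (current == null) {
            return; // already absent: treated as a no-op, as in the description
        }
        if (current.revision != read.revision) {
            throw new ConcurrentModificationException(
                    "Failed to complete signal [" + read.key + "]: signal was concurrently modified.");
        }
        cache.remove(read.key); // fetch-compare-remove is not atomic against a real remote cache
    }
}
```

In this sketch a caller that read an entry before a concurrent `put` gets an exception from both `replace` and `complete`, so a stale read can no longer overwrite or delete a newer signal silently.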

Summary

NIFI-15883

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

  }
  } else if (waitProgressed) {
- protocol.replace(signal);
+ if (!protocol.replace(signal)) {

Although the fix works, the implementations of complete() and replace() are inconsistent: complete() throws an exception, but for replace() the caller has to check the return value.

It would be better if replace() also threw an exception when concurrent updates are detected.

@sheelchand-bah sheelchand-bah left a comment


Suggestion: make replace() and complete() consistent. Either both return true/false or both throw an exception.

@rakesh-rsky rakesh-rsky force-pushed the fix/NIFI-15883-wait-notify-race-condition branch from 519d519 to ddbe7f0 on May 22, 2026 17:24
@rakesh-rsky

@sheelchand-bah Thank you for the review.

Good point on API consistency. Both replace(Signal) and complete(Signal) now throw ConcurrentModificationException on concurrent modification — callers don't need to check return values. I also updated the retry loop in notify() to catch ConcurrentModificationException from replace() and continue retrying (up to MAX_REPLACE_RETRY_COUNT = 5), since that was the previous behavior when it returned false.
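
For readers following along, a retry loop of this shape might look roughly like the sketch below. It is an assumption-laden illustration rather than the actual notify() code: `NotifyRetrySketch` and `runWithRetry` are hypothetical names, the attempt is modelled as a plain `Runnable`, and throwing once the budget is exhausted is assumed rather than taken from the PR.

```java
import java.util.ConcurrentModificationException;

class NotifyRetrySketch {
    static final int MAX_REPLACE_RETRY_COUNT = 5;

    /**
     * Runs one read-modify-write attempt until it succeeds or the retry budget is spent.
     * The attempt is expected to re-read the signal each time it is invoked.
     *
     * @return the number of attempts that were needed
     */
    static int runWithRetry(Runnable attempt) {
        for (int i = 1; i <= MAX_REPLACE_RETRY_COUNT; i++) {
            try {
                attempt.run();
                return i;
            } catch (ConcurrentModificationException e) {
                // CAS lost to another writer; fall through and try again with fresh state.
            }
        }
        // Assumption: once the budget is exhausted the failure is surfaced to the caller.
        throw new ConcurrentModificationException(
                "Failed to update signal after " + MAX_REPLACE_RETRY_COUNT + " attempts.");
    }
}
```

The point of the shape is that catching the exception inside the loop preserves the old "returned false, try again" behaviour while keeping replace() itself free of a return value to check.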

@turcsanyip turcsanyip left a comment


Thanks for working on the issue @rakesh-rsky!

The general concept looks good. Please find my comments below.

final Object actualRevision = current.cachedEntry.getRevision().orElse(null);
if (expectedRevision != null && !expectedRevision.equals(actualRevision)) {
throw new ConcurrentModificationException(String.format(
"Failed to complete signal [%s]: signal was concurrently modified (expected revision %s, found %s). Will retry.",

It depends on the caller whether it retries the transaction or gives up, so I suggest removing the "Will retry." suffix.

- return cache.replace(signal.cachedEntry, stringSerializer, stringSerializer);
+ if (!cache.replace(signal.cachedEntry, stringSerializer, stringSerializer)) {
+     throw new ConcurrentModificationException(String.format(
+         "Failed to update signal [%s] in cache due to concurrent modification. Will retry.", signal.identifier));

Same as line 316.

Comment on lines +307 to +309
final Signal current = getSignal(signalId);
if (current == null) {
return;

I think this case should also be treated as a concurrent modification and should throw an exception.

Another process may have consumed and removed the signal after we initially read it, which means the value we based our decision on was no longer valid.

Possible scenario:

  • signal counter = 1 initially
  • 2 concurrent threads read the signal
  • both decrease the counter to 0 and resume the waiting FlowFile
  • both remove the signal (as the counter reached 0)

=> only 1 FF should have been resumed based on the counter
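
The interleaving above can be replayed deterministically in a small sketch. Everything here is hypothetical (`AbsentSignalSketch`, `Snapshot`, and the single-threaded replay are illustration only, not code from the PR); it assumes complete() is changed so that an absent signal counts as a concurrent modification.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

class AbsentSignalSketch {
    /** What a waiter saw when it read the signal. */
    static final class Snapshot {
        final long count;
        final long revision;

        Snapshot(long count, long revision) {
            this.count = count;
            this.revision = revision;
        }
    }

    private final Map<String, Snapshot> cache = new HashMap<>();

    void put(String id, long count) {
        cache.put(id, new Snapshot(count, 0L));
    }

    Snapshot read(String id) {
        return cache.get(id);
    }

    /** Removes the signal only if it still exists at the revision the caller read. */
    void complete(String id, Snapshot read) {
        Snapshot current = cache.get(id);
        if (current == null) {
            // Someone else consumed the signal after our read: our decision is stale.
            throw new ConcurrentModificationException("Signal [" + id + "] was already removed.");
        }
        if (current.revision != read.revision) {
            throw new ConcurrentModificationException("Signal [" + id + "] was modified.");
        }
        cache.remove(id);
    }

    /** Replays the scenario: two waiters read counter = 1, then both try to complete. */
    static int replayTwoWaiters() {
        AbsentSignalSketch sketch = new AbsentSignalSketch();
        sketch.put("s", 1);
        Snapshot first = sketch.read("s");
        Snapshot second = sketch.read("s");
        int resumed = 0;
        for (Snapshot seen : new Snapshot[] {first, second}) {
            try {
                sketch.complete("s", seen);
                resumed++; // only a successful complete releases a FlowFile
            } catch (ConcurrentModificationException e) {
                // The loser has to re-read the signal instead of resuming.
            }
        }
        return resumed;
    }
}
```

With the early `return` on a missing signal, both waiters in this replay would count as resumed; with the exception, only the first one does.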

Comment on lines 574 to 577
} catch (final IOException | ConcurrentModificationException e) {
session.rollback();
throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
}

In the event of an optimistic lock failure, the Wait processor should retry the transaction silently rather than failing the FlowFile immediately, similar to the for loop and MAX_REPLACE_RETRY_COUNT in WaitNotifyProtocol.notify().

I see that the Wait flow is more complicated, so a simple for loop around it may not be possible.

Minimally, I would separate the exception handling:

  • the "Unable to communicate with cache" error message is not appropriate for the concurrent modification case
  • a warning log may be enough instead of an error

The preferred way would definitely be the internal retry with max retry count.
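
One possible shape for the minimal separation suggested above is sketched here. It is only an illustration under assumptions: `WaitErrorHandlingSketch`, `CacheUpdate`, and `Outcome` are hypothetical names, a plain list stands in for the processor's logger, and `UncheckedIOException` stands in for NiFi's ProcessException so that the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

class WaitErrorHandlingSketch {
    /** Hypothetical stand-in for the replace/complete call made by Wait. */
    interface CacheUpdate {
        void apply() throws IOException;
    }

    enum Outcome { UPDATED, CONTENDED }

    /** Stand-in for warning-level log output. */
    final List<String> warnings = new ArrayList<>();

    Outcome update(String signalId, CacheUpdate update) {
        try {
            update.apply();
            return Outcome.UPDATED;
        } catch (ConcurrentModificationException e) {
            // Optimistic lock lost: expected under contention, so warn and let the caller retry.
            warnings.add("Signal " + signalId + " was modified concurrently: " + e.getMessage());
            return Outcome.CONTENDED;
        } catch (IOException e) {
            // Genuine communication failure: this is where the existing error message fits.
            throw new UncheckedIOException(
                    "Unable to communicate with cache while updating " + signalId, e);
        }
    }
}
```

A caller could loop on `CONTENDED` up to a maximum retry count, which would move this sketch toward the preferred internal-retry approach.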

@rakesh-rsky rakesh-rsky force-pushed the fix/NIFI-15883-wait-notify-race-condition branch from ddbe7f0 to 16d1446 on May 27, 2026 05:15
@rakesh-rsky rakesh-rsky requested a review from turcsanyip May 27, 2026 05:16
3 participants