GROOVY-12033: groovy.concurrent.Actor: pre-GA hardening — Stop sentin…#2555
Open
paulk-asert wants to merge 1 commit into
Open
GROOVY-12033: groovy.concurrent.Actor: pre-GA hardening — Stop sentin…#2555paulk-asert wants to merge 1 commit into
paulk-asert wants to merge 1 commit into
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #2555 +/- ##
==================================================
+ Coverage 68.1611% 68.2021% +0.0410%
- Complexity 33112 33186 +74
==================================================
Files 1508 1510 +2
Lines 126154 126433 +279
Branches 22890 22924 +34
==================================================
+ Hits 85988 86230 +242
- Misses 32540 32567 +27
- Partials 7626 7636 +10
🚀 New features to boost your workflow:
|
39f80c1 to
eab66d0
Compare
…el, error callback, bounded mailbox, per-actor pool
✅ All tests passed ✅Test Summary
🏷️ Commit: 296731d Learn more about TestLens at testlens.app. |
| throw new RuntimeException("Interrupted while sending to actor", ie); | ||
| } | ||
| } catch (Throwable t) { | ||
| } |
Comment on lines
+503
to
+534
| // Chicken-and-egg: the task wants to deregister itself from | ||
| // pendingTimers after firing, but the ScheduledFuture doesn't | ||
| // exist until schedule(...) returns. Stash it via AtomicReference | ||
| // and read it inside the task. | ||
| AtomicReference<ScheduledFuture<?>> ref = new AtomicReference<>(); | ||
| Runnable task = () -> { | ||
| // Hand the send off to the async executor so the scheduler | ||
| // thread never blocks on a full BLOCK mailbox. The scheduler | ||
| // is a shared resource (one per JVM); blocking it on one | ||
| // actor's bound would starve every other actor's timers. | ||
| AsyncSupport.getExecutor().execute(() -> { | ||
| try { | ||
| send(message); | ||
| } catch (IllegalStateException ignored) { | ||
| // Actor was stopped between schedule and fire — drop. | ||
| } | ||
| }); | ||
| ScheduledFuture<?> f = ref.get(); | ||
| if (f != null) pendingTimers.remove(f); | ||
| }; | ||
| ScheduledFuture<?> future = AsyncSupport.getScheduler() | ||
| .schedule(task, delay.toNanos(), TimeUnit.NANOSECONDS); | ||
| // Race window: with a near-zero delay, the task can fire and | ||
| // reach `ref.get()` before this set() runs, observing null and | ||
| // skipping its self-deregistration. The future is then added to | ||
| // pendingTimers below and stays there until stop() cancels it | ||
| // (a no-op on an already-fired future). Functionally benign — | ||
| // just a dangling entry until shutdown — so we don't pay for a | ||
| // compareAndSet here. Documented for future readers. | ||
| ref.set(future); | ||
| pendingTimers.add(future); | ||
| return new TimerCancellable(future, pendingTimers); |
Comment on lines
+537
to
+559
| private Cancellable scheduleAtFixedRateInternal(T message, Duration initialDelay, Duration interval) { | ||
| checkOnWorkerThread("scheduleAtFixedRate()"); | ||
| Objects.requireNonNull(message, "message must not be null"); | ||
| Objects.requireNonNull(initialDelay, "initialDelay must not be null"); | ||
| Objects.requireNonNull(interval, "interval must not be null"); | ||
| // Periodic timers never self-deregister — they fire repeatedly | ||
| // until cancelled explicitly or via stop(). Each fire offloads | ||
| // the send to the async executor so a BLOCK-bounded actor can | ||
| // never starve the shared scheduler thread. | ||
| Runnable task = () -> AsyncSupport.getExecutor().execute(() -> { | ||
| try { | ||
| send(message); | ||
| } catch (IllegalStateException ignored) { | ||
| // Actor stopped — drop. | ||
| } | ||
| }); | ||
| ScheduledFuture<?> future = AsyncSupport.getScheduler() | ||
| .scheduleAtFixedRate(task, | ||
| initialDelay.toNanos(), | ||
| interval.toNanos(), | ||
| TimeUnit.NANOSECONDS); | ||
| pendingTimers.add(future); | ||
| return new TimerCancellable(future, pendingTimers); |
Comment on lines
+189
to
+191
| * a full mailbox will block the shared scheduler thread; prefer | ||
| * {@link ActorOptions.Overflow#FAIL FAIL} or | ||
| * {@link ActorOptions.Overflow#DROP_NEWEST DROP_NEWEST} for |
Comment on lines
+307
to
+312
| actor.stop() // flips isActive() to false immediately | ||
| assert !actor.isActive() // refuses new sends from this point on | ||
|
|
||
| actor.stop() // graceful: processes remaining messages then exits | ||
| Thread.sleep(50) | ||
| assert !actor.isActive() | ||
| // The worker may still be draining queued messages. Poll for terminated | ||
| // when you actually need to be sure the actor has finished shutting down. | ||
| while (!actor.isTerminated()) Thread.sleep(10) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
…el, error callback, bounded mailbox, per-actor pool