make resulting effect cancellable again#192
Conversation
core/shared/src/test/scala/cats/effect/cps/AsyncAwaitSpec.scala
Outdated
Show resolved
Hide resolved
|
Somehow the "be cancellable" test is now flaky - I could make it pass reliably with this change: 99e8dbf. But this looks suspicious to me, maybe there is something else going on? |
|
I have added a potential fix, which makes the test pass without the above mentioned workaround. See: c4e98a8 There, I have introduced a stopSignal to cancel tasks running in the dispatcher, when the actual effect is cancelled. Please let me know, if this makes sense or whether there is a better fix. From what I can see, it does solve the issue at hand. |
| // such as OptionT, EitherT, ... | ||
| var awaitedValue: Option[AnyRef] = None | ||
| F.uncancelable { poll => | ||
| F.race(stopSignal, F.uncancelable { poll => |
There was a problem hiding this comment.
Why is the race required here? Shouldn't Dispatcher itself shut things down?
There was a problem hiding this comment.
I thought the same, but this comment indicates that maybe not : #192 (comment)
There was a problem hiding this comment.
Indeed, this is referencing the mentioned workaround. Looking at the commit before, there is a green checkmark, but actually the test is skipped (it is timing out!). See: https://github.com/typelevel/cats-effect-cps/actions/runs/7575437745/job/20632044148#step:10:64
The test gets stuck somewhere when waiting for the defer.get (in the test) - and my debugging journey led me to believe that the dispatcher is working on some task and is not shutting down.
This is not always happening, which is why I did not realize it initially when testing locally.
There was a problem hiding this comment.
What is interesting though:
- Why does this workaround (99e8dbf) in the test fix the issue as well?
- When replacing the
defer.get(in the test) withsleepof around 300 millis, it also reliably works in the test - so the dispatcher is normally stopping, but maybe something can deadlock here?
There was a problem hiding this comment.
I am wondering, whether it makes sense to first merge the change from F.async_ to a cancellable F.async. Then we can discuss the remaining issues in a separate issue.
There was a problem hiding this comment.
@cornerman was the original test passing consistently after you changed the async call ? Because if that was the case, sure, please open a more atomic PR
There was a problem hiding this comment.
@Baccata 👍 Yes, it was. I have now updated the PR to only contain one fix. Also leaving the existing test in place and instead adding a new passing test - though this one has a TODO that we should tackle another time.
| ${F}.flatten { | ||
| _root_.cats.effect.std.Dispatcher.sequential[$effect].use { dispatcher => | ||
| ${F}.async_[$name#FF[AnyRef]](cb => new $name(dispatcher, cb).start()) | ||
| ${F}.async[$name#FF[AnyRef]](cb => ${F}.delay { new $name(dispatcher, cb).start(); Some(${F}.unit) }) |
There was a problem hiding this comment.
I'm wary of the F.unit call here : I think what should happen instead is some mutation to the StateMachine that could/would be inspected in any subsequent step in the state machine to short circuit the computation
it'd look like :
${F}.delay {
val stateMachine = $name(dispatcher, cb)
stateMachine.start()
Some(${F}.delay(stateMachine.cancel()))
}
Then in AsyncStateMachine, you'd have something like
private val isCancelled = new AtomicBoolean(false)
def cancel() : Unit = isCancelled.set(true)
def onComplete(f: F[AnyRef]) : Unit = {
// short-circuits the next call to dispatcher
if (isCancelled.get) {
// surfaces the cancellation to the `async` invocation.
this(Left(F.canceled.asInstanceOf[F[AnyRef]]))
} else dispatcher.unsafeRunAndForget(...)
}
There was a problem hiding this comment.
@djspiewak, mind giving a sanity to what I said above ? Also maybe an AtomicBoolean is overkill and a simple @volatile var boolean might do the trick.
There was a problem hiding this comment.
I think this is exactly correct. What's being revealed here is the fact that AsyncStateMachine at present has no support for cancelation, but the old properties of async_ caused that reality to be ignored, effectively leaking fibers (and potentially even creating thread starvation issues) when async/await blocks were canceled.
Also yes, a volatile should be mostly sufficient in this case. You're really just looking for a flag to make it stop. The one trick though is you'll need to make sure the finalizer itself asynchronously blocks until the state machine has been fully interrupted, which obviously might take a bit.
Currently, any effect created by the
asyncmacro is not cancelable. This PR fixes that.Probably a regression after the semantics of
async_/asyncchanged in regards to cancellability.I have also fixed the test for "be cancellable" - which was not asserting correctly.