Skip to content

Commit 2be7af1

Browse files
committed
Add support for stop(cause:).
1 parent f30c2f8 commit 2be7af1

File tree

2 files changed

+64
-11
lines changed

2 files changed

+64
-11
lines changed

lib/async/task.rb

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,39 @@
1818
module Async
1919
# Raised when a task is explicitly stopped.
2020
class Stop < Exception
21+
# Represents the source of the stop operation.
22+
class Cause < Exception
23+
if RUBY_VERSION >= "3.2"
24+
def self.backtrace
25+
caller_locations(2..-1)
26+
end
27+
else
28+
def self.backtrace
29+
caller(2..-1)
30+
end
31+
end
32+
33+
def self.for(message = "Task was stopped")
34+
instance = self.new(message)
35+
instance.set_backtrace(self.backtrace)
36+
return instance
37+
end
38+
end
39+
40+
# Create a new stop operation.
41+
def initialize(message = "Task was stopped")
42+
super(message)
43+
end
44+
2145
# Used to defer stopping the current task until later.
2246
class Later
2347
# Create a new stop later operation.
2448
#
2549
# @parameter task [Task] The task to stop later.
26-
def initialize(task)
50+
# @parameter cause [Exception] The cause of the stop operation.
51+
def initialize(task, cause = nil)
2752
@task = task
53+
@cause = cause
2854
end
2955

3056
# @returns [Boolean] Whether the task is alive.
@@ -34,7 +60,7 @@ def alive?
3460

3561
# Transfer control to the operation - this will stop the task.
3662
def transfer
37-
@task.stop
63+
@task.stop(false, cause: @cause)
3864
end
3965
end
4066
end
@@ -266,7 +292,13 @@ def wait
266292
# If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later.
267293
#
268294
# @parameter later [Boolean] Whether to stop the task later, or immediately.
269-
def stop(later = false)
295+
# @parameter cause [Exception] The cause of the stop operation.
296+
def stop(later = false, cause: $!)
297+
# If no cause is given, we generate one from the current call stack:
298+
unless cause
299+
cause = Stop::Cause.for("Stopping task!")
300+
end
301+
270302
if self.stopped?
271303
# If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry.
272304
return stopped!
@@ -280,27 +312,27 @@ def stop(later = false)
280312
# If we are deferring stop...
281313
if @defer_stop == false
282314
# Don't stop now... but update the state so we know we need to stop later.
283-
@defer_stop = true
315+
@defer_stop = cause
284316
return false
285317
end
286318

287319
if self.current?
288320
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
289321
if later
290322
# If the fiber is the current fiber and we want to stop it later, schedule it:
291-
Fiber.scheduler.push(Stop::Later.new(self))
323+
Fiber.scheduler.push(Stop::Later.new(self, cause))
292324
else
293325
# Otherwise, raise the exception directly:
294-
raise Stop, "Stopping current task!"
326+
raise Stop, "Stopping current task!", cause: cause
295327
end
296328
else
297329
# If the fiber is not curent, we can raise the exception directly:
298330
begin
299331
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
300-
Fiber.scheduler.raise(@fiber, Stop)
332+
Fiber.scheduler.raise(@fiber, Stop, cause: cause)
301333
rescue FiberError
302334
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
303-
Fiber.scheduler.push(Stop::Later.new(self))
335+
Fiber.scheduler.push(Stop::Later.new(self, cause))
304336
end
305337
end
306338
else
@@ -340,7 +372,7 @@ def defer_stop
340372

341373
# If we were asked to stop, we should do so now:
342374
if defer_stop
343-
raise Stop, "Stopping current task (was deferred)!"
375+
raise Stop, "Stopping current task (was deferred)!", cause: defer_stop
344376
end
345377
end
346378
else
@@ -351,7 +383,7 @@ def defer_stop
351383

352384
# @returns [Boolean] Whether stop has been deferred.
353385
def stop_deferred?
354-
@defer_stop
386+
!!@defer_stop
355387
end
356388

357389
# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.

test/async/task.rb

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,27 @@
541541
expect(transient).to be(:running?)
542542
end.wait
543543
end
544+
545+
it "can stop a task and provide a cause" do
546+
error = nil
547+
548+
cause = StandardError.new("boom")
549+
cause.set_backtrace(caller_locations)
550+
551+
task = reactor.async do |task|
552+
begin
553+
task.stop(cause: cause)
554+
rescue Async::Stop => error
555+
raise
556+
end
557+
end
558+
559+
reactor.run
560+
561+
expect(task).to be(:stopped?)
562+
expect(error).to be_a(Async::Stop)
563+
expect(error.cause).to be == cause
564+
end
544565
end
545566

546567
with "#sleep" do
@@ -910,7 +931,7 @@ def sleep_forever
910931

911932
reactor.run_once(0)
912933

913-
expect(child_task.stop_deferred?).to be == nil
934+
expect(child_task.stop_deferred?).to be == false
914935
end
915936
end
916937

0 commit comments

Comments
 (0)