diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index e91d97c..0f6263c 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -44,4 +44,4 @@ jobs: - name: Run tests timeout-minutes: 10 - run: bundle exec bake test + run: bundle exec sus --verbose diff --git a/async-container.gemspec b/async-container.gemspec index da9d466..3741248 100644 --- a/async-container.gemspec +++ b/async-container.gemspec @@ -24,5 +24,7 @@ Gem::Specification.new do |spec| spec.required_ruby_version = ">= 3.3" - spec.add_dependency "async", "~> 2.22" + spec.add_dependency "async", "~> 2.41" + spec.add_dependency "async-signals", "~> 0.6" + spec.add_dependency "io-event", "~> 1.18" end diff --git a/lib/async/container.rb b/lib/async/container.rb index bd0d47a..425ccab 100644 --- a/lib/async/container.rb +++ b/lib/async/container.rb @@ -3,11 +3,7 @@ # Released under the MIT License. # Copyright, 2017-2025, by Samuel Williams. -require_relative "container/controller" +# This sets up graceful handling of SIGINT and SIGTERM. +require "async/signals/graceful" -# @namespace -module Async - # @namespace - module Container - end -end +require_relative "container/controller" diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index 75b1a95..79ef3a0 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -10,6 +10,10 @@ require_relative "notify" require_relative "policy" +require "async" +require "async/signals" +require "async/signals/graceful" + module Async module Container # The default graceful stop policy for controllers. @@ -27,9 +31,35 @@ module Container # Manages the life-cycle of one or more containers in order to support a persistent system. # e.g. a web server, job server or some other long running system. class Controller + # Represents a trapped process signal as a queued controller event. + class SignalEvent + # Initialize the signal event. 
+ # @parameter signal [Symbol | String | Integer] The signal that was received. + # @parameter handler [Proc] The handler to invoke when the event is processed. + def initialize(signal, handler) + @signal = signal + @handler = handler + end + + # @attribute [Symbol | String | Integer] The signal that was received. + attr :signal + + # Process the signal event by invoking the registered handler. + def call + @handler.call + end + end + + # Represents the end of the active container lifecycle. + class StopEvent + # Process the stop event. + def call + end + end + + STOP_EVENT = StopEvent.new.freeze + SIGHUP = Signal.list["HUP"] - SIGINT = Signal.list["INT"] - SIGTERM = Signal.list["TERM"] SIGUSR1 = Signal.list["USR1"] SIGUSR2 = Signal.list["USR2"] @@ -41,10 +71,23 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop: @graceful_stop = graceful_stop @container = nil - @signals = {} + @events = ::Thread::Queue.new + @signals = Async::Signals::Handlers.new + @running = false + + # Serializes lifecycle transitions such as start, restart and reload. `Container#stop` (which can also take time) is performed outside this guard, so that live container events are not blocked by the stop operation (e.g. restarting). + @guard = ::Thread::Mutex.new self.trap(SIGHUP) do self.restart + rescue SetupError => error + Console.error(self, error) + end + + self.trap(SIGUSR1) do + self.reload + rescue SetupError => error + Console.error(self, error) end end @@ -80,7 +123,15 @@ def to_s # @parameters signal [Symbol] The signal to trap, e.g. `:INT`. # @parameters block [Proc] The signal handler to invoke. def trap(signal, &block) - @signals[signal] = block + if block + event = SignalEvent.new(signal, block).freeze + + @signals.trap(signal) do + enqueue_event(event) + end + else + @signals.ignore(signal) + end end # Create a policy for managing child lifecycle events. @@ -100,12 +151,12 @@ def create_container # Whether the controller has a running container. 
# @returns [Boolean] def running? - !!@container + @guard.synchronize{!!@container} end # Wait for the underlying container to start. def wait - @container&.wait + @guard.synchronize{@container}&.wait end # Spawn container instances into the given container. @@ -117,160 +168,199 @@ def setup(container) end # Start the container unless it's already running. - def start - unless @container - Console.info(self, "Controller starting...") - self.restart - end - - Console.info(self, "Controller started.") + # @returns [Generic] The container. + def restart + self.start(restart: true) end # Stop the container if it's running. - # @parameter graceful [Boolean] Whether to give the children instances time to shut down or to kill them immediately. + # @parameter graceful [Boolean | Numeric] Whether to give the children instances time to shut down or to kill them immediately. def stop(graceful = @graceful_stop) - @container&.stop(graceful) - @container = nil + container = nil + + @guard.synchronize do + if container = @container + @container = nil + end + end + + container&.stop(graceful) end # Restart the container. A new container is created, and if successful, any old container is terminated gracefully. # This is equivalent to a blue-green deployment. - def restart - if @container - @notify&.restarting! - - Console.info(self, "Restarting container...") - else - Console.info(self, "Starting container...") - end - - container = self.create_container + def start(restart: false) + old_container = nil + new_container = nil - begin - self.setup(container) - rescue => error - @notify&.error!(error.to_s) + @guard.synchronize do + if @container + if restart + @notify&.restarting! + + Console.info(self, "Restarting container...") + else + return @container + end + else + Console.info(self, "Starting container...") + end - raise SetupError, container - end - - # Wait for all child processes to enter the ready state. 
- Console.info(self, "Waiting for startup...") - container.wait_until_ready - Console.info(self, "Finished startup.") - - if container.failed? - @notify&.error!("Container failed to start!") + container = self.create_container + + begin + self.setup(container) + rescue => error + @notify&.error!(error.to_s) + + raise SetupError, container + end + + # Wait for all child processes to enter the ready state. + Console.info(self, "Waiting for startup...") + + container.wait_until_ready - raise SetupError, container + Console.info(self, "Finished startup.") + + if container.failed? + @notify&.error!("Container failed to start!") + + raise SetupError, container + end + + # The following swap should be atomic: + old_container = @container + @container = container + new_container = container + container = nil + + @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") + rescue => error + raise + ensure + # If we are leaving this function with an exception, kill the container: + if container + Console.warn(self, "Stopping failed container...", exception: error) + container.stop(false) + end end - # The following swap should be atomic: - old_container = @container - @container = container - container = nil - if old_container Console.info(self, "Stopping old container...") - old_container&.stop(@graceful_stop) + old_container.stop(@graceful_stop) end - @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") - rescue => error - raise - ensure - # If we are leaving this function with an exception, kill the container: - if container - Console.warn(self, "Stopping failed container...", exception: error) - container.stop(false) - end + return new_container end # Reload the existing container. Children instances will be reloaded using `SIGHUP`. def reload - @notify&.reloading! 
- - Console.info(self){"Reloading container: #{@container}..."} - - begin - self.setup(@container) - rescue - raise SetupError, container + @guard.synchronize do + @notify&.reloading! + + Console.info(self){"Reloading container: #{@container}..."} + + begin + self.setup(@container) + rescue + raise SetupError, @container + end + + # Wait for all child processes to enter the ready state. + Console.info(self, "Waiting for startup...") + @container.wait_until_ready + Console.info(self, "Finished startup.") + + if @container.failed? + @notify&.error!("Container failed to reload!") + + raise SetupError, @container + else + @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") + end end - - # Wait for all child processes to enter the ready state. - Console.info(self, "Waiting for startup...") - @container.wait_until_ready - Console.info(self, "Finished startup.") - - if @container.failed? - @notify.error!("Container failed to reload!") + end + + private def enqueue_event(event) + @events << event + end + + private def open_event_queue + @guard.synchronize do + if @running + raise RuntimeError, "Controller is already running." + end - raise SetupError, @container - else - @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") + @running = true + @events = ::Thread::Queue.new end end - # Enter the controller run loop, trapping `SIGINT` and `SIGTERM`. - def run - @notify&.status!("Initializing controller...") - - with_signal_handlers do - self.start + private def stop_event_queue(events) + events << STOP_EVENT + end + + private def finish_event_queue(events) + @guard.synchronize do + if @events.equal?(events) + @running = false + @events = ::Thread::Queue.new + end + end + end + + private def wait_for_container(events) + while true + container = @guard.synchronize{@container} - while @container&.running? 
- begin - @container.wait - rescue SignalException => exception - if handler = @signals[exception.signo] - begin - handler.call - rescue SetupError => error - Console.error(self, error) - end - else - raise - end + if container.nil? + stop_event_queue(events) + return + end + + container.wait + + @guard.synchronize do + # If this is still the active container, it completed naturally. Clear it and close the event queue so the controller run loop can finish. If it was replaced by a restart, keep waiting for the new active container. + if @container.equal?(container) + @container = nil + stop_event_queue(events) + return end end end - rescue Interrupt - self.stop - rescue Terminate - self.stop(false) - ensure - self.stop(false) end - private def with_signal_handlers - # I thought this was the default... but it doesn't always raise an exception unless you do this explicitly. - - interrupt_action = Signal.trap(:INT) do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. - # $stderr.puts "Received INT signal, interrupting...", caller - ::Thread.current.raise(Interrupt) - end - - # SIGTERM behaves the same as SIGINT by default. - terminate_action = Signal.trap(:TERM) do - # $stderr.puts "Received TERM signal, interrupting...", caller - ::Thread.current.raise(Interrupt) # Same as SIGINT - end - - hangup_action = Signal.trap(:HUP) do - # $stderr.puts "Received HUP signal, restarting...", caller - ::Thread.current.raise(Restart) - end + # Enter the controller run loop. + # @parameter signals [#install] The signal backend to use while running the controller. 
+ def run(signals: Async::Signals.default) + @notify&.status!("Initializing controller...") + events = open_event_queue - ::Thread.handle_interrupt(SignalException => :never) do - yield + signals.install(@signals) do + Sync do |task| + self.start + + task.async{wait_for_container(events)} + + while event = events.pop + event.call + + break if event.equal?(STOP_EVENT) + end + rescue Async::Cancel + # Graceful shutdown: + self.stop + ensure + # Forced shutdown: + self.stop(false) + end end + rescue Interrupt + self.stop(false) ensure - # Restore the interrupt handler: - Signal.trap(:INT, interrupt_action) - Signal.trap(:TERM, terminate_action) - Signal.trap(:HUP, hangup_action) + finish_event_queue(events) if events end end end diff --git a/lib/async/container/error.rb b/lib/async/container/error.rb index 299058d..71d7213 100644 --- a/lib/async/container/error.rb +++ b/lib/async/container/error.rb @@ -31,16 +31,6 @@ def initialize end end - # Similar to {Interrupt}, but represents `SIGHUP`. - class Restart < SignalException - SIGHUP = Signal.list["HUP"] - - # Create a new restart error. - def initialize - super(SIGHUP) - end - end - # Represents the error which occured when a container failed to start up correctly. class SetupError < Error # Create a new setup error. diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 21fe621..0dc0333 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -104,13 +104,12 @@ def self.fork(**options) # Fork from `Thread.new` so the child does not inherit the parent fiber scheduler or the current caller's fiber stack. Only this short-lived thread is copied into the child process: ::Thread.new do ::Process.fork do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. - Signal.trap(:INT){::Thread.current.raise(Interrupt)} - Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. 
- Signal.trap(:HUP){::Thread.current.raise(Restart)} + # Convert process signals into pending interrupts on the surviving fork thread so they respect `Thread.handle_interrupt` in the child: + ::Signal.trap(:INT){::Thread.current.raise(::Interrupt)} + ::Signal.trap(:TERM){::Thread.current.raise(::Interrupt)} - # Reset `SignalException` delivery because CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`. Async deliberately masks signal exceptions, and the signal traps above should be delivered promptly: - ::Thread.handle_interrupt(SignalException => :immediate) do + # Reset interrupt masking - `Exception` is a fast path: + ::Thread.handle_interrupt(Exception => :immediate) do yield Instance.for(process) rescue Interrupt # Graceful exit. diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index def3182..5a1eb96 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -230,11 +230,9 @@ def wait_for_children(duration = nil) # Wait for a child process to exit OR a signal to be received. def select(duration) - ::Thread.handle_interrupt(SignalException => :immediate) do - readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) - - return readable - end + readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) + + return readable end end end diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 5224519..23c2531 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -211,11 +211,6 @@ def kill! @thread.kill end - # Raise {Restart} in the child thread. - def restart! - @thread.raise(Restart) - end - # Wait for the thread to exit and return he exit status. # @asynchronous This method may block. # diff --git a/lib/async/container/version.rb b/lib/async/container/version.rb index cd71a1a..62925ef 100644 --- a/lib/async/container/version.rb +++ b/lib/async/container/version.rb @@ -3,7 +3,9 @@ # Released under the MIT License. 
# Copyright, 2017-2026, by Samuel Williams. +# @namespace module Async + # @namespace module Container VERSION = "0.37.0" end diff --git a/readme.md b/readme.md index 22b9885..dbb8317 100644 --- a/readme.md +++ b/readme.md @@ -28,6 +28,10 @@ Please see the [project documentation](https://socketry.github.io/async-containe Please see the [project releases](https://socketry.github.io/async-container/releases/index) for all releases. +### Unreleased + + - Add `Async::Container::Signals` for installing scoped signal traps that enqueue signal events. + ### v0.37.0 - Rename `ASYNC_CONTAINER_GRACEFUL_TIMEOUT` to `ASYNC_CONTAINER_GRACEFUL_STOP` and apply it at the controller level as `GRACEFUL_STOP`. `Group#stop` now only applies the shutdown policy it is given. @@ -64,10 +68,6 @@ Please see the [project releases](https://socketry.github.io/async-container/rel - Add `Async::Container::Generic#stopping?` so that policies can more accurately track the state of the container. -### v0.33.0 - - - Add `Policy#make_statistics` to allow policies to customize statistics initialization. - ## Contributing We welcome contributions to this project. diff --git a/releases.md b/releases.md index 73d4f2f..f9f7d1d 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,11 @@ # Releases +## Unreleased + + - Use `async-signals` to coordinate controller signal traps while queueing signal events through the controller event loop. + - Require `async-signals` v0.6 so controller signal handling is implicit only when running on the main thread without an existing fiber scheduler, and loading `async-container` installs graceful default `SIGINT`/`SIGTERM` handling. + - Remove the obsolete `Async::Container::Restart` signal exception. + ## v0.37.0 - Rename `ASYNC_CONTAINER_GRACEFUL_TIMEOUT` to `ASYNC_CONTAINER_GRACEFUL_STOP` and apply it at the controller level as `GRACEFUL_STOP`. `Group#stop` now only applies the shutdown policy it is given. 
diff --git a/test/async/container.rb b/test/async/container.rb index a080f58..80383c2 100644 --- a/test/async/container.rb +++ b/test/async/container.rb @@ -4,8 +4,23 @@ # Copyright, 2017-2025, by Samuel Williams. require "async/container" +require "open3" +require "rbconfig" describe Async::Container do + def ruby(script) + ::Open3.capture3(::RbConfig.ruby, "-Ilib", "-e", script) + end + + def expect_success(script) + stdout, stderr, status = ruby(script) + + expect(status).to be(:success?) + expect(stderr).to be == "" + + return stdout + end + with ".processor_count" do it "can get processor count" do expect(Async::Container.processor_count).to be >= 1 @@ -35,6 +50,46 @@ end end + with "graceful signal defaults" do + it "installs graceful SIGINT handling" do + stdout = expect_success(<<~RUBY) + require "async/container" + + begin + ::Thread.handle_interrupt(::Interrupt => :never) do + ::Process.kill(:INT, ::Process.pid) + puts "inner" + end + + sleep 1 + rescue ::Interrupt + puts "outer" + end + RUBY + + expect(stdout).to be == "inner\nouter\n" + end + + it "installs graceful SIGTERM handling" do + stdout = expect_success(<<~RUBY) + require "async/container" + + begin + ::Thread.handle_interrupt(::Interrupt => :never) do + ::Process.kill(:TERM, ::Process.pid) + puts "inner" + end + + sleep 1 + rescue ::Interrupt + puts "outer" + end + RUBY + + expect(stdout).to be == "inner\nouter\n" + end + end + with ".best" do it "can get the best container class" do expect(Async::Container.best_container_class).not.to be_nil diff --git a/test/async/container/controller.rb b/test/async/container/controller.rb index 273b016..2576f68 100644 --- a/test/async/container/controller.rb +++ b/test/async/container/controller.rb @@ -155,6 +155,27 @@ def controller.setup(container) controller.stop end + it "does not restart an already running container" do + count = 0 + + controller.define_singleton_method(:setup) do |container| + count += 1 + + container.spawn do |instance| + 
instance.ready! + sleep + end + end + + first = controller.start + second = controller.start + + expect(second).to be == first + expect(count).to be == 1 + + controller.stop(false) + end + it "propagates exceptions" do def controller.setup(container) raise "Boom!" @@ -166,6 +187,35 @@ def controller.setup(container) end end + with "#run" do + it "can run the same controller more than once" do + input, output = IO.pipe + + controller.instance_variable_set(:@output, output) + + def controller.setup(container) + container.spawn do |instance| + instance.ready! + + sleep(0.01) + + @output.puts("done") + @output.flush + end + end + + 2.times do + controller.run(signals: Async::Signals::Ignore) + + expect(IO.select([input], nil, nil, 1)).not.to be_nil + expect(input.gets).to be == "done\n" + end + ensure + input&.close + output&.close + end + end + with "graceful controller" do include_context Async::Container::AController, "graceful" @@ -210,6 +260,137 @@ def controller.setup(container) with "signals" do include_context Async::Container::AController, "dots" + it "uses the provided signal backend" do + signals = Module.new do + def self.install(handlers) + @handlers = handlers + yield + end + + def self.handlers + @handlers + end + end + + def controller.setup(container) + end + + Sync do + controller.run(signals: signals) + end + + expect(signals.handlers).to be == controller.instance_variable_get(:@signals) + end + + it "uses the default ignored signal backend inside an async scheduler" do + original = Async::Signals::Ignore.method(:install) + installed = nil + + Async::Signals::Ignore.define_singleton_method(:install) do |handlers, &block| + installed = handlers + block.call + end + + def controller.setup(container) + end + + Sync do + controller.run + end + + expect(installed).to be == controller.instance_variable_get(:@signals) + ensure + Async::Signals::Ignore.define_singleton_method(:install, original) if original + end + + it "can ignore trapped signals" do + 
controller.trap(:USR2) + + handlers = controller.instance_variable_get(:@signals).to_h + + expect(handlers.fetch("USR2")).to be_nil + end + + it "queues trapped signal events" do + controller = Async::Container::Controller.new(notify: nil) + applied = false + + controller.trap(:USR1) do + applied = true + end + + Async::Signals.install(controller.instance_variable_get(:@signals)) do + Process.kill(:USR1, Process.pid) + + event = controller.instance_variable_get(:@events).pop(timeout: 1) + + expect(event.signal).to be == :USR1 + + event.call + end + + expect(applied).to be == true + end + + it "handles setup errors when restarting from a signal" do + def controller.restart + raise Async::Container::SetupError, nil + end + + Async::Signals.install(controller.instance_variable_get(:@signals)) do + Process.kill(:HUP, Process.pid) + + event = controller.instance_variable_get(:@events).pop(timeout: 1) + + expect{event.call}.not.to raise_exception + end + end + + it "handles setup errors when reloading from a signal" do + def controller.reload + raise Async::Container::SetupError, nil + end + + Async::Signals.install(controller.instance_variable_get(:@signals)) do + Process.kill(:USR1, Process.pid) + + event = controller.instance_variable_get(:@events).pop(timeout: 1) + + expect{event.call}.not.to raise_exception + end + end + + it "notifies when reload fails" do + notify = Object.new + + def notify.reloading! + end + + def notify.error!(message) + @message = message + end + + def notify.message + @message + end + + container = Object.new + + def container.wait_until_ready + end + + def container.failed? + true + end + + controller = Async::Container::Controller.new(notify: notify) + + controller.instance_variable_set(:@container, container) + + expect{controller.reload}.to raise_exception(Async::Container::SetupError) + expect(notify.message).to be == "Container failed to reload!" + end + it "restarts children when receiving SIGHUP" do expect(input.read(1)).to be == "." 
diff --git a/test/async/container/events.rb b/test/async/container/events.rb new file mode 100644 index 0000000..0f0f3c1 --- /dev/null +++ b/test/async/container/events.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "async/container/controller" + +describe Async::Container::Controller::SignalEvent do + it "calls the handler" do + applied = false + + event = subject.new(:USR1, proc{applied = true}) + + expect(event.signal).to be == :USR1 + + event.call + + expect(applied).to be == true + end +end diff --git a/test/async/container/policy.rb b/test/async/container/policy.rb index fd63799..5cdc229 100644 --- a/test/async/container/policy.rb +++ b/test/async/container/policy.rb @@ -254,14 +254,25 @@ def child_exit(container, child, status, name:, key:, **options) end.new container = Async::Container.best_container_class.new(policy: stop_policy) + trigger = IO.pipe 3.times do |i| container.spawn(name: "worker-#{i}") do |instance| instance.ready! - exit(1) + + if i.zero? + trigger.first.gets + exit(1) + else + sleep + end end end + container.wait_until_ready + + trigger.last.puts("exit") + container.wait expect(stop_policy.stop_count).to be == 1