|
2 | 2 |
|
3 | 3 | module SolidQueue::Processes
|
4 | 4 | module Interruptible
|
5 |
| - include SolidQueue::AppExecutor |
6 |
| - |
7 | 5 | def wake_up
|
8 | 6 | interrupt
|
9 | 7 | end
|
10 | 8 |
|
11 | 9 | private
|
| 10 | + SELF_PIPE_BLOCK_SIZE = 11 |
12 | 11 |
|
13 | 12 | def interrupt
|
14 |
| - queue << true |
| 13 | + self_pipe[:writer].write_nonblock(".") |
| 14 | + rescue Errno::EAGAIN, Errno::EINTR |
| 15 | + # Ignore writes that would block and retry |
| 16 | + # if another signal arrived while writing |
| 17 | + retry |
15 | 18 | end
|
16 | 19 |
|
17 |
| - # Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up. |
18 |
| - # @param time [Numeric, Duration] the time to sleep. 0 returns immediately. |
19 | 20 | def interruptible_sleep(time)
|
20 |
| - # Invoking this from the main thread may result in significant slowdown. |
21 |
| - # Utilizing asynchronous execution (Futures) addresses this performance issue. |
22 |
| - Concurrent::Promises.future(time) do |timeout| |
23 |
| - queue.clear unless queue.pop(timeout:).nil? |
24 |
| - end.on_rejection! do |e| |
25 |
| - wrapped_exception = RuntimeError.new("Interruptible#interruptible_sleep - #{e.class}: #{e.message}") |
26 |
| - wrapped_exception.set_backtrace(e.backtrace) |
27 |
| - handle_thread_error(wrapped_exception) |
28 |
| - end.value |
| 21 | + if time > 0 && self_pipe[:reader].wait_readable(time) |
| 22 | + loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) } |
| 23 | + end |
| 24 | + rescue Errno::EAGAIN, Errno::EINTR |
| 25 | + end |
29 | 26 |
|
30 |
| - nil |
| 27 | + # Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html) |
| 28 | + def self_pipe |
| 29 | + @self_pipe ||= create_self_pipe |
31 | 30 | end
|
32 | 31 |
|
33 |
| - def queue |
34 |
| - @queue ||= Queue.new |
| 32 | + def create_self_pipe |
| 33 | + reader, writer = IO.pipe |
| 34 | + { reader: reader, writer: writer } |
35 | 35 | end
|
36 | 36 | end
|
37 | 37 | end
|
0 commit comments