|
2 | 2 |
|
3 | 3 | module Que |
4 | 4 | module ActiveRecord |
| 5 | + class << self |
| 6 | + def active_rails_executor? |
| 7 | + defined?(::Rails.application.executor) && ::Rails.application.executor.active? |
| 8 | + end |
| 9 | + |
| 10 | + def wrap_in_rails_executor(&block) |
| 11 | + if defined?(::Rails.application.executor) |
| 12 | + ::Rails.application.executor.wrap(&block) |
| 13 | + else |
| 14 | + yield |
| 15 | + end |
| 16 | + end |
| 17 | + end |
| 18 | + |
5 | 19 | module Connection |
6 | 20 | class << self |
7 | 21 | private |
8 | 22 |
|
9 | 23 | # Check out a PG::Connection object from ActiveRecord's pool. |
10 | 24 | def checkout |
11 | | - wrap_in_rails_executor do |
| 25 | + # Use Rails' executor (if present) to make sure that the connection |
| 26 | + # we're using isn't taken from us while the block runs. See |
| 27 | + # https://github.com/que-rb/que/issues/166#issuecomment-274218910 |
| 28 | + Que::ActiveRecord.wrap_in_rails_executor do |
12 | 29 | ::ActiveRecord::Base.connection_pool.with_connection do |conn| |
13 | 30 | yield conn.raw_connection |
14 | 31 | end |
15 | 32 | end |
16 | 33 | end |
17 | | - |
18 | | - # Use Rails' executor (if present) to make sure that the connection |
19 | | - # we're using isn't taken from us while the block runs. See |
20 | | - # https://github.com/que-rb/que/issues/166#issuecomment-274218910 |
21 | | - def wrap_in_rails_executor(&block) |
22 | | - if defined?(::Rails.application.executor) |
23 | | - ::Rails.application.executor.wrap(&block) |
24 | | - else |
25 | | - yield |
26 | | - end |
27 | | - end |
28 | 34 | end |
29 | 35 |
|
30 | 36 | module JobMiddleware |
31 | 37 | class << self |
32 | 38 | def call(job) |
33 | | - yield |
34 | | - |
35 | | - # ActiveRecord will check out connections to the current thread when |
36 | | - # queries are executed and not return them to the pool until |
37 | | - # explicitly requested to. I'm not wild about this API design, and |
38 | | - # it doesn't pose a problem for the typical case of workers using a |
39 | | - # single PG connection (since we ensure that connection is checked |
40 | | - # in and checked out responsibly), but since ActiveRecord supports |
41 | | - # connections to multiple databases, it's easy for people using that |
42 | | - # feature to unknowingly leak connections to other databases. So, |
43 | | - # take the additional step of telling ActiveRecord to check in all |
44 | | - # of the current thread's connections after each job is run. |
| 39 | + # Use Rails' executor (if present) to make sure that the connection |
| 40 | + # used by the job isn't returned to the pool prematurely. See |
| 41 | + # https://github.com/que-rb/que/issues/411 |
| 42 | + Que::ActiveRecord.wrap_in_rails_executor do |
| 43 | + yield |
| 44 | + end |
| 45 | + |
| 46 | + clear_active_connections_if_needed!(job) |
| 47 | + end |
| 48 | + |
| 49 | + private |
| 50 | + |
| 51 | + # ActiveRecord will check out connections to the current thread when |
| 52 | + # queries are executed and not return them to the pool until |
| 53 | + # explicitly requested to. I'm not wild about this API design, and |
| 54 | + # it doesn't pose a problem for the typical case of workers using a |
| 55 | + # single PG connection (since we ensure that connection is checked |
| 56 | + # in and checked out responsibly), but since ActiveRecord supports |
| 57 | + # connections to multiple databases, it's easy for people using that |
| 58 | + # feature to unknowingly leak connections to other databases. So, |
| 59 | + # take the additional step of telling ActiveRecord to check in all |
| 60 | + # of the current thread's connections after each job is run. |
| 61 | + def clear_active_connections_if_needed!(job) |
| 62 | + # don't clean in synchronous mode |
| 63 | + # see https://github.com/que-rb/que/pull/393 |
45 | 64 | return if job.class.resolve_que_setting(:run_synchronously) |
| 65 | + |
| 66 | + # don't clear connections in nested jobs executed synchronously |
| 67 | + # i.e. while we're still inside of the rails executor |
| 68 | + # see https://github.com/que-rb/que/pull/412#issuecomment-2194412783 |
| 69 | + return if Que::ActiveRecord.active_rails_executor? |
| 70 | + |
46 | 71 | if ::ActiveRecord.version >= Gem::Version.new('7.1') |
47 | 72 | ::ActiveRecord::Base.connection_handler.clear_active_connections!(:all) |
48 | 73 | else |
|
0 commit comments