Merge pull request #155 from basecamp/cron-jobs-take-2
Add support for recurring tasks (cron style jobs)
rosa authored Mar 20, 2024
2 parents 53f0e48 + 4ea0e2a commit 7fca542
Showing 35 changed files with 684 additions and 167 deletions.
4 changes: 0 additions & 4 deletions Gemfile
@@ -3,7 +3,3 @@ git_source(:github) { |repo| "https://github.com/#{repo}.git" }

# Specify your gem's dependencies in solid_queue.gemspec.
gemspec

gem "mysql2"
gem "pg"
gem "sqlite3"
8 changes: 8 additions & 0 deletions Gemfile.lock
@@ -4,6 +4,8 @@ PATH
solid_queue (0.2.2)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (~> 1.2.2)
fugit (~> 1.9.0)
railties (>= 7.1)

GEM
@@ -55,6 +57,11 @@ GEM
drb (2.1.1)
ruby2_keywords
erubi (1.12.0)
et-orbi (1.2.7)
tzinfo
fugit (1.9.0)
et-orbi (~> 1, >= 1.2.7)
raabro (~> 1.4)
globalid (1.2.1)
activesupport (>= 6.1)
i18n (1.14.1)
@@ -81,6 +88,7 @@ GEM
pg (1.5.4)
puma (6.4.2)
nio4r (~> 2.0)
raabro (1.4.0)
racc (1.7.3)
rack (3.0.8)
rack-session (2.0.0)
51 changes: 49 additions & 2 deletions README.md
@@ -2,7 +2,7 @@

Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind.

Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). _Improvements to logging and instrumentation, a better CLI tool, a way to run within an existing process in "async" mode, unique jobs and recurring, cron-like tasks are coming very soon._
Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). _Improvements to logging and instrumentation, a better CLI tool, a way to run within an existing process in "async" mode, and some way of specifying unique jobs are coming very soon._

Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails multi-threading.

@@ -77,7 +77,7 @@ Besides Rails 7.1, Solid Queue works best with MySQL 8+ or PostgreSQL 9.5+, as t

We have three types of processes in Solid Queue:
- _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table.
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They also do some maintenance work related to concurrency controls.
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
- The _supervisor_ forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed.

By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like:
@@ -119,6 +119,8 @@ Everything is optional. If no configuration is provided, Solid Queue will run wi
Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names.
- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch at most this number of jobs from its queue(s) and post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default. Setting it to `false` is useful if you don't use any [concurrency controls](#concurrency-controls), or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section.
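
As a rough illustration of how these dispatcher options combine with their defaults, the sketch below merges user-supplied options over the values listed in this commit's `DISPATCHER_DEFAULTS`. It uses plain `Hash#merge` instead of Active Support's `with_defaults`, and `dispatcher_options` is a hypothetical helper name, not part of Solid Queue.

```ruby
# Defaults copied from DISPATCHER_DEFAULTS in lib/solid_queue/configuration.rb.
DISPATCHER_DEFAULTS = {
  batch_size: 500,
  polling_interval: 1,
  concurrency_maintenance: true,
  concurrency_maintenance_interval: 600,
  recurring_tasks: []
}.freeze

# Hypothetical helper: user-supplied options win over the defaults.
def dispatcher_options(overrides = {})
  DISPATCHER_DEFAULTS.merge(overrides)
end

# A dispatcher that only dispatches jobs and skips concurrency maintenance.
dispatch_only = dispatcher_options(concurrency_maintenance: false)
puts dispatch_only[:concurrency_maintenance] # false
puts dispatch_only[:batch_size]              # 500
```

Options that are not overridden keep their default values, which is why a dispatcher entry in the configuration file can list only the settings it wants to change.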


### Queue order and priorities
@@ -265,3 +267,48 @@ Solid Queue has been inspired by [resque](https://github.com/resque/resque) and

## License
The gem is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT).

## Recurring tasks
Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by dispatcher processes and as such, they can be defined in the dispatcher's configuration like this:
```yml
dispatchers:
  - polling_interval: 1
    batch_size: 500
    recurring_tasks:
      my_periodic_job:
        class: MyJob
        args: [ 42, { status: "custom_status" } ]
        schedule: every second
```
`recurring_tasks` is a hash/dictionary, and the key will be the task key internally. Each task needs to have a class, which will be the job class to enqueue, and a schedule. The schedule is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can also provide arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array.

The job in the example configuration above will be enqueued every second as:
```ruby
MyJob.perform_later(42, status: "custom_status")
```
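
To make the mapping from `args` to the `perform_later` call concrete, here is a minimal sketch in plain Ruby. It is not Solid Queue's implementation: `split_arguments` and `FakeJob` are hypothetical names, and the rule that a trailing hash becomes keyword arguments (including a hash given on its own) is an assumption made for this illustration.

```ruby
# Hypothetical helper, not Solid Queue's code: turn a configured `args`
# value into [positional_arguments, keyword_arguments].
def split_arguments(args)
  list =
    case args
    when nil then []
    when Array then args.dup
    else [args]
    end

  # Assumption: a trailing Hash is treated as keyword arguments.
  # YAML yields string keys, so they are converted to symbols.
  kwargs = list.last.is_a?(Hash) ? list.pop.transform_keys(&:to_sym) : {}
  [list, kwargs]
end

# Stand-in for a job class so the call shape can be observed without Rails.
class FakeJob
  def self.perform_later(*args, **kwargs)
    { args: args, kwargs: kwargs }
  end
end

positional, kwargs = split_arguments([42, { "status" => "custom_status" }])
p FakeJob.perform_later(*positional, **kwargs)
```

With the example configuration, the positional part is `[42]` and the keyword part is `status: "custom_status"`, matching the call shown above.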

Tasks are enqueued at their corresponding times by the dispatcher that owns them, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb).
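
The "each task schedules the next one" pattern can be sketched without any scheduling library. This toy version uses a fixed interval in place of a Fugit cron expression and a simulated clock in place of real timers. `ToyScheduler`, `ToyRecurringTask` and their methods are hypothetical names for illustration only.

```ruby
# A toy scheduler driven by a simulated clock (plain integers as seconds).
class ToyScheduler
  attr_reader :now

  def initialize
    @now = 0
    @pending = [] # [run_at, callable] pairs
  end

  def schedule(run_at, &block)
    @pending << [run_at, block]
  end

  # Advance the clock, running every callback that becomes due on the way.
  def advance_to(time)
    loop do
      due = @pending.select { |run_at, _| run_at <= time }.min_by(&:first)
      break unless due

      @pending.delete(due)
      @now = due.first
      due.last.call
    end
    @now = time
  end
end

class ToyRecurringTask
  attr_reader :runs

  def initialize(scheduler, interval)
    @scheduler = scheduler
    @interval = interval
    @runs = []
  end

  def schedule_next(from = @scheduler.now)
    run_at = from + @interval
    @scheduler.schedule(run_at) do
      @runs << run_at       # stands in for enqueuing the job
      schedule_next(run_at) # each run schedules the following one
    end
  end
end

scheduler = ToyScheduler.new
task = ToyRecurringTask.new(scheduler, 5)
task.schedule_next
scheduler.advance_to(17)
p task.runs # [5, 10, 15]
```

Only one future run is pending at any time; the chain continues because every run registers its successor before finishing.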

It's possible to run multiple dispatchers with the same `recurring_tasks` configuration. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around.
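
The deduplication idea can be illustrated in memory. The sketch below is a simplification: a `Set` guarded by a `Mutex` plays the role of the unique index on `task_key` and `run_at`, and the check happens before the enqueue rather than through a database transaction that is rolled back on conflict. `ToyDatabase` and `enqueue_recurring` are hypothetical names.

```ruby
require "set"

class ToyDatabase
  attr_reader :jobs

  def initialize
    @jobs = []
    @recurring_keys = Set.new # plays the role of the unique index
    @lock = Mutex.new
  end

  # Enqueue a job and record the (task_key, run_at) pair as one atomic step.
  # Returns true if the job was enqueued, false if the pair already existed.
  def enqueue_recurring(task_key, run_at, job_name)
    @lock.synchronize do
      # Set#add? returns nil when the element is already present.
      return false unless @recurring_keys.add?([task_key, run_at])

      @jobs << job_name
      true
    end
  end
end

db = ToyDatabase.new
run_at = Time.utc(2024, 3, 20, 12, 0, 0)

# Two "dispatchers" with the same configuration race to enqueue the same run.
results = 2.times.map do
  Thread.new { db.enqueue_recurring("my_periodic_job", run_at, "MyJob") }
end.map(&:value)

p results.sort_by(&:to_s) # [false, true]
p db.jobs                 # ["MyJob"]
```

Whichever dispatcher gets there first enqueues the job; the other sees the existing key and skips it, so the job appears once per task per scheduled time.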

Finally, it's possible to configure jobs that aren't handled by Solid Queue. That is, you can have a job like this in your app:
```ruby
class MyResqueJob < ApplicationJob
  self.queue_adapter = :resque
  def perform(arg)
    # ..
  end
end
```

You can still configure this in Solid Queue:
```yml
dispatchers:
  - recurring_tasks:
      my_periodic_resque_job:
        class: MyResqueJob
        args: 22
        schedule: "*/5 * * * *"
```
and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.
2 changes: 1 addition & 1 deletion app/models/solid_queue/blocked_execution.rb
@@ -43,7 +43,7 @@ def release
promote_to_ready
destroy!

SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
SolidQueue.logger.debug("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
end
end
end
1 change: 0 additions & 1 deletion app/models/solid_queue/claimed_execution.rb
@@ -16,7 +16,6 @@ def claiming(job_ids, process_id, &block)
insert_all!(job_data)
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
block.call(claimed)
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
end
end

2 changes: 1 addition & 1 deletion app/models/solid_queue/job.rb
@@ -2,7 +2,7 @@

module SolidQueue
class Job < Record
include Executable
include Executable, Clearable, Recurrable

serialize :arguments, coder: JSON

8 changes: 2 additions & 6 deletions app/models/solid_queue/job/executable.rb
@@ -6,7 +6,7 @@ module Executable
extend ActiveSupport::Concern

included do
include Clearable, ConcurrencyControls, Schedulable
include ConcurrencyControls, Schedulable

has_one :ready_execution
has_one :claimed_execution
@@ -78,7 +78,7 @@ def dispatch_bypassing_concurrency_limits
end

def finished!
if preserve_finished_jobs?
if SolidQueue.preserve_finished_jobs?
touch(:finished_at)
else
destroy!
@@ -117,10 +117,6 @@ def ready
def execution
%w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
end

def preserve_finished_jobs?
SolidQueue.preserve_finished_jobs
end
end
end
end
13 changes: 13 additions & 0 deletions app/models/solid_queue/job/recurrable.rb
@@ -0,0 +1,13 @@
# frozen_string_literal: true

module SolidQueue
  class Job
    module Recurrable
      extend ActiveSupport::Concern

      included do
        has_one :recurring_execution, dependent: :destroy
      end
    end
  end
end
26 changes: 26 additions & 0 deletions app/models/solid_queue/recurring_execution.rb
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module SolidQueue
class RecurringExecution < Execution
scope :clearable, -> { where.missing(:job) }

class << self
def record(task_key, run_at, &block)
transaction do
if job_id = block.call
create!(job_id: job_id, task_key: task_key, run_at: run_at)
end
end
rescue ActiveRecord::RecordNotUnique
SolidQueue.logger.info("[SolidQueue] Skipped recurring task #{task_key} at #{run_at} — already dispatched")
end

def clear_in_batches(batch_size: 500)
loop do
records_deleted = clearable.limit(batch_size).delete_all
break if records_deleted == 0
end
end
end
end
end
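
The batched clean-up loop in `clear_in_batches` can be illustrated in plain Ruby. In this sketch an array of hashes stands in for the table, and `delete_batch` is a hypothetical stand-in for `clearable.limit(batch_size).delete_all`, returning the number of rows it removed. Counting the batches is an addition for demonstration and is not in the original method.

```ruby
# Hypothetical stand-in for `clearable.limit(batch_size).delete_all`:
# removes up to batch_size rows whose job is missing and returns the count.
def delete_batch(rows, batch_size)
  orphaned = rows.select { |row| row[:job].nil? }.first(batch_size)
  orphaned.each { |row| rows.delete(row) }
  orphaned.size
end

# Same loop shape as clear_in_batches: keep deleting until a batch is empty.
def clear_in_batches(rows, batch_size: 500)
  batches = 0
  loop do
    deleted = delete_batch(rows, batch_size)
    break if deleted == 0

    batches += 1
  end
  batches
end

# Rows with odd ids have lost their job and are therefore clearable.
rows = (1..7).map { |id| { id: id, job: (id.even? ? "job-#{id}" : nil) } }
batches = clear_in_batches(rows, batch_size: 3)

p batches                       # 2
p rows.map { |row| row[:id] }   # [2, 4, 6]
```

Deleting in bounded batches keeps each statement short, and the loop ends on the first batch that removes nothing.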
14 changes: 14 additions & 0 deletions db/migrate/20240218110712_create_recurring_executions.rb
@@ -0,0 +1,14 @@
class CreateRecurringExecutions < ActiveRecord::Migration[7.1]
  def change
    create_table :solid_queue_recurring_executions do |t|
      t.references :job, index: { unique: true }, null: false
      t.string :task_key, null: false
      t.datetime :run_at, null: false
      t.datetime :created_at, null: false

      t.index [ :task_key, :run_at ], unique: true
    end

    add_foreign_key :solid_queue_recurring_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade
  end
end
44 changes: 21 additions & 23 deletions lib/solid_queue.rb
@@ -3,24 +3,16 @@
require "solid_queue/version"
require "solid_queue/engine"

require "active_job/queue_adapters/solid_queue_adapter"
require "active_job/concurrency_controls"

require "solid_queue/app_executor"
require "solid_queue/processes/supervised"
require "solid_queue/processes/registrable"
require "solid_queue/processes/interruptible"
require "solid_queue/processes/pidfile"
require "solid_queue/processes/procline"
require "solid_queue/processes/poller"
require "solid_queue/processes/base"
require "solid_queue/processes/runnable"
require "solid_queue/processes/signals"
require "solid_queue/configuration"
require "solid_queue/pool"
require "solid_queue/worker"
require "solid_queue/dispatcher"
require "solid_queue/supervisor"
require "active_job"
require "active_job/queue_adapters"

require "zeitwerk"

loader = Zeitwerk::Loader.for_gem(warn_on_extra_files: false)
loader.ignore("#{__dir__}/solid_queue/tasks.rb")
loader.ignore("#{__dir__}/generators")
loader.ignore("#{__dir__}/puma")
loader.setup

module SolidQueue
mattr_accessor :logger, default: ActiveSupport::Logger.new($stdout)
@@ -42,11 +34,17 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

def self.supervisor?
supervisor
end
class << self
def supervisor?
supervisor
end

def silence_polling?
silence_polling
end

def self.silence_polling?
silence_polling
def preserve_finished_jobs?
preserve_finished_jobs
end
end
end
17 changes: 13 additions & 4 deletions lib/solid_queue/configuration.rb
@@ -12,7 +12,9 @@ class Configuration
DISPATCHER_DEFAULTS = {
batch_size: 500,
polling_interval: 1,
concurrency_maintenance_interval: 600
concurrency_maintenance: true,
concurrency_maintenance_interval: 600,
recurring_tasks: []
}

def initialize(mode: :work, load_from: nil)
@@ -33,7 +35,7 @@ def workers
if mode.in? %i[ work all]
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes.times.collect { SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
end
else
[]
@@ -42,8 +44,10 @@ def workers

def dispatchers
if mode.in? %i[ dispatch all]
dispatchers_options.flat_map do |dispatcher_options|
SolidQueue::Dispatcher.new(**dispatcher_options)
dispatchers_options.map do |dispatcher_options|
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]

Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
end
end
end
@@ -73,6 +77,11 @@ def dispatchers_options
.map { |options| options.dup.symbolize_keys }
end

def parse_recurring_tasks(tasks)
Array(tasks).map do |id, options|
Dispatcher::RecurringTask.from_configuration(id, **options)
end.select(&:valid?)
end

def load_config_from(file_or_hash)
case file_or_hash