Skip to content

Dynamic scheduled tasks #553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,17 @@ It is recommended to set this value less than or equal to the queue database's c
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.


### Scheduler polling interval

The scheduler process checks for due recurring tasks and reloads dynamic tasks at a configurable interval. You can set this interval using the `polling_interval` key under the `scheduler` section in your `config/queue.yml`:

```yaml
scheduler:
polling_interval: 5 # seconds
```

This controls how frequently the scheduler wakes up to enqueue due recurring jobs and reload dynamic tasks.

### Queue order and priorities

As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`.
Expand Down Expand Up @@ -657,6 +668,47 @@ my_periodic_resque_job:

and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.


### Creating and Deleting Recurring Tasks Dynamically

You can create and delete recurring tasks at runtime, without editing the configuration file. Use the following methods:

#### Creating a recurring task

```ruby
SolidQueue.schedule_recurring_task(
"my_dynamic_task",
command: "puts 'Hello from a dynamic task!'",
schedule: "every 10 minutes"
)
```

This will create a dynamic recurring task with the given key, command, and schedule. You can also use the `class` and `args` options as in the configuration file.

#### Deleting a recurring task

```ruby
SolidQueue.delete_recurring_task(task_id)
```

This will delete a dynamically scheduled recurring task by its ID. If you attempt to delete a static (configuration-defined) recurring task, an error will be raised.

> **Note:** Static recurring tasks (those defined in `config/recurring.yml`) cannot be deleted at runtime. Attempting to do so will raise an error.

#### Example: Creating and deleting a recurring task

```ruby
# Create a new dynamic recurring task
recurring_task = SolidQueue.schedule_recurring_task(
"cleanup_temp_files",
command: "TempFileCleaner.clean!",
schedule: "every day at 2am"
)

# Delete the task later by ID
SolidQueue.delete_recurring_task(recurring_task.id)
```

## Inspiration

Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.
Expand Down
1 change: 1 addition & 0 deletions app/models/solid_queue/recurring_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class RecurringTask < Record
validate :existing_job_class

scope :static, -> { where(static: true) }
scope :dynamic, -> { where(static: false) }

has_many :recurring_executions, foreign_key: :task_key, primary_key: :key

Expand Down
2 changes: 2 additions & 0 deletions lib/generators/solid_queue/install/templates/config/queue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ default: &default
threads: 3
processes: <%%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.1
scheduler:
polling_interval: 1

development:
<<: *default
Expand Down
9 changes: 9 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ module SolidQueue

delegate :on_start, :on_stop, :on_exit, to: Supervisor


def create_recurring_task(key, **attributes)
RecurringTask.create!(**attributes, key:, static: false)
end

def destroy_recurring_task(id)
RecurringTask.dynamic.find(id).destroy!
end

[ Dispatcher, Scheduler, Worker ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start(&block)
Expand Down
26 changes: 18 additions & 8 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def instantiate
concurrency_maintenance_interval: 600
}

SCHEDULER_DEFAULTS = {
polling_interval: 1
}

DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"

Expand Down Expand Up @@ -122,11 +126,9 @@ def dispatchers
end

def schedulers
if !skip_recurring_tasks? && recurring_tasks.any?
[ Process.new(:scheduler, recurring_tasks: recurring_tasks) ]
else
[]
end
return [] if skip_recurring_tasks?

[ Process.new(:scheduler, { recurring_tasks:, **scheduler_options.with_defaults(SCHEDULER_DEFAULTS) }) ]
end

def workers_options
Expand All @@ -139,6 +141,10 @@ def dispatchers_options
.map { |options| options.dup.symbolize_keys }
end

def scheduler_options
@scheduler_options ||= processes_config.fetch(:scheduler, {}).dup.symbolize_keys
end

def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
Expand All @@ -147,9 +153,13 @@ def recurring_tasks

def processes_config
@processes_config ||= config_from \
options.slice(:workers, :dispatchers).presence || options[:config_file],
keys: [ :workers, :dispatchers ],
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
options.slice(:workers, :dispatchers, :scheduler).presence || options[:config_file],
keys: [ :workers, :dispatchers, :scheduler ],
fallback: {
workers: [ WORKER_DEFAULTS ],
dispatchers: [ DISPATCHER_DEFAULTS ],
scheduler: SCHEDULER_DEFAULTS
}
end

def recurring_tasks_config
Expand Down
4 changes: 4 additions & 0 deletions lib/solid_queue/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,9 @@ def heartbeat
self.process = nil
wake_up
end

def refresh_registered_process
process.update_columns(metadata: metadata.compact)
end
end
end
11 changes: 7 additions & 4 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class Scheduler < Processes::Base
include Processes::Runnable
include LifecycleHooks

attr_reader :recurring_schedule
attr_reader :recurring_schedule, :polling_interval

after_boot :run_start_hooks
after_boot :schedule_recurring_tasks
Expand All @@ -15,6 +15,8 @@ class Scheduler < Processes::Base

def initialize(recurring_tasks:, **options)
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)
@polling_interval = options[:polling_interval]

super(**options)
end
Expand All @@ -24,13 +26,14 @@ def metadata
end

private
SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks

def run
loop do
break if shutting_down?

interruptible_sleep(SLEEP_INTERVAL)
recurring_schedule.reload!
refresh_registered_process if recurring_schedule.changed?

interruptible_sleep(polling_interval)
end
ensure
SolidQueue.instrument(:shutdown_process, process: self) do
Expand Down
58 changes: 48 additions & 10 deletions lib/solid_queue/scheduler/recurring_schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ module SolidQueue
class Scheduler::RecurringSchedule
include AppExecutor

attr_reader :configured_tasks, :scheduled_tasks
attr_reader :static_tasks, :configured_tasks, :scheduled_tasks, :changes

def initialize(tasks)
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
@static_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
@configured_tasks = @static_tasks + dynamic_tasks
@scheduled_tasks = Concurrent::Hash.new
@changes = Concurrent::Hash.new
end

def empty?
Expand All @@ -17,15 +19,32 @@ def empty?

def schedule_tasks
wrap_in_app_executor do
persist_tasks
reload_tasks
persist_static_tasks
reload_static_tasks
end

configured_tasks.each do |task|
schedule_task(task)
end
end

def dynamic_tasks
SolidQueue::RecurringTask.dynamic
end

def schedule_new_dynamic_tasks
dynamic_tasks.where.not(key: scheduled_tasks.keys).each do |task|
schedule_task(task)
end
end

def unschedule_old_dynamic_tasks
(scheduled_tasks.keys - SolidQueue::RecurringTask.pluck(:key)).each do |key|
scheduled_tasks[key].cancel
scheduled_tasks.delete(key)
end
end

def schedule_task(task)
scheduled_tasks[task.key] = schedule(task)
end
Expand All @@ -35,18 +54,37 @@ def unschedule_tasks
scheduled_tasks.clear
end

def static_task_keys
static_tasks.map(&:key)
end

def task_keys
configured_tasks.map(&:key)
static_task_keys + dynamic_tasks.map(&:key)
end

def reload!
{ added_tasks: schedule_new_dynamic_tasks,
removed_tasks: unschedule_old_dynamic_tasks }.each do |key, values|
if values.any?
changes[key] = values
else
changes.delete(key)
end
end
end

def changed?
@changes.any?
end

private
def persist_tasks
SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all
SolidQueue::RecurringTask.create_or_update_all configured_tasks
def persist_static_tasks
SolidQueue::RecurringTask.static.where.not(key: static_task_keys).delete_all
SolidQueue::RecurringTask.create_or_update_all static_tasks
end

def reload_tasks
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys)
def reload_static_tasks
@static_tasks = SolidQueue::RecurringTask.static.where(key: static_task_keys)
end

def schedule(task)
Expand Down
27 changes: 27 additions & 0 deletions test/solid_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,31 @@ class SolidQueueTest < ActiveSupport::TestCase
test "it has a version number" do
assert SolidQueue::VERSION
end

test "creates recurring tasks" do
SolidQueue.create_recurring_task("test 1", command: "puts 1", schedule: "every hour")
SolidQueue.create_recurring_task("test 2", command: "puts 2", schedule: "every minute", static: true)

assert SolidQueue::RecurringTask.exists?(key: "test 1", command: "puts 1", schedule: "every hour", static: false)
assert SolidQueue::RecurringTask.exists?(key: "test 2", command: "puts 2", schedule: "every minute", static: false)
end

test "destroys recurring tasks" do
dynamic_task = SolidQueue::RecurringTask.create!(
key: "dynamic", command: "puts 'd'", schedule: "every day", static: false
)

static_task = SolidQueue::RecurringTask.create!(
key: "static", command: "puts 's'", schedule: "every week", static: true
)

SolidQueue.destroy_recurring_task(dynamic_task.id)

assert_raises(ActiveRecord::RecordNotFound) do
SolidQueue.destroy_recurring_task(static_task.id)
end

assert_not SolidQueue::RecurringTask.exists?(key: "dynamic", static: false)
assert SolidQueue::RecurringTask.exists?(key: "static", static: true)
end
end
6 changes: 3 additions & 3 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConfigurationTest < ActiveSupport::TestCase
test "default configuration when config given is empty" do
configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration), recurring_schedule_file: config_file_path(:empty_configuration))

assert_equal 2, configuration.configured_processes.count
assert_equal 3, configuration.configured_processes.count # includes scheduler for dynamic tasks
assert_processes configuration, :worker, 1, queues: "*"
assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size]
end
Expand Down Expand Up @@ -101,11 +101,11 @@ class ConfigurationTest < ActiveSupport::TestCase

configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_production_only))
assert configuration.valid?
assert_processes configuration, :scheduler, 0
assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks

configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_empty))
assert configuration.valid?
assert_processes configuration, :scheduler, 0
assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks

# No processes
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [])
Expand Down
Loading
Loading