Skip to content

Allow enqueuing scheduled jobs and recurring tasks right away #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ GEM
rack-session (>= 2.0.0, < 3)
tilt (~> 2.0)
smart_properties (1.17.0)
solid_queue (1.0.0)
solid_queue (1.0.1)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (>= 1.3.1)
Expand Down Expand Up @@ -306,7 +306,7 @@ DEPENDENCIES
rubocop-performance
rubocop-rails-omakase
selenium-webdriver
solid_queue (~> 1.0)
solid_queue (~> 1.0.1)
sprockets-rails
sqlite3

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ This library extends Active Job with a querying interface and the following sett
## Adapter Specifics

- **Resque**: Queue pausing is supported only if you have `resque-pause` installed in your project
- **Solid Queue**: Requires version >= 0.9.
- **Solid Queue**: Requires version >= 1.0.1.

## Advanced configuration

Expand Down
2 changes: 1 addition & 1 deletion app/assets/stylesheets/mission_control/jobs/jobs.css
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ table.jobs {
width: 45%;
}
&.scheduled th.job-header {
width: 60%;
width: 40%;
}
&.finished th.job-header {
width: 65%;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def create

private
def jobs_relation
ActiveJob.jobs.failed
ActiveJob.jobs
end

def redirect_location
Expand Down
13 changes: 9 additions & 4 deletions app/controllers/mission_control/jobs/dispatches_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ class MissionControl::Jobs::DispatchesController < MissionControl::Jobs::Applica

def create
@job.dispatch
redirect_to application_jobs_url(@application, :blocked), notice: "Dispatched job with id #{@job.job_id}"
redirect_to redirect_location, notice: "Dispatched job with id #{@job.job_id}"
end

private
def jobs_relation
ApplicationJob.jobs.blocked
end
def jobs_relation
ActiveJob.jobs
end

def redirect_location
status = @job.status.presence_in(supported_job_statuses) || :blocked
application_jobs_url(@application, status)
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
class MissionControl::Jobs::RecurringTasksController < MissionControl::Jobs::ApplicationController
before_action :ensure_supported_recurring_tasks
before_action :set_recurring_task, only: :show
before_action :set_recurring_task, only: [ :show, :update ]

def index
@recurring_tasks = MissionControl::Jobs::Current.server.recurring_tasks
Expand All @@ -10,6 +10,14 @@ def show
@jobs_page = MissionControl::Jobs::Page.new(@recurring_task.jobs, page: params[:page].to_i)
end

def update
if (job = @recurring_task.enqueue) && job.successfully_enqueued?
redirect_to application_job_path(@application, job.job_id), notice: "Enqueued recurring task #{@recurring_task.id}"
else
redirect_to application_recurring_task_path(@application, @recurring_task), alert: "Something went wrong enqueuing this recurring task"
end
end

private
def ensure_supported_recurring_tasks
unless recurring_tasks_supported?
Expand Down
2 changes: 1 addition & 1 deletion app/helpers/mission_control/jobs/jobs_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def failed_job_backtrace(job, server)
def attribute_names_for_job_status(status)
case status.to_s
when "failed" then [ "Error", "" ]
when "blocked" then [ "Queue", "Blocked by", "Block expiry", "" ]
when "blocked" then [ "Queue", "Blocked by", "" ]
when "finished" then [ "Queue", "Finished" ]
when "scheduled" then [ "Queue", "Scheduled", "" ]
when "in_progress" then [ "Queue", "Run by", "Running since" ]
Expand Down
4 changes: 4 additions & 0 deletions app/models/mission_control/jobs/recurring_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ def jobs
ActiveJob::JobsRelation.new(queue_adapter: queue_adapter).where(recurring_task_id: id)
end

def enqueue
queue_adapter.enqueue_recurring_task(id)
end

private
attr_reader :queue_adapter
end
11 changes: 11 additions & 0 deletions app/views/mission_control/jobs/jobs/_general_information.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@
<%= formatted_time(job.enqueued_at.to_datetime) %>
</td>
</tr>
<% if job.scheduled? %>
<tr>
<th>Scheduled</th>
<td>
<%= formatted_time(job.scheduled_at) %>
<% if job_delayed?(job) %>
<div class="is-danger tag ml-4">delayed</div>
<% end %>
</td>
</tr>
<% end %>
<% if job.failed? %>
<tr>
<th>Failed</th>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
<div class="buttons is-right">
<%= button_to "Dispatch", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %>
<%= button_to "Run now", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %>
</div>
5 changes: 3 additions & 2 deletions app/views/mission_control/jobs/jobs/blocked/_job.html.erb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<td><%= link_to job.queue_name, application_queue_path(@application, job.queue) %></td>
<td><div class="is-family-monospace is-size-7"><%= job.blocked_by %></div></td>
<td><%= formatted_time(job.blocked_until) %></td>
<td><div class="is-family-monospace is-size-7"><%= job.blocked_by %></div>
<div class="has-text-grey is-size-7">Until <%= formatted_time(job.blocked_until) %></div>
</td>
<td class="pr-0">
<%= render "mission_control/jobs/jobs/blocked/actions", job: job %>
</td>
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<div class="buttons is-right">
<%= button_to "Run now", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %>
<%= button_to "Discard", application_job_discard_path(@application, job.job_id), class: "button is-danger is-light mr-0",
form: { data: { turbo_confirm: "This will delete the job and can't be undone. Are you sure?" } } %>
</div>
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<div class="buttons is-right">
<%= button_to "Run now", application_recurring_task_path(@application, recurring_task.id), class: "button is-warning is-light mr-0", method: :put %>
</div>
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@
<td> <%= recurring_task.schedule %> </td>
<td><div class="has-text-grey"><%= recurring_task.last_enqueued_at ? formatted_time(recurring_task.last_enqueued_at) : "Never" %></div></td>
<td class="next_time"><div class="has-text-grey"><%= formatted_time(recurring_task.next_time) %></div></td>
<td class="pr-0">
<%= render "mission_control/jobs/recurring_tasks/actions", recurring_task: recurring_task %>
</td>
</tr>
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
<div class="level-left">
<%= recurring_task.id %>
</div>
<div class="level-right">
<%= render "mission_control/jobs/recurring_tasks/actions", recurring_task: recurring_task %>
</div>
</div>
</h1>
5 changes: 3 additions & 2 deletions app/views/mission_control/jobs/recurring_tasks/index.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
<th></th>
<th>Job</th>
<th>Schedule</th>
<th>Last enqueued at</th>
<th>Next run</th>
<th>Last enqueued</th>
<th>Next</th>
<th></th>
</tr>
</thead>

Expand Down
2 changes: 1 addition & 1 deletion config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
resources :jobs, only: :index, path: ":status/jobs"

resources :workers, only: [ :index, :show ]
resources :recurring_tasks, only: [ :index, :show ]
resources :recurring_tasks, only: [ :index, :show, :update ]
end

# Allow referencing urls without providing an application_id. It will default to the first one.
Expand Down
2 changes: 1 addition & 1 deletion lib/active_job/executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def discard
end

def dispatch
ActiveJob.jobs.blocked.dispatch_job(self)
ActiveJob.jobs.dispatch_job(self)
end

private
Expand Down
5 changes: 5 additions & 0 deletions lib/active_job/jobs_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ def discard_job(job)
end

# Dispatch the provided job.
#
# This operation is only valid for blocked or scheduled jobs. It will
# raise an error +ActiveJob::Errors::InvalidOperation+ otherwise.
def dispatch_job(job)
raise ActiveJob::Errors::InvalidOperation, "This operation can only be performed on blocked or scheduled jobs, but this job is #{job.status}" unless job.blocked? || job.scheduled?

queue_adapter.dispatch_job(job, self)
end

Expand Down
10 changes: 7 additions & 3 deletions lib/active_job/queue_adapters/solid_queue_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,13 @@ def execution_error_from_solid_queue_job(solid_queue_job)
end

def dispatch_immediately(job)
SolidQueue::Job.transaction do
job.dispatch_bypassing_concurrency_limits
job.blocked_execution.destroy!
if job.blocked?
SolidQueue::Job.transaction do
job.dispatch_bypassing_concurrency_limits
job.blocked_execution.destroy!
end
else
job.scheduled_execution.update!(scheduled_at: Time.now)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ def find_recurring_task(task_id)
end
end

def enqueue_recurring_task(task_id)
if task = SolidQueue::RecurringTask.find_by(key: task_id)
task.enqueue(at: Time.now)
end
end

private
def recurring_task_attributes_from_solid_queue_recurring_task(task)
{
Expand Down
2 changes: 1 addition & 1 deletion mission_control-jobs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "irb", "~> 1.13"

spec.add_development_dependency "resque"
spec.add_development_dependency "solid_queue", "~> 1.0"
spec.add_development_dependency "solid_queue", "~> 1.0.1"
spec.add_development_dependency "selenium-webdriver"
spec.add_development_dependency "resque-pause"
spec.add_development_dependency "mocha"
Expand Down
13 changes: 13 additions & 0 deletions test/controllers/recurring_tasks_controller_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,17 @@ class MissionControl::Jobs::RecurringTasksControllerTest < ActionDispatch::Integ
assert_select "article.is-danger", /Recurring task with id 'invalid_key' not found/
end
end

test "enqueue recurring task successfully" do
schedule_recurring_tasks_async(wait: 0.1.seconds)

assert_difference -> { ActiveJob.jobs.pending.count } do
put mission_control_jobs.application_recurring_task_url(@application, "periodic_pause_job")
assert_response :redirect
end

job = ActiveJob.jobs.pending.last
assert_equal "PauseJob", job.job_class_name
assert_match /jobs\/#{job.job_id}\?server_id=solid_queue\z/, response.location
end
end
13 changes: 12 additions & 1 deletion test/dummy/db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def clean_database
end

class JobsLoader
attr_reader :application, :server, :failed_jobs_count, :pending_jobs_count, :finished_jobs_count, :blocked_jobs_count
attr_reader :application, :server, :failed_jobs_count, :pending_jobs_count, :finished_jobs_count, :blocked_jobs_count, :scheduled_jobs_count

def initialize(application, server, failed_jobs_count: 100, pending_jobs_count: 50)
@application = application
Expand All @@ -19,6 +19,7 @@ def initialize(application, server, failed_jobs_count: 100, pending_jobs_count:
@pending_jobs_count = randomize(pending_jobs_count)
@finished_jobs_count = randomize(pending_jobs_count)
@blocked_jobs_count = randomize(pending_jobs_count)
@scheduled_jobs_count = randomize(pending_jobs_count)
end

def load
Expand All @@ -27,6 +28,7 @@ def load
load_failed_jobs
load_pending_jobs
load_blocked_jobs
load_scheduled_jobs
load_recurring_tasks
end
end
Expand Down Expand Up @@ -64,6 +66,15 @@ def load_blocked_jobs
end
end

def load_scheduled_jobs
return unless supported_status?(:scheduled)

puts "Generating #{scheduled_jobs_count} scheduled jobs for #{application} - #{server}..."
scheduled_jobs_count.times do |index|
DummyJob.set(wait: randomize(60).minutes).perform_later(index)
end
end

def load_recurring_tasks
return unless server.queue_adapter.supports_recurring_tasks?

Expand Down