Skip to content

Commit 691dd68

Browse files
committed
Add bulk enqueue support to ActiveJob adapter
When using ActiveJob, you can now enqueue jobs in bulk using either `Que.bulk_enqueue`: ```ruby Que.bulk_enqueue do TestJobClass.perform_later(1, 2) OtherTestJobClass.set(wait: 1.minute, queue: 'custom', priority: 200).perform_later(3, 4) end ``` Or when running Rails >= 7.1 using `ActiveJob.perform_all_later`: ```ruby ActiveJob.perform_all_later([ TestJobClass.new(1, 2), OtherTestJobClass.new(3, 4).set(wait: 1.minute, queue: 'custom', priority: 200), ]) ```
1 parent 4835936 commit 691dd68

File tree

4 files changed

+105
-26
lines changed

4 files changed

+105
-26
lines changed

docs/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,6 @@ The jobs are only actually enqueued at the end of the block, at which point they
853853

854854
Limitations:
855855

856-
- ActiveJob is not supported
857856
- The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`)
858857
- The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll
859858

lib/que/active_job/extensions.rb

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,19 +118,32 @@ module QueueAdapters
118118

119119
class QueAdapter
120120
def enqueue(job)
121-
job_options = { priority: job.priority, queue: job.queue_name }
122-
que_job = JobWrapper.enqueue job.serialize, **job_options
123-
job.provider_job_id = que_job.attrs["job_id"]
124-
que_job
121+
enqueue_at(job, nil)
125122
end
126123

127124
def enqueue_at(job, timestamp)
128-
job_options = { priority: job.priority, queue: job.queue_name, run_at: Time.at(timestamp) }
125+
job_options = {
126+
priority: job.priority,
127+
queue: job.queue_name,
128+
run_at: timestamp && Time.at(timestamp)
129+
}
129130
que_job = JobWrapper.enqueue job.serialize, **job_options
130-
job.provider_job_id = que_job.attrs["job_id"]
131+
job.provider_job_id = que_job.que_attrs[:id]
131132
que_job
132133
end
133134

135+
def enqueue_all(jobs)
136+
que_jobs = Que.bulk_enqueue do
137+
jobs.each do |job|
138+
enqueue_at(job, job.scheduled_at)
139+
end
140+
end
141+
que_jobs.zip(jobs).each do |que_job, job|
142+
job.provider_job_id = que_job.que_attrs[:id]
143+
end
144+
que_jobs
145+
end
146+
134147
private
135148

136149
class JobWrapper < Que::Job

lib/que/job.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,6 @@ def enqueue(*args)
109109
klass: self,
110110
}
111111

112-
if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
113-
raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
114-
end
115-
116112
Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
117113
return new({})
118114
end

spec/que/active_job/extensions_spec.rb

Lines changed: 86 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,22 @@
33
require 'spec_helper'
44

55
if defined?(::ActiveJob)
6-
describe "running jobs via ActiveJob" do
6+
describe "enqueueing/running jobs via ActiveJob" do
77
before do
88
class TestJobClass < ActiveJob::Base
99
def perform(*args, **kwargs)
1010
$args = args
1111
$kwargs = kwargs
1212
end
1313
end
14+
class OtherTestJobClass < ActiveJob::Base; end
1415
end
1516

1617
after do
1718
Object.send :remove_const, :TestJobClass
1819
$args = nil
1920
$kwargs = nil
21+
Object.send :remove_const, :OtherTestJobClass
2022
end
2123

2224
def execute(&perform_later_block)
@@ -173,27 +175,96 @@ def perform(*args)
173175
end
174176
end
175177

176-
describe 'with bulk_enqueue' do
177-
describe 'ActiveJobClass.perform_later' do
178-
it "is not supported" do
179-
assert_raises_with_message(
180-
Que::Error,
181-
/Que\.bulk_enqueue does not support ActiveJob\./
182-
) do
183-
Que.bulk_enqueue { TestJobClass.perform_later(1, 2) }
178+
def assert_enqueue_active_job(
179+
expected_queues:,
180+
expected_priorities:,
181+
expected_run_ats:,
182+
expected_active_job_classes:,
183+
expected_active_job_arguments:,
184+
expected_count:,
185+
assert_results:,
186+
&enqueue_block
187+
)
188+
189+
assert_equal 0, jobs_dataset.count
190+
191+
results = enqueue_block.call
192+
193+
assert_equal expected_count, jobs_dataset.count
194+
195+
if assert_results
196+
results.each_with_index do |result, i|
197+
assert_kind_of Que::Job, result
198+
assert_instance_of ActiveJob::QueueAdapters::QueAdapter::JobWrapper, result
199+
200+
assert_equal expected_priorities[i], result.que_attrs[:priority]
201+
assert_equal expected_active_job_classes[i], result.que_attrs[:args].first[:job_class]
202+
assert_equal expected_active_job_arguments[i], result.que_attrs[:args].first[:arguments]
203+
assert_equal ({}), result.que_attrs[:kwargs]
204+
assert_equal ({}), result.que_attrs[:data]
205+
end
206+
end
207+
208+
jobs_dataset.order(:id).each_with_index do |job, i|
209+
assert_equal expected_queues[i], job[:queue]
210+
assert_equal expected_priorities[i], job[:priority]
211+
assert_equal expected_active_job_classes[i], job[:args].first[:job_class]
212+
assert_equal expected_active_job_arguments[i], job[:args].first[:arguments]
213+
assert_in_delta job[:run_at], expected_run_ats[i], QueSpec::TIME_SKEW
214+
assert_equal 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper', job[:job_class]
215+
assert_equal ({}), job[:kwargs]
216+
assert_equal ({}), job[:data]
217+
end
218+
219+
jobs_dataset.delete
220+
end
221+
222+
describe "with Que.bulk_enqueue" do
223+
it "should be able to queue multiple jobs with different classes and job options" do
224+
assert_enqueue_active_job(
225+
expected_count: 2,
226+
expected_queues: ['default', 'custom'],
227+
expected_priorities: [100, 200],
228+
expected_run_ats: [Time.now, Time.now + 60],
229+
expected_active_job_classes: ['TestJobClass', 'OtherTestJobClass'],
230+
expected_active_job_arguments: [[1, 2], [3, 4]],
231+
assert_results: true,
232+
) do
233+
Que.bulk_enqueue do
234+
TestJobClass.perform_later(1, 2)
235+
OtherTestJobClass.set(wait: 1.minute, queue: 'custom', priority: 200).perform_later(3, 4)
184236
end
185237
end
186238
end
239+
end
187240

188-
describe 'active_job#enqueue' do
189-
it "is not supported" do
190-
assert_raises_with_message(
191-
Que::Error,
192-
/Que\.bulk_enqueue does not support ActiveJob\./
241+
if ActiveJob.gem_version >= Gem::Version.new('7.1')
242+
describe "with ActiveJob.perform_all_later" do
243+
it "should be able to queue multiple jobs with different classes and job options" do
244+
assert_enqueue_active_job(
245+
expected_count: 2,
246+
expected_queues: ['default', 'custom'],
247+
expected_priorities: [100, 200],
248+
expected_run_ats: [Time.now, Time.now + 60],
249+
expected_active_job_classes: ['TestJobClass', 'OtherTestJobClass'],
250+
expected_active_job_arguments: [[1, 2], [3, 4]],
251+
assert_results: false,
193252
) do
194-
Que.bulk_enqueue { TestJobClass.new.enqueue }
253+
ActiveJob.perform_all_later([
254+
TestJobClass.new(1, 2),
255+
OtherTestJobClass.new(3, 4).set(wait: 1.minute, queue: 'custom', priority: 200),
256+
])
195257
end
196258
end
259+
260+
it "should set provider_job_id on job instances" do
261+
jobs = [TestJobClass.new, TestJobClass.new]
262+
ActiveJob.perform_all_later(jobs)
263+
que_jobs = jobs_dataset.order(:id).to_a
264+
265+
assert_equal jobs[0].provider_job_id, que_jobs[0][:id]
266+
assert_equal jobs[1].provider_job_id, que_jobs[1][:id]
267+
end
197268
end
198269
end
199270
end

0 commit comments

Comments
 (0)