-
Notifications
You must be signed in to change notification settings - Fork 125
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce a new AJ scheduler that carries only event_id
Pro: - refactoring-friendly (i.e. migrating event_type) because event data is not stored temporarily in redis, one moving part to care about less - constant size in redis as opposed to size dependent on data and metadata - simpler configuration (no need for marching scheduler and async handler serializers) Con: - handlers require database access (they usually do have it) - additional SQL query to load an event Serialization cost remains unchanged. Whether passing an id or full-serialized payload, the data and metadata need to be deserialized. Also there's no additional serialization step for scheduler, it reuses serialization for repository if that is needed. #755
- Loading branch information
1 parent
fdc18d5
commit 46dedc2
Showing
3 changed files
with
106 additions
and
0 deletions.
There are no files selected for viewing
15 changes: 15 additions & 0 deletions
15
rails_event_store/lib/rails_event_store/active_job_id_only_scheduler.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# frozen_string_literal: true | ||
|
||
require "active_job" | ||
|
||
module RailsEventStore | ||
class ActiveJobIdOnlyScheduler | ||
def call(klass, record) | ||
klass.perform_later({ "event_id" => record.event_id }) | ||
end | ||
|
||
def verify(subscriber) | ||
Class === subscriber && !!(subscriber < ActiveJob::Base) | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
90 changes: 90 additions & 0 deletions
90
rails_event_store/spec/active_job_id_only_scheduler_spec.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
require "spec_helper" | ||
require "ruby_event_store/spec/scheduler_lint" | ||
|
||
module RailsEventStore | ||
RSpec.describe ActiveJobIdOnlyScheduler do | ||
around do |example| | ||
begin | ||
original_logger = ActiveJob::Base.logger | ||
ActiveJob::Base.logger = nil | ||
|
||
original_adapter = ActiveJob::Base.queue_adapter | ||
ActiveJob::Base.queue_adapter = :test | ||
|
||
example.run | ||
ensure | ||
ActiveJob::Base.logger = original_logger | ||
ActiveJob::Base.queue_adapter = original_adapter | ||
end | ||
end | ||
|
||
before(:each) do | ||
MyAsyncHandler.reset | ||
end | ||
|
||
it_behaves_like :scheduler, ActiveJobIdOnlyScheduler.new | ||
it_behaves_like :scheduler, ActiveJobIdOnlyScheduler.new | ||
|
||
let(:event) { TimeEnrichment.with(Event.new(event_id: "83c3187f-84f6-4da7-8206-73af5aca7cc8"), timestamp: Time.utc(2019, 9, 30)) } | ||
let(:record) { RubyEventStore::Mappers::Default.new.event_to_record(event) } | ||
|
||
describe "#verify" do | ||
specify do | ||
scheduler = ActiveJobIdOnlyScheduler.new | ||
proper_handler = Class.new(ActiveJob::Base) | ||
expect(scheduler.verify(proper_handler)).to eq(true) | ||
end | ||
|
||
specify do | ||
scheduler = ActiveJobIdOnlyScheduler.new | ||
some_class = Class.new | ||
expect(scheduler.verify(some_class)).to eq(false) | ||
end | ||
|
||
specify do | ||
scheduler = ActiveJobIdOnlyScheduler.new | ||
expect(scheduler.verify(ActiveJob::Base)).to eq(false) | ||
end | ||
|
||
specify do | ||
scheduler = ActiveJobIdOnlyScheduler.new | ||
expect(scheduler.verify(Object.new)).to eq(false) | ||
end | ||
end | ||
|
||
describe "#call" do | ||
specify do | ||
scheduler = ActiveJobIdOnlyScheduler.new | ||
scheduler.call(MyAsyncHandler, record) | ||
|
||
enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs | ||
expect(enqueued_jobs.size).to eq(1) | ||
expect(enqueued_jobs[0]).to include( | ||
{ | ||
job: MyAsyncHandler, | ||
args: [ | ||
{ | ||
"event_id" => "83c3187f-84f6-4da7-8206-73af5aca7cc8", | ||
"_aj_symbol_keys" => [], | ||
}, | ||
], | ||
queue: "default", | ||
}, | ||
) | ||
end | ||
end | ||
|
||
class MyAsyncHandler < ActiveJob::Base | ||
@@received = nil | ||
def self.reset | ||
@@received = nil | ||
end | ||
def self.received | ||
@@received | ||
end | ||
def perform(event) | ||
@@received = event | ||
end | ||
end | ||
end | ||
end |