Skip to content

Commit

Permalink
Introduce AsyncHandler and CorrelatedHandler helpers
Browse files Browse the repository at this point in the history
which help with correlating commands/events coming from async handlers.

Issue: #346
  • Loading branch information
paneq committed Jun 22, 2018
1 parent 4d1e462 commit 5eb19d9
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 63 deletions.
26 changes: 15 additions & 11 deletions rails_event_store/lib/rails_event_store/async_handler_helpers.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
module AsyncHandler
def perform(payload)
super(Rails.configuration.event_store.deserialize(payload))
module RailsEventStore

module AsyncHandler
def perform(payload)
super(Rails.configuration.event_store.deserialize(payload))
end
end
end

module CorrelatedHandler
def perform(event)
Rails.configuration.event_store.with_metadata(
correlation_id: event.metadata[:correlation_id] || event.event_id,
causation_id: event.event_id
) do
super
module CorrelatedHandler
def perform(event)
Rails.configuration.event_store.with_metadata(
correlation_id: event.metadata[:correlation_id] || event.event_id,
causation_id: event.event_id
) do
super
end
end
end

end
1 change: 0 additions & 1 deletion rails_event_store/rails_event_store.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rake', '~> 10.0'
spec.add_development_dependency 'rspec', '~> 3.6'
spec.add_development_dependency 'rails', '~> 5.2'
# spec.add_development_dependency 'railties', '~> 5.2'
spec.add_development_dependency 'sqlite3'
spec.add_development_dependency 'rack-test'
spec.add_development_dependency 'mutant-rspec', '~> 0.8.14'
Expand Down
80 changes: 40 additions & 40 deletions rails_event_store/spec/active_job_dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ module RailsEventStore

before(:each) do
CallableHandler.reset
AsyncHandler.reset
MyAsyncHandler.reset
end

let!(:event) { RailsEventStore::Event.new(event_id: "83c3187f-84f6-4da7-8206-73af5aca7cc8") }
let!(:serialized_event) { RubyEventStore::Mappers::Default.new.event_to_serialized_record(event) }

it "verification" do
expect do
ActiveJobDispatcher.new.verify(AsyncHandler)
ActiveJobDispatcher.new.verify(MyAsyncHandler)
end.not_to raise_error
expect do
ActiveJobDispatcher.new.verify(ActiveJob::Base)
Expand All @@ -40,12 +40,12 @@ module RailsEventStore
end

it "builds async proxy for ActiveJob::Base ancestors" do
expect_to_have_enqueued_job(AsyncHandler) do
ActiveJobDispatcher.new.call(AsyncHandler, event, serialized_event)
expect_to_have_enqueued_job(MyAsyncHandler) do
ActiveJobDispatcher.new.call(MyAsyncHandler, event, serialized_event)
end
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to eq({
expect(MyAsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to eq({
"event_id" => "83c3187f-84f6-4da7-8206-73af5aca7cc8",
"data" => "--- {}\n",
"metadata" => "--- {}\n",
Expand All @@ -56,12 +56,12 @@ module RailsEventStore

it "async proxy for defined adapter enqueue job immediately when no transaction is open" do
dispatcher = ActiveJobDispatcher.new(proxy_strategy: AsyncProxyStrategy::AfterCommit.new)
expect_to_have_enqueued_job(AsyncHandler) do
dispatcher.call(AsyncHandler, event, serialized_event)
expect_to_have_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
end
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to eq({
expect(MyAsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to eq({
"event_id" => "83c3187f-84f6-4da7-8206-73af5aca7cc8",
"data" => "--- {}\n",
"metadata" => "--- {}\n",
Expand All @@ -72,16 +72,16 @@ module RailsEventStore

it "async proxy for defined adapter enqueue job only after transaction commit" do
dispatcher = ActiveJobDispatcher.new(proxy_strategy: AsyncProxyStrategy::AfterCommit.new)
expect_to_have_enqueued_job(AsyncHandler) do
expect_to_have_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job do
dispatcher.call(AsyncHandler, event, serialized_event)
dispatcher.call(MyAsyncHandler, event, serialized_event)
end
end
end
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to eq({
expect(MyAsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to eq({
"event_id" => "83c3187f-84f6-4da7-8206-73af5aca7cc8",
"data" => "--- {}\n",
"metadata" => "--- {}\n",
Expand All @@ -92,14 +92,14 @@ module RailsEventStore

it "async proxy for defined adapter do not enqueue job after transaction rollback" do
dispatcher = ActiveJobDispatcher.new(proxy_strategy: AsyncProxyStrategy::AfterCommit.new)
expect_no_enqueued_job(AsyncHandler) do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
dispatcher.call(AsyncHandler, event, serialized_event)
dispatcher.call(MyAsyncHandler, event, serialized_event)
raise ActiveRecord::Rollback
end
end
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to be_nil
end

it "async proxy for defined adapter does not enqueue job after transaction rollback (with raises)" do
Expand All @@ -108,33 +108,33 @@ module RailsEventStore
ActiveRecord::Base.raise_in_transactional_callbacks = true

dispatcher = ActiveJobDispatcher.new(proxy_strategy: AsyncProxyStrategy::AfterCommit.new)
expect_no_enqueued_job(AsyncHandler) do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
dispatcher.call(AsyncHandler, event, serialized_event)
dispatcher.call(MyAsyncHandler, event, serialized_event)
raise ActiveRecord::Rollback
end
end
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to be_nil
ensure
ActiveRecord::Base.raise_in_transactional_callbacks = was
end
end if ActiveRecord::Base.respond_to?(:raise_in_transactional_callbacks)

it "async proxy for defined adapter enqueue job only after top-level transaction (nested is not new) commit" do
dispatcher = ActiveJobDispatcher.new(proxy_strategy: AsyncProxyStrategy::AfterCommit.new)
expect_to_have_enqueued_job(AsyncHandler) do
expect_to_have_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job do
ActiveRecord::Base.transaction(requires_new: false) do
dispatcher.call(AsyncHandler, event, serialized_event)
dispatcher.call(MyAsyncHandler, event, serialized_event)
end
end
end
end
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to eq({
expect(MyAsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to eq({
"event_id" => "83c3187f-84f6-4da7-8206-73af5aca7cc8",
"data" => "--- {}\n",
"metadata" => "--- {}\n",
Expand All @@ -145,18 +145,18 @@ module RailsEventStore

it "async proxy for defined adapter enqueue job only after top-level transaction commit" do
dispatcher = ActiveJobDispatcher.new(proxy_strategy: AsyncProxyStrategy::AfterCommit.new)
expect_to_have_enqueued_job(AsyncHandler) do
expect_to_have_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job do
ActiveRecord::Base.transaction(requires_new: true) do
dispatcher.call(AsyncHandler, event, serialized_event)
dispatcher.call(MyAsyncHandler, event, serialized_event)
end
end
end
end
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to eq({
expect(MyAsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to eq({
"event_id" => "83c3187f-84f6-4da7-8206-73af5aca7cc8",
"data" => "--- {}\n",
"metadata" => "--- {}\n",
Expand All @@ -167,18 +167,18 @@ module RailsEventStore

it "async proxy for defined adapter do not enqueue job after nested transaction rollback" do
dispatcher = ActiveJobDispatcher.new(proxy_strategy: AsyncProxyStrategy::AfterCommit.new)
expect_no_enqueued_job(AsyncHandler) do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job do
ActiveRecord::Base.transaction(requires_new: true) do
dispatcher.call(AsyncHandler, event, serialized_event)
dispatcher.call(MyAsyncHandler, event, serialized_event)
raise ActiveRecord::Rollback
end
end
end
end
perform_enqueued_jobs(AsyncHandler.queue_adapter)
expect(AsyncHandler.received).to be_nil
perform_enqueued_jobs(MyAsyncHandler.queue_adapter)
expect(MyAsyncHandler.received).to be_nil
end

def with_queue_adapter(job, queue_adapter = :test, &proc)
Expand Down Expand Up @@ -222,7 +222,7 @@ def call(event)
end
end

class AsyncHandler < ActiveJob::Base
class MyAsyncHandler < ActiveJob::Base
@@received = nil
def self.reset
@@received = nil
Expand Down
39 changes: 28 additions & 11 deletions rails_event_store/spec/rails_event_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,32 @@
require 'action_controller/railtie'

RSpec.describe RailsEventStore do
class MyAsyncHandler < ActiveJob::Base
self.queue_adapter = :inline
cattr_accessor :event, :event_store
class MyLovelyAsyncHandler < ActiveJob::Base
self.queue_adapter = :async
cattr_accessor :event

def perform(payload)
self.class.event = Rails.configuration.event_store.deserialize(payload)
end
end

class HandlerWithHelper < ActiveJob::Base
self.queue_adapter = :inline
self.queue_adapter = :async
cattr_accessor :event

prepend AsyncHandler
prepend RailsEventStore::AsyncHandler

def perform(event)
self.class.event = event
end
end

class MetadataHandler < ActiveJob::Base
self.queue_adapter = :inline
self.queue_adapter = :async
cattr_accessor :metadata

prepend CorrelatedHandler
prepend AsyncHandler
prepend RailsEventStore::CorrelatedHandler
prepend RailsEventStore::AsyncHandler

def perform(event)
self.metadata = Rails.configuration.event_store.metadata
Expand All @@ -42,23 +43,26 @@ def perform(event)
end

specify 'default dispatcher can into ActiveJob' do
expect(MyAsyncHandler.event).to eq(nil)
event_store.subscribe_to_all_events(MyAsyncHandler)
expect(MyLovelyAsyncHandler.event).to eq(nil)
event_store.subscribe_to_all_events(MyLovelyAsyncHandler)
event_store.publish_event(ev = RailsEventStore::Event.new)
expect(MyAsyncHandler.event).to eq(ev)
wait_until{ MyLovelyAsyncHandler.event }
expect(MyLovelyAsyncHandler.event).to eq(ev)
end

specify 'ActiveJob with AsyncHandler prepended' do
expect(HandlerWithHelper.event).to eq(nil)
event_store.subscribe_to_all_events(HandlerWithHelper)
event_store.publish_event(ev = RailsEventStore::Event.new)
wait_until{ HandlerWithHelper.event }
expect(HandlerWithHelper.event).to eq(ev)
end

specify 'ActiveJob with CorrelatedHandler prepended' do
MetadataHandler.metadata = nil
event_store.subscribe_to_all_events(MetadataHandler)
event_store.publish_event(ev = RailsEventStore::Event.new)
wait_until{ MetadataHandler.metadata }
expect(MetadataHandler.metadata).to eq({
correlation_id: ev.event_id,
causation_id: ev.event_id,
Expand All @@ -74,9 +78,22 @@ def perform(event)
causation_id: "CAID",
}
))
wait_until{ MetadataHandler.metadata }
expect(MetadataHandler.metadata).to eq({
correlation_id: "COID",
causation_id: ev.event_id,
})
end

private

def wait_until(&block)
Timeout.timeout(1) do
loop do
break if block.call
sleep(0.001)
end
end
end

end

0 comments on commit 5eb19d9

Please sign in to comment.