From c946a307d4b75bf17b57f7d6a5c317d21a3bc578 Mon Sep 17 00:00:00 2001 From: Robert Pankowecki Date: Tue, 19 Jun 2018 15:35:21 +0200 Subject: [PATCH] Correlation between command_bus & event_store * commands are correlated with events which triggered them inside sync handlers * events are correlated with commands which trigger them via command_bus Async handlers not yet approached. Issue: #346 --- .../source/docs/correlation_causation.html.md | 47 ++++++- ruby_event_store/lib/ruby_event_store.rb | 1 + .../lib/ruby_event_store/client.rb | 19 +-- .../ruby_event_store/correlated_commands.rb | 37 ++++++ .../spec/correlated_commands_spec.rb | 125 ++++++++++++++++++ 5 files changed, 218 insertions(+), 11 deletions(-) create mode 100644 ruby_event_store/lib/ruby_event_store/correlated_commands.rb create mode 100644 ruby_event_store/spec/correlated_commands_spec.rb diff --git a/railseventstore.org/source/docs/correlation_causation.html.md b/railseventstore.org/source/docs/correlation_causation.html.md index 4485890295..82f0775e1a 100644 --- a/railseventstore.org/source/docs/correlation_causation.html.md +++ b/railseventstore.org/source/docs/correlation_causation.html.md @@ -55,8 +55,7 @@ new_event.metadata[:causation_id] ## Correlating an event with a command -If your command responds to `correlation_id` (can even always be `nil`) and `message_id` you can correlate your events -also with commands. +If your command responds to `correlation_id` (can even always be `nil`) and `message_id` you can correlate your events also with commands. ```ruby class ApproveOrder = < Struct.new(:order_id, :message_id, :correlation_id) @@ -85,4 +84,48 @@ class MyEventHandler end ``` +## Correlating together events with commands, and commands with events from sync handlers + +If you use event store and [command bus](/docs/command_bus/) you can correlate together both kinds of messages: events & commands. + +```ruby +config.to_prepare do + Rails.configuration.event_store = event_store = RailsEventStore::Client.new + # register handlers + + command_bus = Arkency::CommandBus.new + # register commands... + + # wire event_store and command_bus together + Rails.configuration.command_bus = RubyEventStore::CorrelatedCommands.new(event_store, command_bus) +end +``` + +Using `CorrelatedCommands` makes your events automatically correlated to the commands which triggered them (commands must respond to `message_id` method). + +If your commands respond to `correlate_with` method they will be correlated to events which triggered them inside sync handlers. + +Example: + +```ruby +module CorrelableCommand + attr_accessor :correlation_id, :causation_id + + def correlate_with(other_message) + self.correlation_id = other_message.correlation_id || other_message.message_id + self.causation_id = other_message.message_id + end +end + +class AddProductCommand < Struct.new(:message_id, :product_id) + include CorrelableCommand + + def initialize(product_id:, message_id: SecureRandom.uuid) + super(message_id, product_id) + end +end +``` + +## Thanks + Image thanks to [Arkency blog](https://blog.arkency.com/correlation-id-and-causation-id-in-evented-systems/) \ No newline at end of file diff --git a/ruby_event_store/lib/ruby_event_store.rb b/ruby_event_store/lib/ruby_event_store.rb index b420d5b348..bf8f145304 100644 --- a/ruby_event_store/lib/ruby_event_store.rb +++ b/ruby_event_store/lib/ruby_event_store.rb @@ -16,4 +16,5 @@ require 'ruby_event_store/mappers/protobuf' require 'ruby_event_store/mappers/null_mapper' require 'ruby_event_store/batch_enumerator' +require 'ruby_event_store/correlated_commands' require 'ruby_event_store/version' diff --git a/ruby_event_store/lib/ruby_event_store/client.rb b/ruby_event_store/lib/ruby_event_store/client.rb index 81a8417ba2..827974f301 100644 --- a/ruby_event_store/lib/ruby_event_store/client.rb +++ b/ruby_event_store/lib/ruby_event_store/client.rb @@ -188,12 +188,19 @@ def within(&block) def with_metadata(metadata, &block) previous_metadata = metadata() - self.metadata = (previous_metadata || {}).merge(metadata) + self.metadata = previous_metadata.merge(metadata) block.call if block_given? ensure self.metadata = previous_metadata end + def metadata + @metadata.value || EMPTY_HASH + end + + EMPTY_HASH = {}.freeze + private_constant :EMPTY_HASH + private def serialized_events(events) @@ -217,16 +224,10 @@ def enrich_event_metadata(event) event.metadata[:timestamp] ||= clock.call end - attr_reader :repository, :mapper, :event_broker, :clock, :page_size - - protected - - def metadata - @metadata.value - end - def metadata=(value) @metadata.value = value end + + attr_reader :repository, :mapper, :event_broker, :clock, :page_size end end diff --git a/ruby_event_store/lib/ruby_event_store/correlated_commands.rb b/ruby_event_store/lib/ruby_event_store/correlated_commands.rb new file mode 100644 index 0000000000..4f11a7f9fe --- /dev/null +++ b/ruby_event_store/lib/ruby_event_store/correlated_commands.rb @@ -0,0 +1,37 @@ +module RubyEventStore + class CorrelatedCommands + + def initialize(event_store, command_bus) + @event_store = event_store + @command_bus = command_bus + end + + class MiniEvent < Struct.new(:correlation_id, :message_id) + end + + def call(command) + if (correlation_id = event_store.metadata[:correlation_id]) && (causation_id = event_store.metadata[:causation_id]) + command.correlate_with(MiniEvent.new( + correlation_id, + causation_id, + )) if command.respond_to?(:correlate_with) + event_store.with_metadata( + causation_id: command.message_id, + ) do + command_bus.call(command) + end + else + event_store.with_metadata( + correlation_id: command.message_id, + causation_id: command.message_id, + ) do + command_bus.call(command) + end + end + end + + private + + attr_reader :event_store, :command_bus + end +end diff --git a/ruby_event_store/spec/correlated_commands_spec.rb b/ruby_event_store/spec/correlated_commands_spec.rb new file mode 100644 index 0000000000..ca32696dae --- /dev/null +++ b/ruby_event_store/spec/correlated_commands_spec.rb @@ -0,0 +1,125 @@ +require 'spec_helper' +require 'time' + +module RubyEventStore + RSpec.describe CorrelatedCommands do + + module CorrelableCommand + attr_accessor :correlation_id, :causation_id + + def correlate_with(other_message) + self.correlation_id = other_message.correlation_id || other_message.message_id + self.causation_id = other_message.message_id + end + end + + class AddProductCommand < Struct.new(:message_id, :product_id) + include CorrelableCommand + def initialize(product_id:, message_id: SecureRandom.uuid) + super(message_id, product_id) + end + end + + class TestCommand < Struct.new(:message_id) + include CorrelableCommand + def initialize(message_id: SecureRandom.uuid) + super(message_id) + end + end + + let(:event_store) do + RubyEventStore::Client.new(repository: InMemoryRepository.new) + end + let(:command_bus) do + -> (cmd) do + { + AddProductCommand => -> (c) do + event_store.publish_event(ProductAdded.new(data:{ + product_id: c.product_id, + })) + end, + TestCommand => -> (_c) do + event_store.publish_event(TestEvent.new()) + end, + }.fetch(cmd.class).call(cmd) + end + end + + specify "correlate produced events with current command" do + bus = CorrelatedCommands.new(event_store, command_bus) + bus.call(cmd = TestCommand.new) + event = event_store.read.each.first + expect(event.correlation_id).to eq(cmd.message_id) + expect(event.causation_id).to eq(cmd.message_id) + expect(cmd.message_id).to be_a(String) + end + + specify "correlate commands with events from sync handlers" do + cmd2 = nil + bus = CorrelatedCommands.new(event_store, command_bus) + event_store.subscribe(to: [ProductAdded]) do + bus.call(cmd2 = TestCommand.new) + end + bus.call(cmd1 = AddProductCommand.new(product_id: 20)) + + expect(cmd1.correlation_id).to be_nil + expect(cmd1.causation_id).to be_nil + + event1 = event_store.read.each.first + expect(event1.correlation_id).to eq(cmd1.message_id) + expect(event1.causation_id).to eq(cmd1.message_id) + + expect(cmd2.correlation_id).to eq(cmd1.message_id) + expect(cmd2.causation_id).to eq(event1.message_id) + + event2 = event_store.read.each.to_a.last + expect(event2.correlation_id).to eq(cmd1.message_id) + expect(event2.causation_id).to eq(cmd2.message_id) + end + + specify "correlate commands with events from sync handlers (missing correlate_with)" do + cmd2 = TestCommand.new + cmd2.instance_eval('undef :correlate_with') + + cmd1 = AddProductCommand.new(product_id: 20) + cmd1.instance_eval('undef :correlate_with') + + bus = CorrelatedCommands.new(event_store, command_bus) + event_store.subscribe(to: [ProductAdded]) do + bus.call(cmd2) + end + bus.call(cmd1) + + expect(cmd1.correlation_id).to be_nil + expect(cmd1.causation_id).to be_nil + + event1 = event_store.read.each.first + expect(event1.correlation_id).to eq(cmd1.message_id) + expect(event1.causation_id).to eq(cmd1.message_id) + + expect(cmd2.correlation_id).to be_nil + expect(cmd2.causation_id).to be_nil + + event2 = event_store.read.each.to_a.last + expect(event2.correlation_id).to eq(cmd1.message_id) + expect(event2.causation_id).to eq(cmd2.message_id) + end + + specify "both correlation_id and causation_id must be set to correlate command" do + event_store.with_metadata(correlation_id: "COID") do + bus = CorrelatedCommands.new(event_store, command_bus) + bus.call(cmd = TestCommand.new) + expect(cmd.correlation_id).to be_nil + expect(cmd.causation_id).to be_nil + end + + event_store.with_metadata(causation_id: "CAID") do + bus = CorrelatedCommands.new(event_store, command_bus) + bus.call(cmd = TestCommand.new) + expect(cmd.correlation_id).to be_nil + expect(cmd.causation_id).to be_nil + end + end + + end +end