Skip to content

Commit

Permalink
Correlation between command_bus & event_store
Browse files Browse the repository at this point in the history
* commands are correlated with events which triggered them inside sync
  handlers
* events are correlated with commands which trigger them via command_bus

Async handlers not yet approached.

Issue: #346
  • Loading branch information
paneq committed Jun 19, 2018
1 parent 369c2b8 commit c946a30
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 11 deletions.
47 changes: 45 additions & 2 deletions railseventstore.org/source/docs/correlation_causation.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ new_event.metadata[:causation_id]

## Correlating an event with a command

If your command responds to `correlation_id` (can even always be `nil`) and `message_id` you can correlate your events
also with commands.
If your command responds to `correlation_id` (can even always be `nil`) and `message_id` you can correlate your events also with commands.

```ruby
class ApproveOrder = < Struct.new(:order_id, :message_id, :correlation_id)
Expand Down Expand Up @@ -85,4 +84,48 @@ class MyEventHandler
end
```

## Correlating together events with commands, and commands with events from sync handlers

If you use event store and [command bus](/docs/command_bus/) you can correlate together both kinds of messages: events & commands.

```ruby
config.to_prepare do
Rails.configuration.event_store = event_store = RailsEventStore::Client.new
# register handlers

command_bus = Arkency::CommandBus.new
# register commands...

# wire event_store and command_bus together
Rails.configuration.command_bus = RubyEventStore::CorrelatedCommands.new(event_store, command_bus)
end
```

Using `CorrelatedCommands` makes your events automatically correlated to the commands which triggered them (commands must respond to `message_id` method).

If your commands respond to `correlate_with` method they will be correlated to events which triggered them inside sync handlers.

Example:

```ruby
module CorrelableCommand
attr_accessor :correlation_id, :causation_id

def correlate_with(other_message)
self.correlation_id = other_message.correlation_id || other_message.message_id
self.causation_id = other_message.message_id
end
end

class AddProductCommand < Struct.new(:message_id, :product_id)
include CorrelableCommand

def initialize(product_id:, message_id: SecureRandom.uuid)
super(message_id, product_id)
end
end
```

## Thanks

Image thanks to [Arkency blog](https://blog.arkency.com/correlation-id-and-causation-id-in-evented-systems/)
1 change: 1 addition & 0 deletions ruby_event_store/lib/ruby_event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
require 'ruby_event_store/mappers/protobuf'
require 'ruby_event_store/mappers/null_mapper'
require 'ruby_event_store/batch_enumerator'
require 'ruby_event_store/correlated_commands'
require 'ruby_event_store/version'
19 changes: 10 additions & 9 deletions ruby_event_store/lib/ruby_event_store/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,19 @@ def within(&block)

def with_metadata(metadata, &block)
previous_metadata = metadata()
self.metadata = (previous_metadata || {}).merge(metadata)
self.metadata = previous_metadata.merge(metadata)
block.call if block_given?
ensure
self.metadata = previous_metadata
end

def metadata
@metadata.value || EMPTY_HASH
end

EMPTY_HASH = {}.freeze
private_constant :EMPTY_HASH

private

def serialized_events(events)
Expand All @@ -217,16 +224,10 @@ def enrich_event_metadata(event)
event.metadata[:timestamp] ||= clock.call
end

attr_reader :repository, :mapper, :event_broker, :clock, :page_size

protected

def metadata
@metadata.value
end

def metadata=(value)
@metadata.value = value
end

attr_reader :repository, :mapper, :event_broker, :clock, :page_size
end
end
37 changes: 37 additions & 0 deletions ruby_event_store/lib/ruby_event_store/correlated_commands.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module RubyEventStore
class CorrelatedCommands

def initialize(event_store, command_bus)
@event_store = event_store
@command_bus = command_bus
end

class MiniEvent < Struct.new(:correlation_id, :message_id)
end

def call(command)
if (correlation_id = event_store.metadata[:correlation_id]) && (causation_id = event_store.metadata[:causation_id])
command.correlate_with(MiniEvent.new(
correlation_id,
causation_id,
)) if command.respond_to?(:correlate_with)
event_store.with_metadata(
causation_id: command.message_id,
) do
command_bus.call(command)
end
else
event_store.with_metadata(
correlation_id: command.message_id,
causation_id: command.message_id,
) do
command_bus.call(command)
end
end
end

private

attr_reader :event_store, :command_bus
end
end
125 changes: 125 additions & 0 deletions ruby_event_store/spec/correlated_commands_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
require 'spec_helper'
require 'time'

module RubyEventStore
RSpec.describe CorrelatedCommands do

module CorrelableCommand
attr_accessor :correlation_id, :causation_id

def correlate_with(other_message)
self.correlation_id = other_message.correlation_id || other_message.message_id
self.causation_id = other_message.message_id
end
end

class AddProductCommand < Struct.new(:message_id, :product_id)
include CorrelableCommand
def initialize(product_id:, message_id: SecureRandom.uuid)
super(message_id, product_id)
end
end

class TestCommand < Struct.new(:message_id)
include CorrelableCommand
def initialize(message_id: SecureRandom.uuid)
super(message_id)
end
end

let(:event_store) do
RubyEventStore::Client.new(repository: InMemoryRepository.new)
end
let(:command_bus) do
-> (cmd) do
{
AddProductCommand => -> (c) do
event_store.publish_event(ProductAdded.new(data:{
product_id: c.product_id,
}))
end,
TestCommand => -> (_c) do
event_store.publish_event(TestEvent.new())
end,
}.fetch(cmd.class).call(cmd)
end
end

specify "correlate produced events with current command" do
bus = CorrelatedCommands.new(event_store, command_bus)
bus.call(cmd = TestCommand.new)
event = event_store.read.each.first
expect(event.correlation_id).to eq(cmd.message_id)
expect(event.causation_id).to eq(cmd.message_id)
expect(cmd.message_id).to be_a(String)
end

specify "correlate commands with events from sync handlers" do
cmd2 = nil
bus = CorrelatedCommands.new(event_store, command_bus)
event_store.subscribe(to: [ProductAdded]) do
bus.call(cmd2 = TestCommand.new)
end
bus.call(cmd1 = AddProductCommand.new(product_id: 20))

expect(cmd1.correlation_id).to be_nil
expect(cmd1.causation_id).to be_nil

event1 = event_store.read.each.first
expect(event1.correlation_id).to eq(cmd1.message_id)
expect(event1.causation_id).to eq(cmd1.message_id)

expect(cmd2.correlation_id).to eq(cmd1.message_id)
expect(cmd2.causation_id).to eq(event1.message_id)

event2 = event_store.read.each.to_a.last
expect(event2.correlation_id).to eq(cmd1.message_id)
expect(event2.causation_id).to eq(cmd2.message_id)
end

specify "correlate commands with events from sync handlers (missing correlate_with)" do
cmd2 = TestCommand.new
cmd2.instance_eval('undef :correlate_with')

cmd1 = AddProductCommand.new(product_id: 20)
cmd1.instance_eval('undef :correlate_with')

bus = CorrelatedCommands.new(event_store, command_bus)
event_store.subscribe(to: [ProductAdded]) do
bus.call(cmd2)
end
bus.call(cmd1)

expect(cmd1.correlation_id).to be_nil
expect(cmd1.causation_id).to be_nil

event1 = event_store.read.each.first
expect(event1.correlation_id).to eq(cmd1.message_id)
expect(event1.causation_id).to eq(cmd1.message_id)

expect(cmd2.correlation_id).to be_nil
expect(cmd2.causation_id).to be_nil

event2 = event_store.read.each.to_a.last
expect(event2.correlation_id).to eq(cmd1.message_id)
expect(event2.causation_id).to eq(cmd2.message_id)
end

specify "both correlation_id and causation_id must be set to correlate command" do
event_store.with_metadata(correlation_id: "COID") do
bus = CorrelatedCommands.new(event_store, command_bus)
bus.call(cmd = TestCommand.new)
expect(cmd.correlation_id).to be_nil
expect(cmd.causation_id).to be_nil
end

event_store.with_metadata(causation_id: "CAID") do
bus = CorrelatedCommands.new(event_store, command_bus)
bus.call(cmd = TestCommand.new)
expect(cmd.correlation_id).to be_nil
expect(cmd.causation_id).to be_nil
end
end

end
end

0 comments on commit c946a30

Please sign in to comment.