Skip to content

Commit

Permalink
Merge pull request #382 from RailsEventStore/linking_metadata
Browse files Browse the repository at this point in the history
LinkByMetadata, LinkByCorrelationId, LinkByCausationId, LinkByEventType introduced
  • Loading branch information
paneq authored Jul 3, 2018
2 parents 74397d8 + 40357a3 commit 99787d8
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 0 deletions.
2 changes: 2 additions & 0 deletions rails_event_store/lib/rails_event_store/all.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'rails_event_store/async_proxy_strategy'
require 'rails_event_store/active_job_dispatcher'
require 'rails_event_store/async_handler_helpers'
require 'rails_event_store/link_by_metadata'
require 'rails_event_store/client'
require 'rails_event_store/version'
require 'rails_event_store/railtie'
Expand All @@ -20,6 +21,7 @@ module RailsEventStore
InvalidHandler = RubyEventStore::InvalidHandler
InvalidPageStart = RubyEventStore::InvalidPageStart
InvalidPageSize = RubyEventStore::InvalidPageSize
CorrelatedCommands = RubyEventStore::CorrelatedCommands
GLOBAL_STREAM = RubyEventStore::GLOBAL_STREAM
PAGE_SIZE = RubyEventStore::PAGE_SIZE
AsyncProxyStrategy::Inline = RubyEventStore::AsyncProxyStrategy::Inline
Expand Down
27 changes: 27 additions & 0 deletions rails_event_store/lib/rails_event_store/link_by_metadata.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module RailsEventStore

class LinkByMetadata < RubyEventStore::LinkByMetadata
def initialize(event_store: Rails.configuration.event_store, key:, prefix: nil)
super(event_store: event_store, key: key, prefix: prefix)
end
end

class LinkByCorrelationId < RubyEventStore::LinkByCorrelationId
def initialize(event_store: Rails.configuration.event_store, prefix: nil)
super(event_store: event_store, prefix: prefix)
end
end

class LinkByCausationId < RubyEventStore::LinkByCausationId
def initialize(event_store: Rails.configuration.event_store, prefix: nil)
super(event_store: event_store, prefix: prefix)
end
end

class LinkByEventType < RubyEventStore::LinkByEventType
def initialize(event_store: Rails.configuration.event_store, prefix: nil)
super(event_store: event_store, prefix: prefix)
end
end

end
115 changes: 115 additions & 0 deletions rails_event_store/spec/link_by_metadata_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
require 'spec_helper'
require 'action_controller/railtie'

module RailsEventStore
RSpec.describe LinkByMetadata do

before do
rails = double("Rails", configuration: Rails::Application::Configuration.new)
stub_const("Rails", rails)
Rails.configuration.event_store = event_store
end

let(:event_store) { RailsEventStore::Client.new }

specify "links" do
event_store.subscribe_to_all_events(LinkByMetadata.new(key: :city))
event_store.publish(ev = OrderCreated.new(metadata:{
city: "Paris",
}))
expect(event_store.read.stream("$by_city_Paris").each.to_a).to eq([ev])
end

specify "defaults to Rails.configuration.event_store and passes rest of options" do
event_store.subscribe_to_all_events(LinkByMetadata.new(
key: :city,
prefix: "sweet+")
)
event_store.publish(ev = OrderCreated.new(metadata:{
city: "Paris",
}))
expect(event_store.read.stream("sweet+Paris").each.to_a).to eq([ev])
end

end

RSpec.describe LinkByCorrelationId do
before do
rails = double("Rails", configuration: Rails::Application::Configuration.new)
stub_const("Rails", rails)
Rails.configuration.event_store = event_store
end

let(:event_store) { RailsEventStore::Client.new }
let(:event) do
OrderCreated.new.tap do |ev|
ev.correlation_id = "COR"
ev.causation_id = "CAU"
end
end

specify "links" do
event_store.subscribe_to_all_events(LinkByCorrelationId.new)
event_store.publish(event)
expect(event_store.read.stream("$by_correlation_id_COR").each.to_a).to eq([event])
end

specify "defaults to Rails.configuration.event_store and passes rest of options" do
event_store.subscribe_to_all_events(LinkByCorrelationId.new(prefix: "sweet+"))
event_store.publish(event)
expect(event_store.read.stream("sweet+COR").each.to_a).to eq([event])
end
end

RSpec.describe LinkByCausationId do
before do
rails = double("Rails", configuration: Rails::Application::Configuration.new)
stub_const("Rails", rails)
Rails.configuration.event_store = event_store
end

let(:event_store) { RailsEventStore::Client.new }
let(:event) do
OrderCreated.new.tap do |ev|
ev.correlation_id = "COR"
ev.causation_id = "CAU"
end
end

specify "links" do
event_store.subscribe_to_all_events(LinkByCausationId.new)
event_store.publish(event)
expect(event_store.read.stream("$by_causation_id_CAU").each.to_a).to eq([event])
end

specify "defaults to Rails.configuration.event_store and passes rest of options" do
event_store.subscribe_to_all_events(LinkByCausationId.new(prefix: "sweet+"))
event_store.publish(event)
expect(event_store.read.stream("sweet+CAU").each.to_a).to eq([event])
end
end

RSpec.describe LinkByEventType do
let(:event_store) { RailsEventStore::Client.new }
let(:event) { OrderCreated.new }

before do
rails = double("Rails", configuration: Rails::Application::Configuration.new)
stub_const("Rails", rails)
Rails.configuration.event_store = event_store
end

specify "default prefix" do
event_store.subscribe_to_all_events(LinkByEventType.new)
event_store.publish(event)
expect(event_store.read.stream("$by_type_OrderCreated").each.to_a).to eq([event])
end

specify "custom prefix" do
event_store.subscribe_to_all_events(LinkByEventType.new(prefix: "e-"))
event_store.publish(event)
expect(event_store.read.stream("e-OrderCreated").each.to_a).to eq([event])
end
end

end
1 change: 1 addition & 0 deletions ruby_event_store/lib/ruby_event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require 'ruby_event_store/mappers/null_mapper'
require 'ruby_event_store/batch_enumerator'
require 'ruby_event_store/correlated_commands'
require 'ruby_event_store/link_by_metadata'
require 'ruby_event_store/async_proxy_strategy'
require 'ruby_event_store/async_dispatcher'
require 'ruby_event_store/version'
55 changes: 55 additions & 0 deletions ruby_event_store/lib/ruby_event_store/link_by_metadata.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module RubyEventStore
class LinkByMetadata

def initialize(event_store:, key:, prefix: nil)
@event_store = event_store
@key = key
@prefix = prefix || ["$by", key, nil].join("_")
end

def call(event)
return unless event.metadata.has_key?(@key)

@event_store.link_to_stream(
[event.message_id],
stream_name: "#{@prefix}#{event.metadata.fetch(@key)}"
)
end

end

class LinkByCorrelationId < LinkByMetadata
def initialize(event_store:, prefix: nil)
super(
event_store: event_store,
prefix: prefix,
key: :correlation_id,
)
end
end

class LinkByCausationId < LinkByMetadata
def initialize(event_store:, prefix: nil)
super(
event_store: event_store,
prefix: prefix,
key: :causation_id,
)
end
end

class LinkByEventType
def initialize(event_store:, prefix: nil)
@event_store = event_store
@prefix = prefix || "$by_type_"
end

def call(event)
@event_store.link_to_stream(
[event.message_id],
stream_name: "#{@prefix}#{event.type}"
)
end
end

end
147 changes: 147 additions & 0 deletions ruby_event_store/spec/link_by_metadata_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
require 'spec_helper'

module RubyEventStore
RSpec.describe LinkByMetadata do

let(:event_store) do
RubyEventStore::Client.new(repository: InMemoryRepository.new)
end

specify 'links to stream based on selected metadata' do
event_store.subscribe_to_all_events(LinkByMetadata.new(event_store: event_store, key: :string))
event_store.subscribe_to_all_events(LinkByMetadata.new(event_store: event_store, key: :float))
event_store.subscribe_to_all_events(LinkByMetadata.new(event_store: event_store, key: :int))
event_store.subscribe_to_all_events(LinkByMetadata.new(event_store: event_store, key: :missing))

event_store.publish(ev = OrderCreated.new(metadata:{
string: "city",
float: 1.5,
int: 2,
}))

expect(event_store.read.stream("$by_string_city").each.to_a).to eq([ev])
expect(event_store.read.stream("$by_float_1.5").each.to_a).to eq([ev])
expect(event_store.read.stream("$by_int_2").each.to_a).to eq([ev])

expect(event_store.read.stream("$by_missing").each.to_a).to eq([])
expect(event_store.read.stream("$by_missing_").each.to_a).to eq([])
expect(event_store.read.stream("$by_missing_nil").each.to_a).to eq([])
end

specify 'links to stream based on selected metadata (proto)' do
event_store = RubyEventStore::Client.new(
mapper: RubyEventStore::Mappers::Protobuf.new,
repository: InMemoryRepository.new
)
event_store.subscribe_to_all_events(LinkByMetadata.new(event_store: event_store, key: :city))
ev = RubyEventStore::Proto.new(
data: ResTesting::OrderCreated.new(
customer_id: 123,
order_id: "K3THNX9",
),
metadata: { city: "Chicago" }
)
event_store.publish(ev)

expect(event_store.read.stream("$by_city_Chicago").each.to_a).to eq([ev])
end

specify "custom prefix" do
event_store.subscribe_to_all_events(LinkByMetadata.new(
event_store: event_store,
key: :city,
prefix: "sweet+")
)

event_store.publish(ev = OrderCreated.new(metadata:{
city: "Paris",
}))

expect(event_store.read.stream("sweet+Paris").each.to_a).to eq([ev])
end

specify "explicitly passes array of ids instead of a single id" do
event_store.subscribe_to_all_events(LinkByMetadata.new(event_store: event_store, key: :city))
expect(event_store).to receive(:link_to_stream).with(instance_of(Array), any_args)
event_store.publish(ev = OrderCreated.new(metadata:{
city: "Paris",
}))
end

end

RSpec.describe LinkByCorrelationId do
let(:event_store) do
RubyEventStore::Client.new(repository: InMemoryRepository.new)
end
let(:event) do
OrderCreated.new.tap do |ev|
ev.correlation_id = "COR"
ev.causation_id = "CAU"
end
end

specify "default prefix" do
event_store.subscribe_to_all_events(LinkByCorrelationId.new(event_store: event_store))
event_store.publish(event)
expect(event_store.read.stream("$by_correlation_id_COR").each.to_a).to eq([event])
end

specify "custom prefix" do
event_store.subscribe_to_all_events(LinkByCorrelationId.new(event_store: event_store, prefix: "c-"))
event_store.publish(event)
expect(event_store.read.stream("c-COR").each.to_a).to eq([event])
end
end

RSpec.describe LinkByCausationId do
let(:event_store) do
RubyEventStore::Client.new(repository: InMemoryRepository.new)
end
let(:event) do
OrderCreated.new.tap do |ev|
ev.correlation_id = "COR"
ev.causation_id = "CAU"
end
end

specify "default prefix" do
event_store.subscribe_to_all_events(LinkByCausationId.new(event_store: event_store))
event_store.publish(event)
expect(event_store.read.stream("$by_causation_id_CAU").each.to_a).to eq([event])
end

specify "custom prefix" do
event_store.subscribe_to_all_events(LinkByCausationId.new(event_store: event_store, prefix: "c-"))
event_store.publish(event)
expect(event_store.read.stream("c-CAU").each.to_a).to eq([event])
end
end

RSpec.describe LinkByEventType do
let(:event_store) do
RubyEventStore::Client.new(repository: InMemoryRepository.new)
end
let(:event) { OrderCreated.new }

specify "default prefix" do
event_store.subscribe_to_all_events(LinkByEventType.new(event_store: event_store))
event_store.publish(event)
expect(event_store.read.stream("$by_type_OrderCreated").each.to_a).to eq([event])
end

specify "custom prefix" do
event_store.subscribe_to_all_events(LinkByEventType.new(event_store: event_store, prefix: "e-"))
event_store.publish(event)
expect(event_store.read.stream("e-OrderCreated").each.to_a).to eq([event])
end

specify "explicitly passes array of ids instead of a single id" do
event_store.subscribe_to_all_events(LinkByEventType.new(event_store: event_store))
expect(event_store).to receive(:link_to_stream).with(instance_of(Array), any_args)
event_store.publish(ev = OrderCreated.new())
end
end

end

0 comments on commit 99787d8

Please sign in to comment.