Skip to content

Commit

Permalink
Merge pull request #399 from mpraglowski/read-spec-last
Browse files Browse the repository at this point in the history
Add first/last to read specification
  • Loading branch information
mpraglowski authored Jul 30, 2018
2 parents a609202 + 9d0b696 commit 6cc5ad5
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def matches?(event_proc)
raise_event_store_not_set unless @event_store
spec = @event_store.read
spec = spec.stream(@stream) if @stream
last_event_before_block = last_event(spec)
last_event_before_block = spec.last
event_proc.call
spec = spec.from(last_event_before_block.event_id) if last_event_before_block
@published_events = spec.each.to_a
Expand Down Expand Up @@ -78,10 +78,6 @@ def match_events?
!@expected.empty?
end

def last_event(spec)
spec.backward.limit(1).each.first
end

def raise_event_store_not_set
raise SyntaxError, "You have to set the event store instance with `in`, e.g. `expect { ... }.to publish(an_event(MyEvent)).in(event_store)`"
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ def matcher(*expected)
}.not_to matcher(matchers.an_event(FooEvent)).in(event_store).in_stream("Foo$1")
end

specify do
event_store.publish(FooEvent.new)
event_store.publish(FooEvent.new)
event_store.publish(FooEvent.new)
expect {
event_store.publish(BarEvent.new)
}.to matcher(matchers.an_event(BarEvent)).in(event_store)
expect {
event_store.publish(BarEvent.new)
}.not_to matcher(matchers.an_event(FooEvent)).in(event_store)
end

specify do
foo_event = FooEvent.new
bar_event = BarEvent.new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@ def read_event(event_id)

def read(spec)
stream = LegacyEvent.order(id: order(spec.direction))
stream = stream.limit(spec.count) if spec.limit?
stream = stream.limit(spec.limit) if spec.limit?
stream = stream.where(start_condition(spec)) unless spec.head?
stream = stream.where(stream: spec.stream_name) unless spec.global_stream?

if spec.batched?
batch_reader = ->(offset, limit) { stream.offset(offset).limit(limit).map(&method(:build_event_entity)) }
RubyEventStore::BatchEnumerator.new(spec.batch_size, total_limit(spec), batch_reader).each
RubyEventStore::BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each
elsif spec.first?
build_event_entity(stream.first)
elsif spec.last?
build_event_entity(stream.last)
else
stream.map(&method(:build_event_entity)).each
end
end

private

def total_limit(specification)
specification.limit? ? specification.count : Float::INFINITY
end

def start_condition(specification)
event_record =
LegacyEvent.find_by!(event_id: specification.start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ def read(spec)

stream = EventInStream.preload(:event).where(stream: normalize_stream_name(spec))
stream = stream.order(position: order(spec.direction)) unless spec.global_stream?
stream = stream.limit(spec.count) if spec.limit?
stream = stream.limit(spec.limit) if spec.limit?
stream = stream.where(start_condition(spec)) unless spec.head?
stream = stream.order(id: order(spec.direction))

if spec.batched?
batch_reader = ->(offset, limit) { stream.offset(offset).limit(limit).map(&method(:build_event_instance)) }
RubyEventStore::BatchEnumerator.new(spec.batch_size, total_limit(spec), batch_reader).each
elsif spec.first?
record = stream.first
build_event_instance(record) if record
elsif spec.last?
record = stream.last
build_event_instance(record) if record
else
stream.map(&method(:build_event_instance)).each
end
Expand All @@ -43,7 +49,7 @@ def read(spec)
private

def total_limit(specification)
specification.limit? ? specification.count : Float::INFINITY
specification.limit
end

def normalize_stream_name(specification)
Expand Down
18 changes: 3 additions & 15 deletions ruby_event_store-rom/lib/ruby_event_store/rom/event_repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class EventRepository

def initialize(rom: ROM.env)
raise ArgumentError, "Must specify rom" unless rom && rom.instance_of?(Env)

@rom = rom
@events = Repositories::Events.new(rom.container)
@stream_entries = Repositories::StreamEntries.new(rom.container)
Expand Down Expand Up @@ -72,13 +72,7 @@ def has_event?(event_id)
end

def last_stream_event(stream)
@events.read(
:backward,
stream,
from: :head,
limit: 1,
batch_size: nil
).first
@events.last_stream_event(stream)
end

def read_event(event_id)
Expand All @@ -90,13 +84,7 @@ def read_event(event_id)
def read(specification)
raise ReservedInternalName if specification.stream_name.eql?(@stream_entries.stream_entries.class::SERIALIZED_GLOBAL_STREAM_NAME)

@events.read(
specification.direction,
specification.stream,
from: specification.start,
limit: (specification.count if specification.limit?),
batch_size: (specification.batch_size if specification.batched?)
)
@events.read(specification)
end

private
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,54 @@ def by_id(event_id)
events.map_with(:event_to_serialized_record).by_pk(event_id).one!
end

def read(direction, stream, from:, limit:, batch_size:)
unless from.equal?(:head)
offset_entry_id = stream_entries.by_stream_and_event_id(stream, from).fetch(:id)
def last_stream_event(stream)
query = stream_entries.ordered(:backward, stream)
query = query_builder(query, limit: 1)
query.first
end

def read(specification)
unless specification.head?
offset_entry_id = stream_entries.by_stream_and_event_id(specification.stream, specification.start).fetch(:id)
end

if batch_size
direction = specification.direction
limit = specification.limit if specification.limit?
if specification.last? && specification.head?
direction = specification.forward? ? :backward : :forward
end

query = stream_entries.ordered(direction, specification.stream, offset_entry_id)

if specification.batched?
reader = ->(offset, limit) do
stream_entries
.ordered(direction, stream, offset_entry_id)
.offset(offset)
.take(limit)
.combine(:event)
.map_with(:stream_entry_to_serialized_record, auto_struct: false)
.to_ary
query_builder(query, offset: offset, limit: limit).to_ary
end
BatchEnumerator.new(batch_size, limit || Float::INFINITY, reader).each
BatchEnumerator.new(specification.batch_size, limit || Float::INFINITY, reader).each
else
stream_entries
.ordered(direction, stream, offset_entry_id)
.take(limit)
.combine(:event)
.map_with(:stream_entry_to_serialized_record, auto_struct: false)
.each
query = query_builder(query, limit: limit)
if specification.head?
specification.first? || specification.last? ? query.first : query.each
else
if specification.last?
query.to_ary.last
else
specification.first? ? query.first : query.each
end
end
end
end

protected

def query_builder(query, offset: nil, limit: nil)
query = query.offset(offset) if offset
query = query.take(limit) if limit

query
.combine(:event)
.map_with(:stream_entry_to_serialized_record, auto_struct: false)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ def read(spec)
events = spec.global_stream? ? global : stream_of(spec.stream_name)
events = events.reverse if spec.backward?
events = events.drop(index_of(events, spec.start) + 1) unless spec.head?
events = events[0...spec.count] if spec.limit?
events = events[0...spec.limit] if spec.limit?
if spec.batched?
batch_reader = ->(offset, limit) { events.drop(offset).take(limit) }
BatchEnumerator.new(spec.batch_size, events.size, batch_reader).each
elsif spec.first?
events.first
elsif spec.last?
events.last
else
events.each
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,4 +1100,37 @@ def read_all_streams_backward(repository, start, count)
expect(batches[0].size).to eq(99)
expect(batches[0]).to eq(events[101..199])
end

specify do
expect(repository.read(specification.read_first.result)).to be_nil
expect(repository.read(specification.read_last.result)).to be_nil

events = Array.new(5) { SRecord.new }
repository.append_to_stream(
events,
RubyEventStore::Stream.new(RubyEventStore::GLOBAL_STREAM),
RubyEventStore::ExpectedVersion.any
)

expect(repository.read(specification.stream("Any").read_first.result)).to be_nil
expect(repository.read(specification.stream("Any").read_last.result)).to be_nil

expect(repository.read(specification.read_first.result)).to eq(events[0])
expect(repository.read(specification.read_last.result)).to eq(events[4])

expect(repository.read(specification.backward.read_first.result)).to eq(events[4])
expect(repository.read(specification.backward.read_last.result)).to eq(events[0])

expect(repository.read(specification.from(events[2].event_id).read_first.result)).to eq(events[3])
expect(repository.read(specification.from(events[2].event_id).read_last.result)).to eq(events[4])

expect(repository.read(specification.from(events[2].event_id).backward.read_first.result)).to eq(events[1])
expect(repository.read(specification.from(events[2].event_id).backward.read_last.result)).to eq(events[0])

expect(repository.read(specification.from(events[4].event_id).read_first.result)).to be_nil
expect(repository.read(specification.from(events[4].event_id).read_last.result)).to be_nil

expect(repository.read(specification.from(events[0].event_id).backward.read_first.result)).to be_nil
expect(repository.read(specification.from(events[0].event_id).backward.read_last.result)).to be_nil
end
end
67 changes: 54 additions & 13 deletions ruby_event_store/lib/ruby_event_store/specification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@ module RubyEventStore

# Used for building and executing the query specification.
class Specification
# @private
# @api private
NO_LIMIT = Object.new.freeze
# @private
# @api private
NO_BATCH = Object.new.freeze
DEFAULT_BATCH_SIZE = 100

class Result < Struct.new(:direction, :start, :count, :stream, :batch_size)
class Result < Struct.new(:direction, :start, :count, :stream, :read_as, :batch_size)
def limit?
!count.equal?(NO_LIMIT)
!count.nil?
end

def limit
count || Float::INFINITY
end

def global_stream?
Expand All @@ -36,7 +34,15 @@ def backward?
end

def batched?
!batch_size.equal?(NO_BATCH)
read_as.equal?(:batch)
end

def first?
read_as.equal?(:first)
end

def last?
read_as.equal?(:last)
end
end
private_constant :Result
Expand All @@ -47,7 +53,7 @@ def batched?

# @api private
# @private
def initialize(repository, mapper, result = Result.new(:forward, :head, NO_LIMIT, Stream.new(GLOBAL_STREAM), NO_BATCH))
def initialize(repository, mapper, result = Result.new(:forward, :head, nil, Stream.new(GLOBAL_STREAM), :all, DEFAULT_BATCH_SIZE))
@mapper = mapper
@repository = repository
@result = result
Expand Down Expand Up @@ -115,8 +121,7 @@ def limit(count)
def each_batch
return to_enum(:each_batch) unless block_given?

result_ = result.batched? ? result : result.tap { |r| r.batch_size = DEFAULT_BATCH_SIZE }
repository.read(result_).each do |batch|
repository.read(result.tap { |r| r.read_as = :batch }).each do |batch|
yield batch.map { |serialized_record| mapper.serialized_record_to_event(serialized_record) }
end
end
Expand Down Expand Up @@ -149,10 +154,46 @@ def each
# @param batch_size [Integer] number of events to read in a single batch
# @return [Specification]
def in_batches(batch_size = DEFAULT_BATCH_SIZE)
Specification.new(repository, mapper, result.dup.tap { |r| r.batch_size = batch_size })
Specification.new(repository, mapper, result.dup.tap { |r| r.read_as = :batch; r.batch_size = batch_size })
end
alias :in_batches_of :in_batches

# Specifies that only first event should be read.
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @return [Specification]
def read_first
Specification.new(repository, mapper, result.dup.tap { |r| r.read_as = :first })
end

# Specifies that only last event should be read.
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @return [Specification]
def read_last
Specification.new(repository, mapper, result.dup.tap { |r| r.read_as = :last })
end

# Executes the query based on the specification built up to this point.
# Returns the first event in specified collection of events.
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @return [Event, nil]
def first
record = repository.read(read_first.result)
mapper.serialized_record_to_event(record) if record
end

# Executes the query based on the specification built up to this point.
# Returns the last event in specified collection of events.
# {http://railseventstore.org/docs/read/ Find out more}.
#
# @return [Event, nil]
def last
record = repository.read(read_last.result)
mapper.serialized_record_to_event(record) if record
end

private
attr_reader :repository, :mapper
end
Expand Down
Loading

0 comments on commit 6cc5ad5

Please sign in to comment.