diff --git a/rails_event_store-rspec/lib/rails_event_store/rspec/publish.rb b/rails_event_store-rspec/lib/rails_event_store/rspec/publish.rb index 53f6b003c9..00ad0216d0 100644 --- a/rails_event_store-rspec/lib/rails_event_store/rspec/publish.rb +++ b/rails_event_store-rspec/lib/rails_event_store/rspec/publish.rb @@ -17,7 +17,7 @@ def matches?(event_proc) raise_event_store_not_set unless @event_store spec = @event_store.read spec = spec.stream(@stream) if @stream - last_event_before_block = last_event(spec) + last_event_before_block = spec.last event_proc.call spec = spec.from(last_event_before_block.event_id) if last_event_before_block @published_events = spec.each.to_a @@ -78,10 +78,6 @@ def match_events? !@expected.empty? end - def last_event(spec) - spec.backward.limit(1).each.first - end - def raise_event_store_not_set raise SyntaxError, "You have to set the event store instance with `in`, e.g. `expect { ... }.to publish(an_event(MyEvent)).in(event_store)`" end diff --git a/rails_event_store-rspec/spec/rails_event_store/rspec/publish_spec.rb b/rails_event_store-rspec/spec/rails_event_store/rspec/publish_spec.rb index 0e09711f41..7daac231c1 100644 --- a/rails_event_store-rspec/spec/rails_event_store/rspec/publish_spec.rb +++ b/rails_event_store-rspec/spec/rails_event_store/rspec/publish_spec.rb @@ -71,6 +71,18 @@ def matcher(*expected) }.not_to matcher(matchers.an_event(FooEvent)).in(event_store).in_stream("Foo$1") end + specify do + event_store.publish(FooEvent.new) + event_store.publish(FooEvent.new) + event_store.publish(FooEvent.new) + expect { + event_store.publish(BarEvent.new) + }.to matcher(matchers.an_event(BarEvent)).in(event_store) + expect { + event_store.publish(BarEvent.new) + }.not_to matcher(matchers.an_event(FooEvent)).in(event_store) + end + specify do foo_event = FooEvent.new bar_event = BarEvent.new diff --git a/rails_event_store_active_record-legacy/lib/rails_event_store_active_record/legacy/event_repository.rb b/rails_event_store_active_record-legacy/lib/rails_event_store_active_record/legacy/event_repository.rb index e6e1855be4..8abdf57146 100644 --- a/rails_event_store_active_record-legacy/lib/rails_event_store_active_record/legacy/event_repository.rb +++ b/rails_event_store_active_record-legacy/lib/rails_event_store_active_record/legacy/event_repository.rb @@ -61,13 +61,17 @@ def read_event(event_id) def read(spec) stream = LegacyEvent.order(id: order(spec.direction)) - stream = stream.limit(spec.count) if spec.limit? + stream = stream.limit(spec.limit) if spec.limit? stream = stream.where(start_condition(spec)) unless spec.head? stream = stream.where(stream: spec.stream_name) unless spec.global_stream? if spec.batched? batch_reader = ->(offset, limit) { stream.offset(offset).limit(limit).map(&method(:build_event_entity)) } - RubyEventStore::BatchEnumerator.new(spec.batch_size, total_limit(spec), batch_reader).each + RubyEventStore::BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each + elsif spec.first? + build_event_entity(stream.first) + elsif spec.last? + build_event_entity(stream.last) else stream.map(&method(:build_event_entity)).each end @@ -75,10 +79,6 @@ def read(spec) private - def total_limit(specification) - specification.limit? ? specification.count : Float::INFINITY - end - def start_condition(specification) event_record = LegacyEvent.find_by!(event_id: specification.start) diff --git a/rails_event_store_active_record/lib/rails_event_store_active_record/event_repository_reader.rb b/rails_event_store_active_record/lib/rails_event_store_active_record/event_repository_reader.rb index d0ffefa7b2..8910f02df5 100644 --- a/rails_event_store_active_record/lib/rails_event_store_active_record/event_repository_reader.rb +++ b/rails_event_store_active_record/lib/rails_event_store_active_record/event_repository_reader.rb @@ -28,13 +28,19 @@ def read(spec) stream = EventInStream.preload(:event).where(stream: normalize_stream_name(spec)) stream = stream.order(position: order(spec.direction)) unless spec.global_stream? - stream = stream.limit(spec.count) if spec.limit? + stream = stream.limit(spec.limit) if spec.limit? stream = stream.where(start_condition(spec)) unless spec.head? stream = stream.order(id: order(spec.direction)) if spec.batched? batch_reader = ->(offset, limit) { stream.offset(offset).limit(limit).map(&method(:build_event_instance)) } RubyEventStore::BatchEnumerator.new(spec.batch_size, total_limit(spec), batch_reader).each + elsif spec.first? + record = stream.first + build_event_instance(record) if record + elsif spec.last? + record = stream.last + build_event_instance(record) if record else stream.map(&method(:build_event_instance)).each end @@ -43,7 +49,7 @@ def read(spec) private def total_limit(specification) - specification.limit? ? specification.count : Float::INFINITY + specification.limit end def normalize_stream_name(specification) diff --git a/ruby_event_store-rom/lib/ruby_event_store/rom/event_repository.rb b/ruby_event_store-rom/lib/ruby_event_store/rom/event_repository.rb index 569fb8a154..b611ecf87d 100644 --- a/ruby_event_store-rom/lib/ruby_event_store/rom/event_repository.rb +++ b/ruby_event_store-rom/lib/ruby_event_store/rom/event_repository.rb @@ -11,7 +11,7 @@ class EventRepository def initialize(rom: ROM.env) raise ArgumentError, "Must specify rom" unless rom && rom.instance_of?(Env) - + @rom = rom @events = Repositories::Events.new(rom.container) @stream_entries = Repositories::StreamEntries.new(rom.container) @@ -72,13 +72,7 @@ def has_event?(event_id) end def last_stream_event(stream) - @events.read( - :backward, - stream, - from: :head, - limit: 1, - batch_size: nil - ).first + @events.last_stream_event(stream) end def read_event(event_id) @@ -90,13 +84,7 @@ def read_event(event_id) def read(specification) raise ReservedInternalName if specification.stream_name.eql?(@stream_entries.stream_entries.class::SERIALIZED_GLOBAL_STREAM_NAME) - @events.read( - specification.direction, - specification.stream, - from: specification.start, - limit: (specification.count if specification.limit?), - batch_size: (specification.batch_size if specification.batched?) - ) + @events.read(specification) end private diff --git a/ruby_event_store-rom/lib/ruby_event_store/rom/repositories/events.rb b/ruby_event_store-rom/lib/ruby_event_store/rom/repositories/events.rb index 9ca6c9cd35..5454255a4a 100644 --- a/ruby_event_store-rom/lib/ruby_event_store/rom/repositories/events.rb +++ b/ruby_event_store-rom/lib/ruby_event_store/rom/repositories/events.rb @@ -35,31 +35,54 @@ def by_id(event_id) events.map_with(:event_to_serialized_record).by_pk(event_id).one! end - def read(direction, stream, from:, limit:, batch_size:) - unless from.equal?(:head) - offset_entry_id = stream_entries.by_stream_and_event_id(stream, from).fetch(:id) + def last_stream_event(stream) + query = stream_entries.ordered(:backward, stream) + query = query_builder(query, limit: 1) + query.first + end + + def read(specification) + unless specification.head? + offset_entry_id = stream_entries.by_stream_and_event_id(specification.stream, specification.start).fetch(:id) end - if batch_size + direction = specification.direction + limit = specification.limit if specification.limit? + if specification.last? && specification.head? + direction = specification.forward? ? :backward : :forward + end + + query = stream_entries.ordered(direction, specification.stream, offset_entry_id) + + if specification.batched? reader = ->(offset, limit) do - stream_entries - .ordered(direction, stream, offset_entry_id) - .offset(offset) - .take(limit) - .combine(:event) - .map_with(:stream_entry_to_serialized_record, auto_struct: false) - .to_ary + query_builder(query, offset: offset, limit: limit).to_ary end - BatchEnumerator.new(batch_size, limit || Float::INFINITY, reader).each + BatchEnumerator.new(specification.batch_size, limit || Float::INFINITY, reader).each else - stream_entries - .ordered(direction, stream, offset_entry_id) - .take(limit) - .combine(:event) - .map_with(:stream_entry_to_serialized_record, auto_struct: false) - .each + query = query_builder(query, limit: limit) + if specification.head? + specification.first? || specification.last? ? query.first : query.each + else + if specification.last? + query.to_ary.last + else + specification.first? ? query.first : query.each + end + end end end + + protected + + def query_builder(query, offset: nil, limit: nil) + query = query.offset(offset) if offset + query = query.take(limit) if limit + + query + .combine(:event) + .map_with(:stream_entry_to_serialized_record, auto_struct: false) + end end end end diff --git a/ruby_event_store/lib/ruby_event_store/in_memory_repository.rb b/ruby_event_store/lib/ruby_event_store/in_memory_repository.rb index 521275d1b0..328e90c0af 100644 --- a/ruby_event_store/lib/ruby_event_store/in_memory_repository.rb +++ b/ruby_event_store/lib/ruby_event_store/in_memory_repository.rb @@ -39,10 +39,14 @@ def read(spec) events = spec.global_stream? ? global : stream_of(spec.stream_name) events = events.reverse if spec.backward? events = events.drop(index_of(events, spec.start) + 1) unless spec.head? - events = events[0...spec.count] if spec.limit? + events = events[0...spec.limit] if spec.limit? if spec.batched? batch_reader = ->(offset, limit) { events.drop(offset).take(limit) } BatchEnumerator.new(spec.batch_size, events.size, batch_reader).each + elsif spec.first? + events.first + elsif spec.last? + events.last else events.each end diff --git a/ruby_event_store/lib/ruby_event_store/spec/event_repository_lint.rb b/ruby_event_store/lib/ruby_event_store/spec/event_repository_lint.rb index d343856201..d40fd0654f 100644 --- a/ruby_event_store/lib/ruby_event_store/spec/event_repository_lint.rb +++ b/ruby_event_store/lib/ruby_event_store/spec/event_repository_lint.rb @@ -1100,4 +1100,37 @@ def read_all_streams_backward(repository, start, count) expect(batches[0].size).to eq(99) expect(batches[0]).to eq(events[101..199]) end + + specify do + expect(repository.read(specification.read_first.result)).to be_nil + expect(repository.read(specification.read_last.result)).to be_nil + + events = Array.new(5) { SRecord.new } + repository.append_to_stream( + events, + RubyEventStore::Stream.new(RubyEventStore::GLOBAL_STREAM), + RubyEventStore::ExpectedVersion.any + ) + + expect(repository.read(specification.stream("Any").read_first.result)).to be_nil + expect(repository.read(specification.stream("Any").read_last.result)).to be_nil + + expect(repository.read(specification.read_first.result)).to eq(events[0]) + expect(repository.read(specification.read_last.result)).to eq(events[4]) + + expect(repository.read(specification.backward.read_first.result)).to eq(events[4]) + expect(repository.read(specification.backward.read_last.result)).to eq(events[0]) + + expect(repository.read(specification.from(events[2].event_id).read_first.result)).to eq(events[3]) + expect(repository.read(specification.from(events[2].event_id).read_last.result)).to eq(events[4]) + + expect(repository.read(specification.from(events[2].event_id).backward.read_first.result)).to eq(events[1]) + expect(repository.read(specification.from(events[2].event_id).backward.read_last.result)).to eq(events[0]) + + expect(repository.read(specification.from(events[4].event_id).read_first.result)).to be_nil + expect(repository.read(specification.from(events[4].event_id).read_last.result)).to be_nil + + expect(repository.read(specification.from(events[0].event_id).backward.read_first.result)).to be_nil + expect(repository.read(specification.from(events[0].event_id).backward.read_last.result)).to be_nil + end end diff --git a/ruby_event_store/lib/ruby_event_store/specification.rb b/ruby_event_store/lib/ruby_event_store/specification.rb index 5d7efe6990..f506d28774 100644 --- a/ruby_event_store/lib/ruby_event_store/specification.rb +++ b/ruby_event_store/lib/ruby_event_store/specification.rb @@ -2,17 +2,15 @@ module RubyEventStore # Used for building and executing the query specification. class Specification - # @private - # @api private - NO_LIMIT = Object.new.freeze - # @private - # @api private - NO_BATCH = Object.new.freeze DEFAULT_BATCH_SIZE = 100 - class Result < Struct.new(:direction, :start, :count, :stream, :batch_size) + class Result < Struct.new(:direction, :start, :count, :stream, :read_as, :batch_size) def limit? - !count.equal?(NO_LIMIT) + !count.nil? + end + + def limit + count || Float::INFINITY end def global_stream? @@ -36,7 +34,15 @@ def backward? end def batched? - !batch_size.equal?(NO_BATCH) + read_as.equal?(:batch) + end + + def first? + read_as.equal?(:first) + end + + def last? + read_as.equal?(:last) end end private_constant :Result @@ -47,7 +53,7 @@ def batched? # @api private # @private - def initialize(repository, mapper, result = Result.new(:forward, :head, NO_LIMIT, Stream.new(GLOBAL_STREAM), NO_BATCH)) + def initialize(repository, mapper, result = Result.new(:forward, :head, nil, Stream.new(GLOBAL_STREAM), :all, DEFAULT_BATCH_SIZE)) @mapper = mapper @repository = repository @result = result @@ -115,8 +121,7 @@ def limit(count) def each_batch return to_enum(:each_batch) unless block_given? - result_ = result.batched? ? result : result.tap { |r| r.batch_size = DEFAULT_BATCH_SIZE } - repository.read(result_).each do |batch| + repository.read(result.tap { |r| r.read_as = :batch }).each do |batch| yield batch.map { |serialized_record| mapper.serialized_record_to_event(serialized_record) } end end @@ -149,10 +154,46 @@ def each # @param batch_size [Integer] number of events to read in a single batch # @return [Specification] def in_batches(batch_size = DEFAULT_BATCH_SIZE) - Specification.new(repository, mapper, result.dup.tap { |r| r.batch_size = batch_size }) + Specification.new(repository, mapper, result.dup.tap { |r| r.read_as = :batch; r.batch_size = batch_size }) end alias :in_batches_of :in_batches + # Specifies that only first event should be read. + # {http://railseventstore.org/docs/read/ Find out more}. + # + # @return [Specification] + def read_first + Specification.new(repository, mapper, result.dup.tap { |r| r.read_as = :first }) + end + + # Specifies that only last event should be read. + # {http://railseventstore.org/docs/read/ Find out more}. + # + # @return [Specification] + def read_last + Specification.new(repository, mapper, result.dup.tap { |r| r.read_as = :last }) + end + + # Executes the query based on the specification built up to this point. + # Returns the first event in specified collection of events. + # {http://railseventstore.org/docs/read/ Find out more}. + # + # @return [Event, nil] + def first + record = repository.read(read_first.result) + mapper.serialized_record_to_event(record) if record + end + + # Executes the query based on the specification built up to this point. + # Returns the last event in specified collection of events. + # {http://railseventstore.org/docs/read/ Find out more}. + # + # @return [Event, nil] + def last + record = repository.read(read_last.result) + mapper.serialized_record_to_event(record) if record + end + private attr_reader :repository, :mapper end diff --git a/ruby_event_store/spec/specification_spec.rb b/ruby_event_store/spec/specification_spec.rb index af96fd0f91..6fc59fa881 100644 --- a/ruby_event_store/spec/specification_spec.rb +++ b/ruby_event_store/spec/specification_spec.rb @@ -8,16 +8,20 @@ module RubyEventStore specify { expect(specification).to match_result({ start: :head }) } - specify { expect(specification).to match_result({ count: Specification::NO_LIMIT }) } + specify { expect(specification).to match_result({ count: nil }) } specify { expect(specification).to match_result({ stream_name: GLOBAL_STREAM }) } + specify { expect(specification).to match_result({ limit: Float::INFINITY }) } + specify { expect{specification.limit(nil) }.to raise_error(InvalidPageSize) } specify { expect{specification.limit(0)}.to raise_error(InvalidPageSize) } specify { expect(specification.limit(1)).to match_result({ count: 1 }) } + specify { expect(specification.limit(1)).to match_result({ limit: 1 }) } + specify { expect(specification.forward).to match_result({ direction: :forward }) } specify { expect(specification.backward).to match_result({ direction: :backward }) } @@ -132,63 +136,120 @@ module RubyEventStore end end + specify do + with_event_of_id(event_id) do + specs = [ + specification.forward, + specification.backward, + specification.in_batches, + specification.read_first, + specification.read_last, + specification.limit(10), + specification.from(event_id), + specification.stream(stream_name), + ] + expect(specs.map{|s| s.send(:repository)}.uniq).to eq([repository]) + expect(specs.map{|s| s.send(:mapper)}.uniq).to eq([mapper]) + end + end + specify 'immutable specification' do with_event_of_id(event_id) do expect(backward_specifcation = specification.backward).to match_result({ direction: :backward, start: :head, - count: Specification::NO_LIMIT, + count: nil, stream_name: GLOBAL_STREAM, - batch_size: Specification::NO_BATCH + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE }) expect(specification.from(event_id)).to match_result({ direction: :forward, start: event_id, - count: Specification::NO_LIMIT, + count: nil, stream_name: GLOBAL_STREAM, - batch_size: Specification::NO_BATCH + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE }) expect(specification.limit(10)).to match_result({ direction: :forward, start: :head, count: 10, stream_name: GLOBAL_STREAM, - batch_size: Specification::NO_BATCH + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE }) expect(specification.stream(stream_name)).to match_result({ direction: :forward, start: :head, - count: Specification::NO_LIMIT, + count: nil, stream_name: stream_name, - batch_size: Specification::NO_BATCH + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE }) expect(specification.in_batches).to match_result({ direction: :forward, start: :head, - count: Specification::NO_LIMIT, + count: nil, stream_name: GLOBAL_STREAM, + read_as: :batch, batch_size: 100 }) expect(specification).to match_result({ direction: :forward, start: :head, - count: Specification::NO_LIMIT, + count: nil, stream_name: GLOBAL_STREAM, - batch_size: Specification::NO_BATCH + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE }) expect(backward_specifcation.forward).to match_result({ direction: :forward, start: :head, - count: Specification::NO_LIMIT, + count: nil, stream_name: GLOBAL_STREAM, - batch_size: Specification::NO_BATCH + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE }) expect(backward_specifcation).to match_result({ direction: :backward, start: :head, - count: Specification::NO_LIMIT, + count: nil, + stream_name: GLOBAL_STREAM, + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE + }) + expect(specification.read_first).to match_result({ + direction: :forward, + start: :head, + count: nil, stream_name: GLOBAL_STREAM, - batch_size: Specification::NO_BATCH + read_as: :first, + batch_size: 100 + }) + expect(specification).to match_result({ + direction: :forward, + start: :head, + count: nil, + stream_name: GLOBAL_STREAM, + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE + }) + expect(specification.read_last).to match_result({ + direction: :forward, + start: :head, + count: nil, + stream_name: GLOBAL_STREAM, + read_as: :last, + batch_size: 100 + }) + expect(specification).to match_result({ + direction: :forward, + start: :head, + count: nil, + stream_name: GLOBAL_STREAM, + read_as: :all, + batch_size: Specification::DEFAULT_BATCH_SIZE }) end end @@ -248,7 +309,7 @@ module RubyEventStore specify { expect(specification.in_batches).to match_result(batch_size: 100) } - specify { expect(specification).to match_result(batch_size: Specification::NO_BATCH) } + specify { expect(specification).to match_result(batch_size: Specification::DEFAULT_BATCH_SIZE) } specify { expect(specification.in_batches(1000)).to match_result(batch_size: 1000) } @@ -295,6 +356,52 @@ module RubyEventStore expect(specification.each_batch.to_a).not_to eq(specification.in_batches(1000).each_batch.to_a) end + specify do + expect(specification.first).to be_nil + expect(specification.last).to be_nil + + records = 5.times.map { test_record } + repository.append_to_stream(records, Stream.new("Dummy"), ExpectedVersion.none) + + expect(specification.stream("Another").first).to be_nil + expect(specification.stream("Another").last).to be_nil + + expect(specification.first).to eq(TestEvent.new(event_id: records[0].event_id)) + expect(specification.last).to eq(TestEvent.new(event_id: records[4].event_id)) + + expect(specification.from(records[2].event_id).first).to eq(TestEvent.new(event_id: records[3].event_id)) + expect(specification.from(records[2].event_id).last).to eq(TestEvent.new(event_id: records[4].event_id)) + + expect(specification.from(records[2].event_id).backward.first).to eq(TestEvent.new(event_id: records[1].event_id)) + expect(specification.from(records[2].event_id).backward.last).to eq(TestEvent.new(event_id: records[0].event_id)) + + expect(specification.from(records[4].event_id).first).to be_nil + expect(specification.from(records[4].event_id).last).to be_nil + + expect(specification.from(records[0].event_id).backward.first).to be_nil + expect(specification.from(records[0].event_id).backward.last).to be_nil + end + + specify do + repository.append_to_stream([test_record], Stream.new("Dummy"), ExpectedVersion.none) + + expect(specification.result.batched?).to eq(false) + expect(specification.result.first?).to eq(false) + expect(specification.result.last?).to eq(false) + + expect(specification.read_first.result.batched?).to eq(false) + expect(specification.read_first.result.first?).to eq(true) + expect(specification.read_first.result.last?).to eq(false) + + expect(specification.read_last.result.batched?).to eq(false) + expect(specification.read_last.result.first?).to eq(false) + expect(specification.read_last.result.last?).to eq(true) + + expect(specification.in_batches.result.batched?).to eq(true) + expect(specification.in_batches.result.first?).to eq(false) + expect(specification.in_batches.result.last?).to eq(false) + end + let(:repository) { InMemoryRepository.new } let(:mapper) { Mappers::Default.new } let(:specification) { Specification.new(repository, mapper) }