diff --git a/lib/ably/models/presence_message.rb b/lib/ably/models/presence_message.rb index 2e5851cd9..acf428df5 100644 --- a/lib/ably/models/presence_message.rb +++ b/lib/ably/models/presence_message.rb @@ -123,7 +123,6 @@ def to_json(*args) end.to_json end - # Assign this presence message to a ProtocolMessage before delivery to the Ably system # @api private def assign_to_protocol_message(protocol_message) @@ -145,6 +144,17 @@ def protocol_message @protocol_message end + # Create a static shallow clone of this object with the optional attributes to overide existing values + # Shallow clones have no dependency on the originating ProtocolMessage as all field values are stored as opposed to calculated + # Clones are useful when the original PresenceMessage needs to be mutated, such as storing in a PresenceMap with action :present + def shallow_clone(new_attributes = {}) + self.class.new(attributes.to_hash.merge( + id: id, + connection_id: connection_id, + timestamp: as_since_epoch(timestamp) + ).merge(new_attributes)) + end + private def raw_hash_object @raw_hash_object diff --git a/lib/ably/realtime/presence.rb b/lib/ably/realtime/presence.rb index 1641d7ba4..96292b39a 100644 --- a/lib/ably/realtime/presence.rb +++ b/lib/ably/realtime/presence.rb @@ -210,7 +210,7 @@ def update_client(client_id, data = nil, &success_block) send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Update, client_id, data, &success_block) end - # Get the presence state for this Channel. + # Get the presence members for this Channel. # # @param (see Ably::Realtime::Presence::MembersMap#get) # @option options (see Ably::Realtime::Presence::MembersMap#get) @@ -222,9 +222,9 @@ def get(options = {}, &block) ensure_channel_attached(deferrable) do members.get(options).tap do |members_map_deferrable| - members_map_deferrable.callback do |*args| - safe_yield(block, *args) if block_given? - deferrable.succeed(*args) + members_map_deferrable.callback do |members| + safe_yield(block, members) if block_given? + deferrable.succeed(members) end members_map_deferrable.errback do |*args| deferrable.fail(*args) diff --git a/lib/ably/realtime/presence/members_map.rb b/lib/ably/realtime/presence/members_map.rb index bf93a8e68..b39ac1622 100644 --- a/lib/ably/realtime/presence/members_map.rb +++ b/lib/ably/realtime/presence/members_map.rb @@ -228,16 +228,41 @@ def ensure_presence_message_is_valid(presence_message) end # If the message received is older than the last known event for presence - # then skip. This can occur during a SYNC operation. For example: + # then skip (return false). This can occur during a SYNC operation. For example: # - SYNC starts # - LEAVE event received for clientId 5 # - SYNC present even received for clientId 5 with a timestamp before LEAVE event because the LEAVE occured before the SYNC operation completed # - # @return [Boolean] + # @return [Boolean] true when +new_message+ is newer than the existing member in the PresenceMap # - def should_update_member?(presence_message) - if members[presence_message.member_key] - members[presence_message.member_key].fetch(:message).timestamp < presence_message.timestamp + def should_update_member?(new_message) + if members[new_message.member_key] + existing_message = members[new_message.member_key].fetch(:message) + + # If both are messages published by clients (not fabricated), use the ID to determine newness, see #RTP2b2 + if new_message.id.start_with?(new_message.connection_id) && existing_message.id.start_with?(existing_message.connection_id) + new_message_parts = new_message.id.match(/(\d+):(\d+)$/) + existing_message_parts = existing_message.id.match(/(\d+):(\d+)$/) + + if !new_message_parts || !existing_message_parts + logger.fatal { "#{self.class.name}: Message IDs for new message #{new_message.id} or old message #{existing_message.id} are invalid. \nNew message: #{new_message.to_json}" } + return existing_message.timestamp < new_message.timestamp + end + + # ID is in the format "connid:msgSerial:index" such as "aaaaaa:0:0" + # if msgSerial is greater then the new_message should update the member + # if msgSerial is equal and index is greater, then update the member + if new_message_parts[1].to_i > existing_message_parts[1].to_i # msgSerial + true + elsif new_message_parts[1].to_i == existing_message_parts[1].to_i # msgSerial equal + new_message_parts[2].to_i > existing_message_parts[2].to_i # compare index + else + false + end + else + # This message is fabricated or could not be validated so rely on timestamps, see #RTP2b1 + new_message.timestamp > existing_message.timestamp + end else true end @@ -245,7 +270,10 @@ def should_update_member?(presence_message) def add_presence_member(presence_message) logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' for event '#{presence_message.action}' #{members.has_key?(presence_message.member_key) ? 'updated' : 'added'}.\n#{presence_message.to_json}" } - members[presence_message.member_key] = { present: true, message: presence_message } + present_action = + # Mutate the PresenceMessage so that the action is :present, see #RTP2d + present_presence_message = presence_message.shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Present) + members[presence_message.member_key] = { present: true, message: present_presence_message } presence.emit_message presence_message.action, presence_message end diff --git a/spec/acceptance/realtime/presence_spec.rb b/spec/acceptance/realtime/presence_spec.rb index 04dc659b0..9a2e7ceb2 100644 --- a/spec/acceptance/realtime/presence_spec.rb +++ b/spec/acceptance/realtime/presence_spec.rb @@ -410,7 +410,7 @@ def presence_action(method_name, data) end context 'when attached (but not present) on a presence channel with an anonymous client (no client ID)' do - it 'maintains state as other clients enter and leave the channel' do + it 'maintains state as other clients enter and leave the channel (#RTP2e)' do channel_anonymous_client.attach do presence_anonymous_client.subscribe(:enter) do |presence_message| expect(presence_message.client_id).to eql(client_one.client_id) @@ -437,7 +437,7 @@ def presence_action(method_name, data) end end - context '#members map', api_private: true do + context '#members map / PresenceMap (#RTP2)', api_private: true do it 'is available once the channel is created' do expect(presence_anonymous_client.members).to_not be_nil stop_reactor @@ -482,26 +482,196 @@ def presence_action(method_name, data) end end end + + context 'the map is based on the member_key (connection_id & client_id)' do + # 2 unqiue client_id with client_id "b" being on two connections + let(:enter_action) { 2 } + let(:presence_data) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action }, + { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: enter_action }, + { client_id: 'a', connection_id: 'one', id: 'one:0:2', action: enter_action }, + { client_id: 'b', connection_id: 'one', id: 'one:0:3', action: enter_action }, + { client_id: 'b', connection_id: 'two', id: 'two:0:4', action: enter_action } + ] + end + + it 'ensures uniqueness from this member_key (#RTP2a)' do + channel_anonymous_client.attach do + presence_anonymous_client.get(wait_for_sync: true) do |members| + expect(members.length).to eql(0) + + ## Fabricate members + action = Ably::Models::ProtocolMessage::ACTION.Presence + presence_msg = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: 20, + channel: channel_name, + presence: presence_data, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg + + EventMachine.add_timer(0.5) do + presence_anonymous_client.get do |members| + expect(members.length).to eql(3) + expect(members.map { |member| member.client_id }.uniq).to contain_exactly('a', 'b') + stop_reactor + end + end + end + end + end + end + + context 'newness is compared based on PresenceMessage#id unless synthesized' do + let(:page_size) { 100 } + let(:enter_expected_count) { page_size + 1 } # 100 per page, this ensures we have more than one page so that we can test newness during sync + let(:enter_action) { 2 } + let(:leave_action) { 3 } + let(:now) { Time.now.to_i * 1000 } + let(:entered) { [] } + let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(auth_callback: wildcard_token)) } + + def setup_members_on(presence) + enter_expected_count.times do |indx| + # 10 messages per second max rate on simulation accounts + rate = indx.to_f / 10 + EventMachine.add_timer(rate) do + presence.enter_client("client:#{indx}") do |message| + entered << message + next unless entered.count == enter_expected_count + yield + end + end + end + end + + def allow_sync_fabricate_data_final_sync_and_assert_members + setup_members_on(presence_client_one) do + sync_pages_received = [] + anonymous_client.connection.once(:connected) do + anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + if sync_pages_received.count == 1 + action = Ably::Models::ProtocolMessage::ACTION.Presence + presence_msg = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: protocol_message.connection_serial + 1, + channel: channel_name, + presence: presence_data, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg + + # Now simulate an end to the sync + action = Ably::Models::ProtocolMessage::ACTION.Sync + sync_msg = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: protocol_message.connection_serial + 2, + channel: channel_name, + channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC + presence: [], + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_msg + + # Stop the next SYNC arriving + anonymous_client.connection.__incoming_protocol_msgbus__.unsubscribe + end + end + end + + presence_anonymous_client.get(wait_for_sync: true) do |members| + expect(members.length).to eql(page_size + 2) + expect(members.find { |member| member.client_id == 'a' }).to be_nil + expect(members.find { |member| member.client_id == 'b' }.timestamp.to_i).to eql(now / 1000) + expect(members.find { |member| member.client_id == 'c' }.timestamp.to_i).to eql(now / 1000) + stop_reactor + end + end + end + end + + context 'when presence messages are synthesized' do + let(:presence_data) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client, second fabricated + { client_id: 'a', connection_id: 'one', id: 'fabricated:0:1', action: leave_action, timestamp: now + 1 }, # leave after enter based on timestamp + { client_id: 'b', connection_id: 'one', id: 'one:0:2', action: enter_action, timestamp: now }, # first messages from client, second fabricated + { client_id: 'b', connection_id: 'one', id: 'fabricated:0:3', action: leave_action, timestamp: now - 1 }, # leave before enter based on timestamp + { client_id: 'c', connection_id: 'one', id: 'fabricated:0:4', action: enter_action, timestamp: now }, # both messages fabricated + { client_id: 'c', connection_id: 'one', id: 'fabricated:0:5', action: leave_action, timestamp: now - 1 } # leave before enter based on timestamp + ] + end + + it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b1)' do + allow_sync_fabricate_data_final_sync_and_assert_members + end + end + + context 'when presence messages are not synthesized (events sent from clients)' do + let(:presence_data) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client + { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action, timestamp: now - 1 }, # leave after enter based on msgSerial part of ID + { client_id: 'b', connection_id: 'one', id: 'one:2:2', action: enter_action, timestamp: now }, # first messages from client + { client_id: 'b', connection_id: 'one', id: 'one:2:1', action: leave_action, timestamp: now + 1 }, # leave before enter based on index part of ID + { client_id: 'c', connection_id: 'one', id: 'one:4:4', action: enter_action, timestamp: now }, # first messages from client + { client_id: 'c', connection_id: 'one', id: 'one:3:5', action: leave_action, timestamp: now + 1 } # leave before enter based on msgSerial part of ID + ] + end + + it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b2)' do + allow_sync_fabricate_data_final_sync_and_assert_members + end + end + end end - context '#sync_complete?' do + context '#sync_complete? and SYNC flags (#RTP1)' do context 'when attaching to a channel without any members present' do - it 'is true and the presence channel is considered synced immediately' do + it 'sync_complete? is true, there is no presence flag, and the presence channel is considered synced immediately (#RTP1)' do + flag_checked = false + + anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :attached + flag_checked = true + expect(protocol_message.has_presence_flag?).to eql(false) + end + end + channel_anonymous_client.attach do expect(channel_anonymous_client.presence).to be_sync_complete - stop_reactor + EventMachine.next_tick do + expect(flag_checked).to eql(true) + stop_reactor + end end end end context 'when attaching to a channel with members present' do - it 'is false and the presence channel will subsequently be synced' do + it 'sync_complete? is false, there is a presence flag, and the presence channel is subsequently synced (#RTP1)' do + flag_checked = false + + anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :attached + flag_checked = true + expect(protocol_message.has_presence_flag?).to eql(true) + end + end + presence_client_one.enter do channel_anonymous_client.attach do expect(channel_anonymous_client.presence).to_not be_sync_complete channel_anonymous_client.presence.get(wait_for_sync: true) do expect(channel_anonymous_client.presence).to be_sync_complete - stop_reactor + EventMachine.next_tick do + expect(flag_checked).to eql(true) + stop_reactor + end end end end @@ -509,20 +679,20 @@ def presence_action(method_name, data) end end - context '250 existing (present) members on a channel (3 SYNC pages)' do - context 'requires at least 3 SYNC ProtocolMessages', em_timeout: 30 do - let(:enter_expected_count) { 250 } + context '201 existing (present) members on a channel (3 SYNC pages)' do + context 'requiring at least 3 SYNC ProtocolMessages', em_timeout: 40 do + let(:enter_expected_count) { 201 } let(:present) { [] } let(:entered) { [] } let(:sync_pages_received) { [] } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } def setup_members_on(presence) - enter_expected_count.times do |index| + enter_expected_count.times do |indx| # 10 messages per second max rate on simulation accounts - rate = index.to_f / 10 + rate = indx.to_f / 10 EventMachine.add_timer(rate) do - presence.enter_client("client:#{index}") do |message| + presence.enter_client("client:#{indx}") do |message| entered << message next unless entered.count == enter_expected_count yield @@ -545,8 +715,47 @@ def setup_members_on(presence) end end + context 'and a member enters before the SYNC operation is complete' do + let(:enter_client_id) { random_str } + + it 'emits a :enter immediately and the member is :present once the sync is complete (#RTP2g)' do + setup_members_on(presence_client_one) do + member_entered = false + + anonymous_client.connect do + presence_anonymous_client.subscribe(:enter) do |member| + expect(member.client_id).to eql(enter_client_id) + member_entered = true + end + + presence_anonymous_client.get(wait_for_sync: true) do |members| + expect(members.find { |member| member.client_id == enter_client_id }.action).to eq(:present) + stop_reactor + end + + anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + if sync_pages_received.count == 1 + enter_action = Ably::Models::PresenceMessage::ACTION.Enter + enter_member = Ably::Models::PresenceMessage.new( + 'id' => "#{client_one.connection.id}:#{random_str}:0", + 'clientId' => enter_client_id, + 'connectionId' => client_one.connection.id, + 'timestamp' => as_since_epoch(Time.now), + 'action' => enter_action + ) + presence_anonymous_client.__incoming_msgbus__.publish :presence, enter_member + end + end + end + end + end + end + end + context 'and a member leaves before the SYNC operation is complete' do - it 'emits :leave immediately as the member leaves' do + it 'emits :leave immediately as the member leaves (#RTP2f, #RTP2g)' do all_client_ids = enter_expected_count.times.map { |id| "client:#{id}" } setup_members_on(presence_client_one) do @@ -573,7 +782,7 @@ def setup_members_on(presence) if sync_pages_received.count == 1 leave_action = Ably::Models::PresenceMessage::ACTION.Leave leave_member = Ably::Models::PresenceMessage.new( - 'id' => "#{client_one.connection.id}-#{all_client_ids.first}:0", + 'id' => "#{client_one.connection.id}:#{all_client_ids.first}:0", 'clientId' => all_client_ids.first, 'connectionId' => client_one.connection.id, 'timestamp' => as_since_epoch(Time.now), @@ -587,7 +796,7 @@ def setup_members_on(presence) end end - it 'ignores presence events with timestamps prior to the current :present event in the MembersMap' do + it 'ignores presence events with timestamps / identifiers prior to the current :present event in the MembersMap (#RTP2c)' do started_at = Time.now setup_members_on(presence_client_one) do @@ -632,7 +841,7 @@ def setup_members_on(presence) end end - it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get with :wait_for_sync' do + it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get with :wait_for_sync (#RTP2f)' do left_client = 10 left_client_id = "client:#{left_client}" @@ -667,7 +876,7 @@ def setup_members_on(presence) channel_anonymous_client.attach do leave_action = Ably::Models::PresenceMessage::ACTION.Leave fake_leave_presence_message = Ably::Models::PresenceMessage.new( - 'id' => "#{client_one.connection.id}-#{left_client_id}:0", + 'id' => "#{client_one.connection.id}:#{left_client_id}:0", 'clientId' => left_client_id, 'connectionId' => client_one.connection.id, 'timestamp' => as_since_epoch(Time.now), @@ -683,9 +892,9 @@ def setup_members_on(presence) context '#get' do context 'with :wait_for_sync option set to true' do it 'waits until sync is complete', em_timeout: 30 do # allow for slow connections and lots of messages - enter_expected_count.times do |index| - EventMachine.add_timer(index / 10) do - presence_client_one.enter_client("client:#{index}") do |message| + enter_expected_count.times do |indx| + EventMachine.add_timer(indx / 10) do + presence_client_one.enter_client("client:#{indx}") do |message| entered << message next unless entered.count == enter_expected_count @@ -702,9 +911,9 @@ def setup_members_on(presence) context 'by default' do it 'it does not wait for sync', em_timeout: 30 do # allow for slow connections and lots of messages - enter_expected_count.times do |index| - EventMachine.add_timer(index / 10) do - presence_client_one.enter_client("client:#{index}") do |message| + enter_expected_count.times do |indx| + EventMachine.add_timer(indx / 10) do + presence_client_one.enter_client("client:#{indx}") do |message| entered << message next unless entered.count == enter_expected_count @@ -1378,6 +1587,18 @@ def connect_members_deferrables end end + context 'when a member enters and the presence map is updated' do + it 'adds the member as being :present (#RTP2d)' do + presence_client_one.enter do + presence_client_one.get do |members| + expect(members.count).to eq(1) + expect(members.first.action).to eq(:present) + stop_reactor + end + end + end + end + context 'with lots of members on different clients' do let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } let(:client_two) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } @@ -1386,9 +1607,9 @@ def connect_members_deferrables let(:total_members) { members_per_client * 2 } it 'returns a complete list of members on all clients' do - members_per_client.times do |index| - presence_client_one.enter_client("client_1:#{index}") - presence_client_two.enter_client("client_2:#{index}") + members_per_client.times do |indx| + presence_client_one.enter_client("client_1:#{indx}") + presence_client_two.enter_client("client_2:#{indx}") end presence_client_one.subscribe(:enter) do @@ -1747,9 +1968,9 @@ def connect_members_deferrables let(:sync_pages_received) { [] } let(:client_options) { default_options.merge(log_level: :error) } - it 'resumes the SYNC operation', em_timeout: 15 do - when_all(*members_count.times.map do |index| - presence_anonymous_client.enter_client("client:#{index}") + it 'resumes the SYNC operation (#RTP3)', em_timeout: 15 do + when_all(*members_count.times.map do |indx| + presence_anonymous_client.enter_client("client:#{indx}") end) do channel_client_two.attach do client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| diff --git a/spec/unit/models/presence_message_spec.rb b/spec/unit/models/presence_message_spec.rb index 7edd3416e..5c0927c79 100644 --- a/spec/unit/models/presence_message_spec.rb +++ b/spec/unit/models/presence_message_spec.rb @@ -504,4 +504,52 @@ end end end + + context '#shallow_clone' do + context 'with inherited attributes from ProtocolMessage' do + let(:protocol_message) { + Ably::Models::ProtocolMessage.new('id' => 'fooId', 'connectionId' => protocol_connection_id, 'action' => 1, 'timestamp' => protocol_message_timestamp) + } + let(:protocol_connection_id) { random_str } + let(:model) { subject.new({ 'action' => 2 }, protocol_message: protocol_message) } + + it 'creates a duplicate of the message without any ProtocolMessage dependency' do + clone = model.shallow_clone + expect(clone.id).to match(/fooId/) + expect(clone.connection_id).to eql(protocol_connection_id) + expect(as_since_epoch(clone.timestamp)).to eq(protocol_message_timestamp) + expect(clone.action).to eq(2) + end + end + + context 'with embedded attributes for all fields' do + let(:message_timestamp) { as_since_epoch(Time.now) + 100 } + let(:connection_id) { random_str } + let(:model) { subject.new({ 'action' => 3, 'id' => 'fooId', 'connectionId' => connection_id, 'timestamp' => message_timestamp }) } + + it 'creates a duplicate of the message without any ProtocolMessage dependency' do + clone = model.shallow_clone + expect(clone.id).to eql('fooId') + expect(clone.connection_id).to eql(connection_id) + expect(as_since_epoch(clone.timestamp)).to eq(message_timestamp) + expect(clone.action).to eq(3) + end + end + + context 'with new attributes passed in to the method' do + let(:protocol_message) { + Ably::Models::ProtocolMessage.new('id' => 'fooId', 'connectionId' => protocol_connection_id, 'action' => 1, 'timestamp' => protocol_message_timestamp) + } + let(:protocol_connection_id) { random_str } + let(:model) { subject.new({ 'action' => 2 }, protocol_message: protocol_message) } + + it 'creates a duplicate of the message without any ProtocolMessage dependency' do + clone = model.shallow_clone(id: 'newId', action: 1, timestamp: protocol_message_timestamp + 1000) + expect(clone.id).to match(/newId/) + expect(clone.connection_id).to eql(protocol_connection_id) + expect(as_since_epoch(clone.timestamp)).to eq(protocol_message_timestamp + 1000) + expect(clone.action).to eq(1) + end + end + end end