Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update RTP2 for 0.9 #563

Merged
merged 16 commits into from
Feb 5, 2017
6 changes: 5 additions & 1 deletion Source/ARTPresenceMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

@class ARTPresenceMessage;
@class ARTErrorInfo;
@class ARTLog;

ART_ASSUME_NONNULL_BEGIN

Expand All @@ -26,7 +27,10 @@ ART_ASSUME_NONNULL_BEGIN
@property (readonly, nonatomic, assign) BOOL syncComplete;
@property (readonly, nonatomic, getter=getSyncInProgress) BOOL syncInProgress;

- (void)put:(ARTPresenceMessage *)message;
- (instancetype)init UNAVAILABLE_ATTRIBUTE;
- (instancetype)initWithLogger:(ARTLog *)logger;

- (BOOL)add:(ARTPresenceMessage *)message;
- (void)clean;

- (void)startSync;
Expand Down
68 changes: 63 additions & 5 deletions Source/ARTPresenceMap.m
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#import "ARTPresenceMap.h"
#import "ARTPresenceMessage.h"
#import "ARTEventEmitter.h"
#import "ARTLog.h"

typedef NS_ENUM(NSUInteger, ARTPresenceSyncState) {
ARTPresenceSyncStarted, //ItemType: nil
Expand All @@ -25,11 +26,14 @@ @interface ARTPresenceMap () {

@end

@implementation ARTPresenceMap
@implementation ARTPresenceMap {
__weak ARTLog *_logger;
}

- (id)init {
- (instancetype)initWithLogger:(ARTLog *)logger {
self = [super init];
if(self) {
_logger = logger;
_recentMembers = [NSMutableDictionary dictionary];
_syncStarted = false;
_syncComplete = false;
Expand All @@ -42,11 +46,65 @@ - (id)init {
return self.recentMembers;
}

- (void)put:(ARTPresenceMessage *)message {
- (BOOL)add:(ARTPresenceMessage *)message {
ARTPresenceMessage *latest = [self.recentMembers objectForKey:message.clientId];
if (!latest || !message.timestamp || [latest.timestamp timeIntervalSince1970] <= [message.timestamp timeIntervalSince1970]) {
[self.recentMembers setObject:message forKey:message.clientId];
if ([self isNewestPresence:message comparingWith:latest]) {
ARTPresenceMessage *messageCopy = [message copy];
switch (message.action) {
case ARTPresenceEnter:
case ARTPresenceUpdate:
messageCopy.action = ARTPresencePresent;
break;
case ARTPresenceLeave:
if (self.syncInProgress) {
messageCopy.action = ARTPresenceAbsent;
}
break;
default:
break;
}
[self.recentMembers setObject:messageCopy forKey:message.clientId];
return YES;
}
return NO;
}

- (BOOL)isNewestPresence:(nonnull ARTPresenceMessage *)received comparingWith:(ARTPresenceMessage *)latest __attribute__((warn_unused_result)) {
if (latest == nil) {
return YES;
}

NSArray<NSString *> *receivedMessageIdParts = [received.id componentsSeparatedByCharactersInSet:[NSCharacterSet characterSetWithCharactersInString:@":"]];
if (receivedMessageIdParts.count != 3) {
[_logger error:@"Received presence message id is invalid %@", received.id];
return !received.timestamp ||
[latest.timestamp timeIntervalSince1970] <= [received.timestamp timeIntervalSince1970];
}
NSString *receivedConnectionId = [receivedMessageIdParts objectAtIndex:0];
NSInteger receivedMsgSerial = [[receivedMessageIdParts objectAtIndex:1] integerValue];
NSInteger receivedIndex = [[receivedMessageIdParts objectAtIndex:2] integerValue];

if ([receivedConnectionId isEqualToString:received.connectionId]) {
NSArray<NSString *> *latestRegisteredIdParts = [latest.id componentsSeparatedByCharactersInSet:[NSCharacterSet characterSetWithCharactersInString:@":"]];
if (latestRegisteredIdParts.count != 3) {
[_logger error:@"Latest registered presence message id is invalid %@", latest.id];
return !received.timestamp ||
[latest.timestamp timeIntervalSince1970] <= [received.timestamp timeIntervalSince1970];
}
NSInteger latestRegisteredMsgSerial = [[latestRegisteredIdParts objectAtIndex:1] integerValue];
NSInteger latestRegisteredIndex = [[latestRegisteredIdParts objectAtIndex:2] integerValue];

if (receivedMsgSerial > latestRegisteredMsgSerial) {
return YES;
}
else if (receivedMsgSerial == latestRegisteredMsgSerial && receivedIndex > latestRegisteredIndex) {
return YES;
}
return NO;
}

return !received.timestamp ||
[latest.timestamp timeIntervalSince1970] <= [received.timestamp timeIntervalSince1970];
}

- (void)clean {
Expand Down
2 changes: 2 additions & 0 deletions Source/ARTPresenceMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ typedef NS_ENUM(NSUInteger, ARTPresenceAction) {

- (NSString *)memberKey;

- (BOOL)isEqualToPresenceMessage:(ARTPresenceMessage *)presence;

@end
29 changes: 29 additions & 0 deletions Source/ARTPresenceMessage.m
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,33 @@ - (NSString *)memberKey {
return [NSString stringWithFormat:@"%@:%@", self.connectionId, self.clientId];
}

- (BOOL)isEqualToPresenceMessage:(ARTPresenceMessage *)presence {
if (!presence) {
return NO;
}

BOOL haveEqualConnectionId = (!self.connectionId && !presence.connectionId) || [self.connectionId isEqualToString:presence.connectionId];
BOOL haveEqualCliendId = (!self.clientId && !presence.clientId) || [self.clientId isEqualToString:presence.clientId];

return haveEqualConnectionId && haveEqualCliendId;
}

#pragma mark - NSObject

- (BOOL)isEqual:(id)object {
if (self == object) {
return YES;
}

if (![object isKindOfClass:[ARTPresenceMessage class]]) {
return NO;
}

return [self isEqualToPresenceMessage:(ARTPresenceMessage *)object];
}

- (NSUInteger)hash {
return [self.connectionId hash] ^ [self.clientId hash];
}

@end
25 changes: 15 additions & 10 deletions Source/ARTRealtimeChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ - (instancetype)initWithRealtime:(ARTRealtime *)realtime andName:(NSString *)nam
_state = ARTRealtimeChannelInitialized;
_queuedMessages = [NSMutableArray array];
_attachSerial = nil;
_presenceMap = [[ARTPresenceMap alloc] init];
_presenceMap = [[ARTPresenceMap alloc] initWithLogger:self.logger];
_lastPresenceAction = ARTPresenceAbsent;

_statesEventEmitter = [[ARTEventEmitter alloc] init];
Expand All @@ -64,6 +64,10 @@ + (instancetype)channelWithRealtime:(ARTRealtime *)realtime andName:(NSString *)
return [[ARTRealtimeChannel alloc] initWithRealtime:realtime andName:name withOptions:options];
}

- (ARTLog *)getLogger {
return _realtime.logger;
}

- (ARTRealtimePresence *)getPresence {
if (!_realtimePresence) {
_realtimePresence = [[ARTRealtimePresence alloc] initWithChannel:self];
Expand All @@ -85,8 +89,8 @@ - (void)internalPostMessages:(id)data callback:(void (^)(ARTErrorInfo *__art_nul
}

- (void)requestContinueSync {
[self.logger info:@"R:%p C:%p ARTRealtime requesting to continue sync operation after reconnect", _realtime, self];
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p ARTRealtime requesting to continue sync operation after reconnect", _realtime, self];

ARTProtocolMessage * msg = [[ARTProtocolMessage alloc] init];
msg.action = ARTProtocolMessageSync;
msg.msgSerial = self.presenceMap.syncMsgSerial;
Expand Down Expand Up @@ -541,12 +545,12 @@ - (void)onPresence:(ARTProtocolMessage *)message {
presence.id = [NSString stringWithFormat:@"%@:%d", message.id, i];
}

[self.presenceMap onceSyncEnds:^(__GENERIC(NSArray, ARTPresenceMessage *) *msgs) {
[self.presenceMap put:presence];
[self.presenceMap clean];

if ([self.presenceMap add:presence]) {
[self broadcastPresence:presence];
}];
}
if (!self.presenceMap.syncInProgress) {
[self.presenceMap clean];
}

++i;
}
Expand All @@ -561,8 +565,9 @@ - (void)onSync:(ARTProtocolMessage *)message {

for (int i=0; i<[message.presence count]; i++) {
ARTPresenceMessage *presence = [message.presence objectAtIndex:i];
[self.presenceMap put:presence];
[self broadcastPresence:presence];
if ([self.presenceMap add:presence]) {
[self broadcastPresence:presence];
}
}

if ([self isLastChannelSerial:message.channelSerial]) {
Expand Down
8 changes: 4 additions & 4 deletions Spec/RealtimeClientChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class RealtimeClientChannel: QuickSpec {
it("should process all incoming messages and presence messages as soon as a Channel becomes attached") {
let options = AblyTests.commonAppSetup()
let client1 = AblyTests.newRealtime(options)
defer { client1.close() }
defer { client1.dispose(); client1.close() }
let channel1 = client1.channels.get("room")

waitUntil(timeout: testTimeout) { done in
Expand All @@ -31,7 +31,7 @@ class RealtimeClientChannel: QuickSpec {

options.clientId = "Client 2"
let client2 = AblyTests.newRealtime(options)
defer { client2.close() }
defer { client2.dispose(); client2.close() }
let channel2 = client2.channels.get(channel1.name)

channel2.subscribe("Client 1") { message in
Expand Down Expand Up @@ -68,12 +68,12 @@ class RealtimeClientChannel: QuickSpec {

expect(channel1.presenceMap.members).toEventually(haveCount(2), timeout: testTimeout)
expect(channel1.presenceMap.members).to(allKeysPass({ $0.hasPrefix("Client") }))
expect(channel1.presenceMap.members).to(allValuesPass({ $0.action == .Enter }))
expect(channel1.presenceMap.members).to(allValuesPass({ $0.action == .Present }))

expect(channel2.presenceMap.members).toEventually(haveCount(2), timeout: testTimeout)
expect(channel2.presenceMap.members).to(allKeysPass({ $0.hasPrefix("Client") }))
expect(channel2.presenceMap.members["Client 1"]!.action).to(equal(ARTPresenceAction.Present))
expect(channel2.presenceMap.members["Client 2"]!.action).to(equal(ARTPresenceAction.Enter))
expect(channel2.presenceMap.members["Client 2"]!.action).to(equal(ARTPresenceAction.Present))
}

// RTL2
Expand Down
Loading