Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(IoT): Fixing a potential race condition in the timer ring queue #5461

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#import "AWSMQTTEncoder.h"
#import "AWSMQttTxFlow.h"
#import "AWSIoTMessage.h"
#import "AWSMQTTTimerRing.h"
#import "AWSIoTMessage+AWSMQTTMessage.h"

@interface AWSMQTTSession () <AWSMQTTDecoderDelegate,AWSMQTTEncoderDelegate> {
Expand Down Expand Up @@ -58,7 +59,7 @@ - (void)send:(AWSMQTTMessage*)msg;
- (UInt16)nextMsgId;

@property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message
@property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried.
@property (strong,atomic) AWSMQTTTimerRing* timerRing; // A collection of messages that need to be retried.
@property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue;
@property (nonatomic, strong) AWSMQTTEncoder* encoder; //Low level protocol handler that converts a message into out bound network data
@property (nonatomic, strong) AWSMQTTDecoder* decoder; //Low level protocol handler that converts in bound network data into a Message
Expand Down Expand Up @@ -103,11 +104,7 @@ - (id)initWithClientId:(NSString*)theClientId
txMsgId = 1;
txFlows = [[NSMutableDictionary alloc] init];
rxFlows = [[NSMutableDictionary alloc] init];
self.timerRing = [[NSMutableArray alloc] initWithCapacity:60];
int i;
for (i = 0; i < 60; i++) {
[self.timerRing addObject:[NSMutableSet new]];
}
self.timerRing = [[AWSMQTTTimerRing alloc] init];
serialQueue = dispatch_queue_create("com.amazon.aws.iot.test-queue", DISPATCH_QUEUE_SERIAL);
ticks = 0;
status = AWSMQTTSessionStatusCreated;
Expand Down Expand Up @@ -233,7 +230,7 @@ - (UInt16)publishDataAtLeastOnce:(NSData*)data
AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg:msg
deadline:deadline];
[txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
[self.timerRing addMsgId:[NSNumber numberWithUnsignedInt:msgId] atTick:[flow deadline]];
AWSDDLogDebug(@"Published message %hu for QOS 1", msgId);
[self send:msg];
return msgId;
Expand Down Expand Up @@ -267,7 +264,7 @@ - (UInt16)publishDataExactlyOnce:(NSData*)data
AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg:msg
deadline:(ticks + 60)];
[txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
[self.timerRing addMsgId:[NSNumber numberWithUnsignedInt:msgId] atTick:[flow deadline]];
[self send:msg];
return msgId;
}
Expand Down Expand Up @@ -299,7 +296,7 @@ - (void)timerHandler:(NSTimer*)theTimer {
dispatch_sync(serialQueue, ^{
ticks++;
});
NSEnumerator *e = [[[self.timerRing objectAtIndex:(ticks % 60)] allObjects] objectEnumerator];
NSEnumerator *e = [[self.timerRing allMsgIdsAtTick:ticks] objectEnumerator];
id msgId;

//Stay under the throttle here and move the work to the next tick if throttle is breached.
Expand All @@ -321,8 +318,8 @@ - (void)timerHandler:(NSTimer*)theTimer {
while ((msgId = [e nextObject])) {
AWSMQttTxFlow *flow = [txFlows objectForKey:msgId];
[flow setDeadline:((ticks +1) % 60)];
[[self.timerRing objectAtIndex:((ticks + 1) % 60)] addObject:msgId];
[[self.timerRing objectAtIndex:(ticks % 60)] removeObject:msgId];
[self.timerRing addMsgId:msgId atTick:(ticks + 1)];
[self.timerRing removeMsgId:msgId atTick:ticks];
}

if (count > 0 ) {
Expand Down Expand Up @@ -567,8 +564,8 @@ - (void)handlePuback:(AWSMQTTMessage*)msg {
if ([[flow msg] type] != AWSMQTTPublish || [[flow msg] qos] != 1) {
return;
}
[[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];

[self.timerRing removeMsgId:msgId atTick:[flow deadline]];
[txFlows removeObjectForKey:msgId];
AWSDDLogDebug(@"Removing msgID %@ from internal store for QOS1 guarantee", msgId);
[self.delegate session:self newAckForMessageId:msgId.unsignedShortValue];
Expand All @@ -594,10 +591,10 @@ - (void)handlePubrec:(AWSMQTTMessage*)msg {
}
msg = [AWSMQTTMessage pubrelMessageWithMessageId:[msgId unsignedIntValue]];
[flow setMsg:msg];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
[self.timerRing removeMsgId:msgId atTick:[flow deadline]];
[flow setDeadline:(ticks + 60)];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:msgId];
[self.timerRing addMsgId:msgId atTick:[flow deadline]];

[self send:msg];
}

Expand Down Expand Up @@ -638,8 +635,8 @@ - (void)handlePubcomp:(AWSMQTTMessage*)msg {
if (flow == nil || [[flow msg] type] != AWSMQTTPubrel) {
return;
}
[[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];

[self.timerRing removeMsgId:msgId atTick:[flow deadline]];
[txFlows removeObjectForKey:msgId];

AWSDDLogDebug(@"Removing msgID %@ from internal store for QOS2 guarantee", msgId);
Expand Down
30 changes: 30 additions & 0 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// A copy of the License is located at
//
// http://aws.amazon.com/apache2.0
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
//

#import <Foundation/Foundation.h>

NS_ASSUME_NONNULL_BEGIN

/// A circular collection containing the messages that need to be retried at a given clock tick.
/// The maximum number of ticks is 60
@interface AWSMQTTTimerRing: NSObject

- (void)addMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick;
- (void)removeMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick;
- (NSArray<NSNumber *> *)allMsgIdsAtTick:(NSUInteger)tick;

@end

NS_ASSUME_NONNULL_END
60 changes: 60 additions & 0 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//
// Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// A copy of the License is located at
//
// http://aws.amazon.com/apache2.0
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
//

#import "AWSMQTTTimerRing.h"

@interface AWSMQTTTimerRing()

@property (nonatomic, strong) NSLock *lock;
// Array of 60, with each index being a tick, and its value a set containing the messages that need to be retried.
@property (strong,atomic) NSMutableArray<NSMutableSet *>* timerRing;

@end

@implementation AWSMQTTTimerRing

- (instancetype)init
{
self = [super init];
if (self) {
_lock = [[NSLock alloc] init];
_timerRing = [[NSMutableArray alloc] initWithCapacity:60];
int i;
for (i = 0; i < 60; i++) {
[_timerRing addObject:[NSMutableSet new]];
}
}
return self;
}
- (void)addMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick {
[self.lock lock];
[[self.timerRing objectAtIndex:(tick % 60)] addObject:msgId];
[self.lock unlock];
}

- (void)removeMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick {
[self.lock lock];
[[self.timerRing objectAtIndex:(tick % 60)] removeObject:msgId];
[self.lock unlock];
}

- (NSArray<NSNumber *> *)allMsgIdsAtTick:(NSUInteger)tick {
[self.lock lock];
NSArray<NSNumber *> *result = [[self.timerRing objectAtIndex:(tick % 60)] allObjects];
[self.lock unlock];
return result;
}

@end
8 changes: 8 additions & 0 deletions AWSiOSSDKv2.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@
5C1590172755727C00F88085 /* AWSCore.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = CE0D416D1C6A66E5006B91B5 /* AWSCore.framework */; };
5C1978DD2702364800F9C11E /* AWSLocationTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5C1978DC2702364800F9C11E /* AWSLocationTests.swift */; };
5C71F33F295672B8001183A4 /* guten_tag.wav in Resources */ = {isa = PBXBuildFile; fileRef = 5C71F33E295672B8001183A4 /* guten_tag.wav */; };
685AA2112CDA7843008EFC7B /* AWSMQTTTimerRing.h in Headers */ = {isa = PBXBuildFile; fileRef = 685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */; };
685AA2122CDA7843008EFC7B /* AWSMQTTTimerRing.m in Sources */ = {isa = PBXBuildFile; fileRef = 685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */; };
687952932B8FE2C5001E8990 /* AWSDDLog+Optional.swift in Sources */ = {isa = PBXBuildFile; fileRef = 687952922B8FE2C5001E8990 /* AWSDDLog+Optional.swift */; };
6883619E2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6883619D2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift */; };
688361A12B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 688361A02B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m */; };
Expand Down Expand Up @@ -3215,6 +3217,8 @@
5C1978DB2702364800F9C11E /* AWSLocationTests-Bridging-Header.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "AWSLocationTests-Bridging-Header.h"; sourceTree = "<group>"; };
5C1978DC2702364800F9C11E /* AWSLocationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSLocationTests.swift; sourceTree = "<group>"; };
5C71F33E295672B8001183A4 /* guten_tag.wav */ = {isa = PBXFileReference; lastKnownFileType = audio.wav; path = guten_tag.wav; sourceTree = "<group>"; };
685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = AWSMQTTTimerRing.h; sourceTree = "<group>"; };
685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = AWSMQTTTimerRing.m; sourceTree = "<group>"; };
687952922B8FE2C5001E8990 /* AWSDDLog+Optional.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "AWSDDLog+Optional.swift"; sourceTree = "<group>"; };
6883619D2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSS3PreSignedURLBuilderUnitTests.swift; sourceTree = "<group>"; };
688361A02B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = AWSIoTStreamThreadTests.m; sourceTree = "<group>"; };
Expand Down Expand Up @@ -7244,6 +7248,8 @@
CE9DE6461C6A78D70060793F /* AWSMQTTSession.m */,
CE9DE6471C6A78D70060793F /* AWSMQttTxFlow.h */,
CE9DE6481C6A78D70060793F /* AWSMQttTxFlow.m */,
685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */,
685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */,
);
path = MQTTSDK;
sourceTree = "<group>";
Expand Down Expand Up @@ -8455,6 +8461,7 @@
files = (
CE9DE6521C6A78D70060793F /* AWSIoTDataResources.h in Headers */,
CE9DE65A1C6A78D70060793F /* AWSIoTResources.h in Headers */,
685AA2112CDA7843008EFC7B /* AWSMQTTTimerRing.h in Headers */,
68EE1A6C2B713D8100B7CF41 /* AWSIoTStreamThread.h in Headers */,
CE9DE6231C6A78AF0060793F /* AWSIoT.h in Headers */,
CE9DE6561C6A78D70060793F /* AWSIoTManager.h in Headers */,
Expand Down Expand Up @@ -13398,6 +13405,7 @@
CE9DE66D1C6A78D70060793F /* AWSMQTTSession.m in Sources */,
CE9DE6551C6A78D70060793F /* AWSIoTDataService.m in Sources */,
0342776A269D185200379263 /* AWSIoTMessage+AWSMQTTMessage.m in Sources */,
685AA2122CDA7843008EFC7B /* AWSMQTTTimerRing.m in Sources */,
CE9DE66B1C6A78D70060793F /* AWSMQTTMessage.m in Sources */,
CE9DE65D1C6A78D70060793F /* AWSIoTService.m in Sources */,
68DD11872C5AF52B004E1C37 /* AWSIoTAtomicDictionary.m in Sources */,
Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

## Unreleased

-Features for next release
### Bug Fixes

- **AWSIoT**
- Fixing a potential race condition in the timer ring queue (#5461)

## 2.38.0

Expand Down
Loading