Skip to content

Commit

Permalink
Clean up subscriptions
Browse files Browse the repository at this point in the history
Hook in to AsyncStream’s termination notification mechanism, so that
when the user discards a subscription or cancels the task in which
they’re iterating over a subscription, we:

- remove this subscription from our internal data structures
- remove any corresponding ably-cocoa listeners that drive this
  subscription

I’m sure that there will turn out to be a bunch of wrong stuff that I’ve
done here, due to my still-shaky knowledge of concurrency stuff and
AsyncSequence best practices, but it’s a start.

Resolves #36.
  • Loading branch information
lawrence-forooghian committed Jan 8, 2025
1 parent 432fa11 commit 2554e1a
Show file tree
Hide file tree
Showing 22 changed files with 348 additions and 207 deletions.
100 changes: 37 additions & 63 deletions Example/AblyChatExample/Mocks/MockClients.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ actor MockRoom: Room {

var status: RoomStatus = .initialized

private var mockSubscriptions: [MockSubscription<RoomStatusChange>] = []
private let mockSubscriptions = MockSubscriptionStorage<RoomStatusChange>()

func attach() async throws {
print("Mock client attached to room with roomID: \(roomID)")
Expand All @@ -75,11 +75,9 @@ actor MockRoom: Room {
}

private func createSubscription() -> MockSubscription<RoomStatusChange> {
let subscription = MockSubscription<RoomStatusChange>(randomElement: {
mockSubscriptions.create(randomElement: {
RoomStatusChange(current: [.attached, .attached, .attached, .attached, .attaching(error: nil), .attaching(error: nil), .suspended(error: .createUnknownError())].randomElement()!, previous: .attaching(error: nil))
}, interval: 8)
mockSubscriptions.append(subscription)
return subscription
}

func onStatusChange(bufferingPolicy _: BufferingPolicy) async -> Subscription<RoomStatusChange> {
Expand All @@ -92,7 +90,7 @@ actor MockMessages: Messages {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<Message>] = []
private let mockSubscriptions = MockSubscriptionStorage<Message>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -101,7 +99,7 @@ actor MockMessages: Messages {
}

private func createSubscription() -> MockSubscription<Message> {
let subscription = MockSubscription<Message>(randomElement: {
mockSubscriptions.create(randomElement: {
Message(
serial: "\(Date().timeIntervalSince1970)",
action: .create,
Expand All @@ -113,8 +111,6 @@ actor MockMessages: Messages {
headers: [:]
)
}, interval: 3)
mockSubscriptions.append(subscription)
return subscription
}

func subscribe(bufferingPolicy _: BufferingPolicy) async -> MessageSubscription {
Expand All @@ -138,9 +134,7 @@ actor MockMessages: Messages {
metadata: params.metadata ?? [:],
headers: params.headers ?? [:]
)
for subscription in mockSubscriptions {
subscription.emit(message)
}
mockSubscriptions.emit(message)
return message
}

Expand All @@ -154,7 +148,7 @@ actor MockRoomReactions: RoomReactions {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<Reaction>] = []
private let mockSubscriptions = MockSubscriptionStorage<Reaction>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -163,7 +157,7 @@ actor MockRoomReactions: RoomReactions {
}

private func createSubscription() -> MockSubscription<Reaction> {
let subscription = MockSubscription<Reaction>(randomElement: {
mockSubscriptions.create(randomElement: {
Reaction(
type: ReactionType.allCases.randomElement()!.emoji,
metadata: [:],
Expand All @@ -173,8 +167,6 @@ actor MockRoomReactions: RoomReactions {
isSelf: false
)
}, interval: Double.random(in: 0.1 ... 0.5))
mockSubscriptions.append(subscription)
return subscription
}

func send(params: SendReactionParams) async throws {
Expand All @@ -186,9 +178,7 @@ actor MockRoomReactions: RoomReactions {
clientID: clientID,
isSelf: false
)
for subscription in mockSubscriptions {
subscription.emit(reaction)
}
mockSubscriptions.emit(reaction)
}

func subscribe(bufferingPolicy _: BufferingPolicy) -> Subscription<Reaction> {
Expand All @@ -205,7 +195,7 @@ actor MockTyping: Typing {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<TypingEvent>] = []
private let mockSubscriptions = MockSubscriptionStorage<TypingEvent>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -214,14 +204,12 @@ actor MockTyping: Typing {
}

private func createSubscription() -> MockSubscription<TypingEvent> {
let subscription = MockSubscription<TypingEvent>(randomElement: {
mockSubscriptions.create(randomElement: {
TypingEvent(currentlyTyping: [
MockStrings.names.randomElement()!,
MockStrings.names.randomElement()!,
])
}, interval: 2)
mockSubscriptions.append(subscription)
return subscription
}

func subscribe(bufferingPolicy _: BufferingPolicy) -> Subscription<TypingEvent> {
Expand All @@ -233,15 +221,11 @@ actor MockTyping: Typing {
}

func start() async throws {
for subscription in mockSubscriptions {
subscription.emit(TypingEvent(currentlyTyping: [clientID]))
}
mockSubscriptions.emit(TypingEvent(currentlyTyping: [clientID]))
}

func stop() async throws {
for subscription in mockSubscriptions {
subscription.emit(TypingEvent(currentlyTyping: []))
}
mockSubscriptions.emit(TypingEvent(currentlyTyping: []))
}

func onDiscontinuity(bufferingPolicy _: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
Expand All @@ -253,24 +237,22 @@ actor MockPresence: Presence {
let clientID: String
let roomID: String

private var mockSubscriptions: [MockSubscription<PresenceEvent>] = []
private let mockSubscriptions = MockSubscriptionStorage<PresenceEvent>()

init(clientID: String, roomID: String) {
self.clientID = clientID
self.roomID = roomID
}

private func createSubscription() -> MockSubscription<PresenceEvent> {
let subscription = MockSubscription<PresenceEvent>(randomElement: {
mockSubscriptions.create(randomElement: {
PresenceEvent(
action: [.enter, .leave].randomElement()!,
clientID: MockStrings.names.randomElement()!,
timestamp: Date(),
data: nil
)
}, interval: 5)
mockSubscriptions.append(subscription)
return subscription
}

func get() async throws -> [PresenceMember] {
Expand Down Expand Up @@ -310,16 +292,14 @@ actor MockPresence: Presence {
}

private func enter(dataForEvent: PresenceData?) async throws {
for subscription in mockSubscriptions {
subscription.emit(
PresenceEvent(
action: .enter,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
mockSubscriptions.emit(
PresenceEvent(
action: .enter,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
}
)
}

func update() async throws {
Expand All @@ -331,16 +311,14 @@ actor MockPresence: Presence {
}

private func update(dataForEvent: PresenceData? = nil) async throws {
for subscription in mockSubscriptions {
subscription.emit(
PresenceEvent(
action: .update,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
mockSubscriptions.emit(
PresenceEvent(
action: .update,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
}
)
}

func leave() async throws {
Expand All @@ -352,16 +330,14 @@ actor MockPresence: Presence {
}

func leave(dataForEvent: PresenceData? = nil) async throws {
for subscription in mockSubscriptions {
subscription.emit(
PresenceEvent(
action: .leave,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
mockSubscriptions.emit(
PresenceEvent(
action: .leave,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
}
)
}

func subscribe(event _: PresenceEventType, bufferingPolicy _: BufferingPolicy) -> Subscription<PresenceEvent> {
Expand All @@ -382,7 +358,7 @@ actor MockOccupancy: Occupancy {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<OccupancyEvent>] = []
private let mockSubscriptions = MockSubscriptionStorage<OccupancyEvent>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -391,12 +367,10 @@ actor MockOccupancy: Occupancy {
}

private func createSubscription() -> MockSubscription<OccupancyEvent> {
let subscription = MockSubscription<OccupancyEvent>(randomElement: {
mockSubscriptions.create(randomElement: {
let random = Int.random(in: 1 ... 10)
return OccupancyEvent(connections: random, presenceMembers: Int.random(in: 0 ... random))
}, interval: 1)
mockSubscriptions.append(subscription)
return subscription
}

func subscribe(bufferingPolicy _: BufferingPolicy) async -> Subscription<OccupancyEvent> {
Expand Down
6 changes: 6 additions & 0 deletions Example/AblyChatExample/Mocks/MockSubscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ final class MockSubscription<T: Sendable>: Sendable, AsyncSequence {
randomElement()
})
}

func setOnTermination(_ onTermination: @escaping @Sendable () -> Void) {
continuation.onTermination = { _ in
onTermination()
}
}
}
41 changes: 41 additions & 0 deletions Example/AblyChatExample/Mocks/MockSubscriptionStorage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Foundation

// This is copied from ably-chat’s internal class `SubscriptionStorage`.
class MockSubscriptionStorage<Element: Sendable>: @unchecked Sendable {
// We hold a weak reference to the subscriptions that we create, so that the subscriptions’ termination handlers get called when the user releases their final reference to the subscription.
private struct WeaklyHeldSubscription {
weak var subscription: MockSubscription<Element>?
}

/// Access must be synchronised via ``lock``.
private var subscriptions: [UUID: WeaklyHeldSubscription] = [:]
private let lock = NSLock()

// You must not call the `setOnTermination` method of a subscription returned by this function, as it will replace the termination handler set by this function.
func create(randomElement: @escaping @Sendable () -> Element, interval: Double) -> MockSubscription<Element> {
let subscription = MockSubscription<Element>(randomElement: randomElement, interval: interval)
let id = UUID()

lock.lock()
subscriptions[id] = .init(subscription: subscription)
lock.unlock()

subscription.setOnTermination { [weak self] in
self?.subscriptionDidTerminate(id: id)
}

return subscription
}

private func subscriptionDidTerminate(id: UUID) {
lock.lock()
subscriptions.removeValue(forKey: id)
lock.unlock()
}

func emit(_ element: Element) {
for subscription in subscriptions.values {
subscription.subscription?.emit(element)
}
}
}
Loading

0 comments on commit 2554e1a

Please sign in to comment.