Skip to content

Commit

Permalink
Use correct next users with keys query
Browse files Browse the repository at this point in the history
  • Loading branch information
Anderas committed Mar 2, 2023
1 parent 29e3713 commit a266f2a
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 12 deletions.
25 changes: 13 additions & 12 deletions MatrixSDK/Crypto/CryptoMachine/MXKeysQueryScheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,22 @@ public actor MXKeysQueryScheduler<Response> {
private func currentOrNextQuery(users: Set<String>) -> Task<Response, Error> {
if let currentQuery = currentQuery {
if currentQuery.contains(users: users) {
log("... query already running")
log("... query with the same user(s) already running")

return currentQuery.task

} else {
log("... queueing users for the next query")
log("... queueing user(s) for the next query")

nextUsers = nextUsers.union(users)

let task = nextTask ?? Task {
// Next task needs to await to completion of the currently running task
let _ = await currentQuery.task.result

// Extract and reset next users
let users = nextUsers
nextUsers = []
// At this point the previous query has already changed `self.currentQuery`
// to `next`, so we can extract its users to execute
let users = self.currentQuery?.users ?? []

// Only then we can execute the actual work
return try await executeQuery(users: users)
Expand All @@ -81,28 +81,29 @@ public actor MXKeysQueryScheduler<Response> {
}

} else {
log("... query starting")

let task = Task {
// Since we do not have any task running we can execute work right away
try await executeQuery(users: users)
}
currentQuery = .init(users: users, task: task)
currentQuery = Query(users: users, task: task)
return task
}
}

private func executeQuery(users: Set<String>) async throws -> Response {
defer {
if let next = nextTask {
log("... query completed, starting next pending query.")
currentQuery = .init(users: users, task: next)
if let nextTask = nextTask {
log("... query for \(users) completed, starting next pending query.")
currentQuery = Query(users: nextUsers, task: nextTask)
} else {
log("... query completed, no other queries scheduled.")
log("... query for \(users) completed, no other queries scheduled.")
currentQuery = nil
}
nextTask = nil
nextUsers = []
}

log("... query starting for \(users)")
return try await queryAction(Array(users))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {

// MARK: - Tests

// We assert a query like so:
// |-------------| (Alice)
func test_queryAlice() async {
let exp = expectation(description: "exp")

Expand All @@ -102,6 +104,8 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(1)
}

// We assert a query like so:
// |-----------------| (Alice, Bob)
func test_queryAliceAndBob() async {
let exp = expectation(description: "exp")

Expand All @@ -117,6 +121,9 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(1)
}

// We assert consecutive queries like so:
// |-------------| (Alice)
// |.....--------| (Bob)
func test_queryBobAfterAlice() async {
let exp = expectation(description: "exp")
exp.expectedFulfillmentCount = 2
Expand All @@ -139,6 +146,10 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(2)
}

// We assert parallel queries like so:
// |-------------| (Alice)
// |---------| (Alice)
// |----| (Alice)
func test_executeMultipleAliceQueriesOnce() async {
queryStartSpy = {
self.stubbedResult = .success([
Expand Down Expand Up @@ -178,6 +189,10 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(1)
}

// We assert consecutive queries like so:
// |-------------| (Alice)
// |-----------| (Alice)
// |---------| (Alice)
func test_executeEachAliceQuerySeparately() async {
queryStartSpy = {
self.stubbedResult = .success([
Expand Down Expand Up @@ -218,6 +233,9 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(3)
}

// We assert parallel queries like so:
// |-------------| (Alice, Bob)
// |---------| (Bob)
func test_executeMultipleBobQueriesOnce() async {
queryStartSpy = {
self.stubbedResult = .success([
Expand Down Expand Up @@ -252,6 +270,9 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(1)
}

// We assert consecutive queries like so:
// |-------------| (Alice, Bob)
// |.....--------| (Bob, Carol)
func test_executeSecondBobQuerySeparately() async {
queryStartSpy = {
self.stubbedResult = .success([
Expand Down Expand Up @@ -287,6 +308,11 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(2)
}

// We assert consecutive queries like so:
// |-------------| (Alice)
// |.........--------| + Bob
// |.....--------| + Carol
// |..--------| + David
func test_nextQueryAggregatesPendingUsers() async {
let exp = expectation(description: "exp")
exp.expectedFulfillmentCount = 4
Expand Down Expand Up @@ -331,6 +357,11 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(2)
}

// We assert consecutive queries like so:
// |-------------| (Alice)
// |.........--------| + Bob
// |.....--------| + Carol
// |--------| (David)
func test_pendingUsersResetAfterQuery() async {
var exp = expectation(description: "exp")
exp.expectedFulfillmentCount = 3
Expand Down Expand Up @@ -385,6 +416,45 @@ class MXKeysQuerySchedulerUnitTests: XCTestCase {
XCTAssertQueriesCount(4)
}

// We assert consecutive queries like so:
// |-------------| (Alice)
// |.........--------| (Bob)
// |----| (Bob)
func test_alreadyRunningQueriesGetUpdated() async {
let exp = expectation(description: "exp")
exp.expectedFulfillmentCount = 3

await query(users: ["alice"]) { response in
XCTAssertEqual(response, [
"alice": ["A"],
])
exp.fulfill()

// We need to trigger another Bob request after Alice has completed
Task.detached {
await self.query(users: ["bob"]) { response in
XCTAssertEqual(response, [
"bob": ["B"],
])
exp.fulfill()
}
}
}

await query(users: ["bob"]) { response in
XCTAssertEqual(response, [
"bob": ["B"],
])
exp.fulfill()
}

await waitForExpectations(timeout: 1)

// At the end of making 3 queries we expect to have executed only
// one per each user
XCTAssertQueriesCount(2)
}

func test_queryFail() async {
scheduler = MXKeysQueryScheduler { _ in
try! await Task.sleep(nanoseconds: 1_000_000)
Expand Down
1 change: 1 addition & 0 deletions changelog.d/pr-1726.change
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use correct next users with keys query

0 comments on commit a266f2a

Please sign in to comment.