Skip to content

Commit

Permalink
convert some async loops into parallel loops
Browse files Browse the repository at this point in the history
If you use `await` inside a loop it makes the loop inherently serial.

If you omit the `await` however, the tasks will all start but the loop
will finish while the tasks are still being scheduled.

So, to make a set of tasks run in parallel but then have the
code block after the loop once all the tasks have been completed
you have to get an array of Promises (one for each iteration) and
then use `Promise.all()` to wait for those promises to be resolved.
Using `Array#map` is a convenient way to go from an array of inputs
to the require array of Promises.
  • Loading branch information
raybellis committed Feb 1, 2019
1 parent 07ae44d commit e7c2fad
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 49 deletions.
19 changes: 9 additions & 10 deletions src/node/db/GroupManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,19 @@ exports.deleteGroup = async function(groupID)
throw new customError("groupID does not exist", "apierror");
}

// iterate through all pads of this group and delete them
for (let padID in group.pads) {
let pad = await padManager.getPad(padID);
await pad.remove();
}
// iterate through all pads of this group and delete them (in parallel)
await Promise.all(Object.keys(group.pads).map(padID => {
return padManager.getPad(padID).then(pad => pad.remove());
}));

// iterate through group2sessions and delete all sessions
let group2sessions = await db.get("group2sessions:" + groupID);
let sessions = group2sessions ? group2sessions.sessionsIDs : [];
let sessions = group2sessions ? group2sessions.sessionsIDs : {};

// loop through all sessions and delete them
for (let session in sessions) {
await sessionManager.deleteSession(session);
}
// loop through all sessions and delete them (in parallel)
await Promise.all(Object.keys(sessions).map(session => {
return sessionManager.deleteSession(session);
}));

// remove group and group2sessions entry
await db.remove("group2sessions:" + groupID);
Expand Down
82 changes: 43 additions & 39 deletions src/node/handler/PadMessageHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ exports.updatePadClients = async function(pad)

// since all clients usually get the same set of changesets, store them in local cache
// to remove unnecessary roundtrip to the datalayer
// NB: note below possibly now accommodated via the change to promises/async
// TODO: in REAL world, if we're working without datalayer cache, all requests to revisions will be fired
// BEFORE first result will be landed to our cache object. The solution is to replace parallel processing
// via async.forEach with sequential for() loop. There is no real benefits of running this in parallel,
Expand Down Expand Up @@ -928,15 +929,16 @@ async function handleClientReady(client, message)
// get timestamp of latest revision needed for timeslider
let currentTime = await pad.getRevisionDate(pad.getHeadRevisionNumber());

// get all author data out of the database
for (let authorId of authors) {
try {
let author = await authorManager.getAuthor(authorId);
historicalAuthorData[authorId] = { name: author.name, colorId: author.colorId }; // Filter author attribs (e.g. don't send author's pads to all clients)
} catch (err) {
messageLogger.error("There is no author for authorId:", authorId);
}
}
// get all author data out of the database (in parallel)
await Promise.all(authors.map(authorId => {
return authorManager.getAuthor(authorId).then(author => {
if (!author) {
messageLogger.error("There is no author for authorId:", authorId);
} else {
historicalAuthorData[authorId] = { name: author.name, colorId: author.colorId }; // Filter author attribs (e.g. don't send author's pads to all clients)
}
});
}));

// glue the clientVars together, send them and tell the other clients that a new one is there

Expand Down Expand Up @@ -1162,45 +1164,47 @@ async function handleClientReady(client, message)
// notify all existing users about new user
client.broadcast.to(padIds.padId).json.send(messageToTheOtherUsers);

// Get sessions for this pad
// Get sessions for this pad and update them (in parallel)
roomClients = _getRoomClients(pad.id);
for (let roomClient of roomClients) {
await Promise.all(_getRoomClients(pad.id).map(async roomClient => {

// Jump over, if this session is the connection session
if (roomClient.id == client.id) {
continue;
return;
}

// Since sessioninfos might change while being enumerated, check if the
// sessionID is still assigned to a valid session
if (sessioninfos[roomClient.id] === undefined) {
continue;
return;
}

let author = sessioninfos[roomClient.id].author;

// get the authorname & colorId
let author = sessioninfos[roomClient.id].author;
let cached = historicalAuthorData[author];

// reuse previously created cache of author's data
let authorInfo = historicalAuthorData[author] || await authorManager.getAuthor(author);

// Send the new User a Notification about this other user
let msg = {
"type": "COLLABROOM",
"data": {
type: "USER_NEWINFO",
userInfo: {
"ip": "127.0.0.1",
"colorId": authorInfo.colorId,
"name": authorInfo.name,
"userAgent": "Anonymous",
"userId": author
let p = cached ? Promise.resolve(cached) : authorManager.getAuthor(author);

return p.then(authorInfo => {
// Send the new User a Notification about this other user
let msg = {
"type": "COLLABROOM",
"data": {
type: "USER_NEWINFO",
userInfo: {
"ip": "127.0.0.1",
"colorId": authorInfo.colorId,
"name": authorInfo.name,
"userAgent": "Anonymous",
"userId": author
}
}
}
};
};

client.json.send(msg);
}
client.json.send(msg);
});
}));
}
}

Expand Down Expand Up @@ -1448,16 +1452,16 @@ exports.padUsers = async function(padID) {
let padUsers = [];
let roomClients = _getRoomClients(padID);

for (let i = 0, n = roomClients.length; i < n; ++i) {
let roomClient = roomClients[i];

// iterate over all clients (in parallel)
await Promise.all(roomClients.map(async roomClient => {
let s = sessioninfos[roomClient.id];
if (s) {
let author = await authorManager.getAuthor(s.author);
author.id = s.author;
padUsers.push(author);
return authorManager.getAuthor(s.author).then(author => {
author.id = s.author;
padUsers.push(author);
});
}
}
}));

return { padUsers };
}
Expand Down

0 comments on commit e7c2fad

Please sign in to comment.