Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cleaning progress #1989

Merged
merged 6 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/src/services/email-status/reacher/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ interface ValidationOptions {
}

export default class ReacherClient {
private static readonly SINGLE_VERIFICATION_PATH = '/v0/check_email';
private static readonly SINGLE_VERIFICATION_PATH = '/v1/check_email';

private static readonly BULK_VERIFICATION_PATH = '/v0/bulk';

Expand Down Expand Up @@ -202,7 +202,7 @@ export default class ReacherClient {
},
{ signal: abortSignal }
);
return data;
return { ...data, input: email };
} catch (error) {
logError(error, `[Reacher:checkSingleEmail:${email}]`, this.logger);
throw error;
Expand Down
55 changes: 19 additions & 36 deletions backend/src/services/tasks-manager/TasksManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
RedisCommand,
StreamInfo,
Task,
TaskExtract,
TaskFetch,
TaskProgress,
TaskProgressType
Expand Down Expand Up @@ -369,30 +368,6 @@ export default class TasksManager {
await Promise.all(stopPromises);
}

private static getEventName(
miningId: string,
progressType: TaskProgressType,
progress: TaskProgress,
fetch: TaskFetch,
extract: TaskExtract
) {
let eventName = `${progressType}-${miningId}`;

// If the fetching is completed, notify the clients that it has finished.
if (progressType === 'fetched' && fetch.stoppedAt) {
eventName = 'fetching-finished';
}

if (
progressType === 'extracted' &&
fetch.stoppedAt &&
(progress.extracted >= progress.fetched || extract.stoppedAt)
) {
eventName = 'extraction-finished';
}
return eventName;
}

/**
* Notifies the client of the progress of a mining task with a given mining ID.
*
Expand All @@ -402,7 +377,8 @@ export default class TasksManager {
*/
private notifyChanges(
miningId: string,
progressType: TaskProgressType
progressType: TaskProgressType,
event: string | null = null
): void {
const task = this.ACTIVE_MINING_TASKS.get(miningId);

Expand All @@ -418,14 +394,7 @@ export default class TasksManager {
};

const value = progress[`${progressType}`];
const eventName = TasksManager.getEventName(
miningId,
progressType,
progress,
fetch,
extract
);

const eventName = event ?? `${progressType}-${miningId}`;
// Send the progress to parties subscribed on SSE
progressHandlerSSE.sendSSE(value, eventName);
}
Expand Down Expand Up @@ -495,6 +464,7 @@ export default class TasksManager {

if (!fetch.stoppedAt && fetch.instance.isCompleted) {
await this.stopTask([fetch]);
this.notifyChanges(task.miningId, 'fetched', 'fetching-finished');
}

if (
Expand All @@ -503,15 +473,28 @@ export default class TasksManager {
progress.extracted >= progress.fetched
) {
await this.stopTask([extract]);
this.notifyChanges(task.miningId, 'extracted', 'extracting-finished');
}

if (
!clean.stoppedAt &&
extract.stoppedAt &&
progress.verifiedContacts >= progress.createdContacts
) {
await this.stopTask([clean]);
this.notifyChanges(
task.miningId,
'verifiedContacts',
'cleaning-finished'
);
}

const status =
fetch.stoppedAt !== undefined &&
extract.stoppedAt !== undefined &&
progress.verifiedContacts >= progress.createdContacts;
clean.stoppedAt !== undefined;

if (status) {
await this.stopTask([clean]);
try {
await this.deleteTask(miningId, null);
} catch (error) {
Expand Down
211 changes: 113 additions & 98 deletions backend/src/workers/email-verification/emailVerificationHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Contacts } from '../../db/interfaces/Contacts';
import EmailStatusCache from '../../services/cache/EmailStatusCache';
import {
EmailStatusResult,
EmailVerifierType
EmailStatusVerifier
} from '../../services/email-status/EmailStatusVerifier';
import EmailStatusVerifierFactory from '../../services/email-status/EmailStatusVerifierFactory';
import logger from '../../utils/logger';
Expand All @@ -13,6 +13,48 @@ export interface EmailVerificationData {
miningId: string;
}

function logRejectedAndReturnResolved<T>(
results: PromiseSettledResult<T>[],
context: string
): T[] {
results
.filter(
(result): result is PromiseRejectedResult => result.status === 'rejected'
)
.forEach((result) => {
logger.error(`${context}: Promise rejected`, {
error: result.reason,
timestamp: new Date().toISOString()
});
});

return results
.filter(
(result): result is PromiseFulfilledResult<T> =>
result.status === 'fulfilled'
)
.map((result) => result.value);
}

async function ThrottleAsyncVerification(
batchSize: number,
emails: string[],
verifier: EmailStatusVerifier
) {
const results: EmailStatusResult[] = [];
for (let i = 0; i < emails.length; i += batchSize) {
const batch = emails.slice(i, i + batchSize);
results.push(
...logRejectedAndReturnResolved<EmailStatusResult>(
// eslint-disable-next-line no-await-in-loop
await Promise.allSettled(batch.map((email) => verifier.verify(email))),
'Async email status verification'
)
);
}

return results;
}
/**
* Performs verification either in bulk or single checks on the provided emails,
* then caches and writes the results to the database.
Expand All @@ -31,130 +73,103 @@ async function emailVerificationHandlerWithBulk(
const waitingForVerification = new Map();

try {
for (const { userId, email } of verificationData) {
try {
const existingStatus =
// eslint-disable-next-line no-await-in-loop
(await emailStatusCache.get(email)) ??
// eslint-disable-next-line no-await-in-loop
(await contacts.SelectRecentEmailStatus(email));

if (existingStatus) {
logger.debug('Updating email status from cache.', {
email,
verifier: 'CACHED',
result: existingStatus
});
// eslint-disable-next-line no-await-in-loop
await contacts.upsertEmailStatus({
verifiedOn: new Date().toISOString(),
...existingStatus, // overwrites verifiedOn if exists.
email,
userId
});
} else {
waitingForVerification.set(email, userId);
}
} catch (err) {
logger.error('Error updating email status from cache', err);
// update email status from cache
const cachePromises = verificationData.map(async ({ userId, email }) => {
const existingStatus =
(await emailStatusCache.get(email)) ??
(await contacts.SelectRecentEmailStatus(email));

if (!existingStatus) {
waitingForVerification.set(email, userId);
return;
}
}

logger.debug('Cached result', {
email,
verifier: 'CACHED',
result: existingStatus
});

await contacts.upsertEmailStatus({
verifiedOn: new Date().toISOString(),
...existingStatus, // overwrites verifiedOn if exists.
email,
userId
});
});

logRejectedAndReturnResolved(
await Promise.allSettled(cachePromises),
'Updating email status from cache'
);

const verifiers = emailStatusVerifierFactory.getEmailVerifiers(
Array.from(waitingForVerification.keys())
);

const verificationResult = (
await Promise.allSettled(
Array.from(verifiers.entries()).map(
async ([verifierName, [verifier, emails]]) => {
const startTime = performance.now();
try {
logger.info(
`[${verifierName}]: Verification started with ${emails.length} email(s)`,
{ started_at: startTime }
);

const verified =
verifierName === 'mailercheck' && emails.length > 100
? await verifier.verifyMany(emails)
: (
await Promise.allSettled(
emails.map((email) => verifier.verify(email))
)
)
.filter(
(
promise
): promise is PromiseFulfilledResult<EmailStatusResult> =>
promise.status === 'fulfilled'
)
.flatMap((promise) => promise.value);

logger.info(
`[${verifierName}]: Verification completed with ${verified.length} results`,
{
started_at: startTime,
stopped_at: performance.now(),
duration: performance.now() - startTime
}
);

return { verifierName, verified };
} catch (error) {
logger.error(`[${verifierName}]: Verification failed`, { error });
throw error;
}
// Perform email status verification
const verificationPromises = Array.from(verifiers.entries()).map(
async ([verifierName, [verifier, emails]]) => {
const startTime = performance.now();

logger.info(
`[${verifierName}]: Verification started with ${emails.length} email(s)`,
{ started_at: startTime }
);

const verified =
verifierName === 'mailercheck' && emails.length > 100
? await verifier.verifyMany(emails)
: await ThrottleAsyncVerification(200, emails, verifier);

logger.info(
`[${verifierName}]: Verification completed with ${verified.length} results`,
{
started_at: startTime,
stopped_at: performance.now(),
duration: performance.now() - startTime
}
)
)
)
.filter(
(
promise
): promise is PromiseFulfilledResult<{
verifierName: EmailVerifierType;
verified: EmailStatusResult[];
}> => promise.status === 'fulfilled'
)
.flatMap((promise) => promise.value);
);

return { verifierName, verified };
}
);

const verificationResult = logRejectedAndReturnResolved(
await Promise.allSettled(verificationPromises),
'Email verification failed'
);

for (const { verifierName, verified } of verificationResult) {
for (const result of verified) {
// Update email status
const promises = verificationResult.flatMap(({ verified }) =>
verified.map(async (result) => {
const { email } = result;
const userId = waitingForVerification.get(email);

if (!userId) {
logger.warn('No userId found to update email status.', { email });
continue;
return;
}

try {
logger.debug('Updating email status', {
email,
result,
verifierName
});
// eslint-disable-next-line no-await-in-loop
await emailStatusCache.set(email, result);
// eslint-disable-next-line no-await-in-loop
emailStatusCache.set(email, result);
await contacts.upsertEmailStatus({
email,
userId,
status: result.status,
details: result.details,
verifiedOn: new Date().toISOString()
});
} catch (updateError) {
} catch (error) {
logger.error('Error updating email status', {
email,
result,
verifierName,
error: updateError
error: error instanceof Error ? error.message : String(error)
});
}
}
}
})
);

await Promise.allSettled(promises);
} catch (error) {
logger.error('Failed when processing message from the stream', error);
}
Expand Down
Loading
Loading