Skip to content

Commit

Permalink
Reverted email analytics jobs commits (#20835)
Browse files Browse the repository at this point in the history
ref https://linear.app/tryghost/issue/ENG-1518

After releasing the analytics job improvements, it appears for large
sites we're awfully close to missing some Mailgun events because of an
unexpected behavior of the aggregateStats call for just the opened
events job. This is taking 2-5x(+) the amount of time that the aggregate
queries take for the other jobs, despite not being dependent on the
events.

To err on the side of caution, we're going to roll this back and look to
optimize the aggregation queries before re-implementing. And we may be a
bit more cautious in giving _some_ but not _all_ priority to the
`opened` events.
  • Loading branch information
9larsons committed Aug 27, 2024
1 parent d5bac91 commit ae15e12
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 1,036 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,19 @@ class EmailAnalyticsServiceWrapper {
});
}

async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest opened events started');
async fetchLatest({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`);
return totalEvents;
}

async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest non-opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents});
const totalEvents = await this.service.fetchLatest({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`);
return totalEvents;
}

async fetchMissing({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch missing events started');
logging.info('[EmailAnalytics] Fetch missing started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchMissing({maxEvents});
Expand All @@ -94,7 +83,7 @@ class EmailAnalyticsServiceWrapper {
if (maxEvents < 300) {
return 0;
}
logging.info('[EmailAnalytics] Fetch scheduled events started');
logging.info('[EmailAnalytics] Fetch scheduled started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchScheduled({maxEvents});
Expand All @@ -111,31 +100,13 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = true;

// NOTE: Data shows we can process ~7500 events per minute on Pro; this can vary locally
try {
// Prioritize opens since they are the most important (only data directly displayed to users)
await this.fetchLatestOpenedEvents({maxEvents: Infinity});

        // Set limits on how much we fetch without checking for opened events. During surge events (following newsletter send)
// we want to make sure we don't spend too much time collecting delivery data.
const c1 = await this.fetchLatestNonOpenedEvents({maxEvents: 20000});
if (c1 > 15000) {
this.fetching = false;
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
this.startFetch();
return;
}
const c2 = await this.fetchMissing({maxEvents: 20000});
if ((c1 + c2) > 15000) {
this.fetching = false;
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
this.startFetch();
return;
}
const c1 = await this.fetchLatest({maxEvents: Infinity});
const c2 = await this.fetchMissing({maxEvents: Infinity});

// Only fetch scheduled if we didn't fetch a lot of normal events
await this.fetchScheduled({maxEvents: 20000 - c1 - c2});

this.fetching = false;
} catch (e) {
logging.error(e, 'Error while fetching email analytics');
Expand Down
138 changes: 21 additions & 117 deletions ghost/core/core/server/services/email-analytics/lib/queries.js
Original file line number Diff line number Diff line change
@@ -1,139 +1,43 @@
const _ = require('lodash');
const debug = require('@tryghost/debug')('services:email-analytics');
const db = require('../../../data/db');
const logging = require('@tryghost/logging');
const {default: ObjectID} = require('bson-objectid');

const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5;

/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */
/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */

module.exports = {
async shouldFetchStats() {
// don't fetch stats from Mailgun if we haven't sent any emails
const [emailCount] = await db.knex('emails').count('id as count');
return emailCount && emailCount.count > 0;
},

/**
* Retrieves the timestamp of the last seen event for the specified email analytics events.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {EmailAnalyticsEvent[]} [events=['delivered', 'opened', 'failed']] - The email analytics events to consider.
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
*/
async getLastEventTimestamp(jobName, events = ['delivered', 'opened', 'failed']) {
async getLastSeenEventTimestamp() {
const startDate = new Date();

let maxOpenedAt;
let maxDeliveredAt;
let maxFailedAt;

const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();

if (jobData) {
debug(`Using job data for ${jobName}`);
const lastJobTimestamp = jobData.finished_at || jobData.started_at;
maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null;
maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null;
maxFailedAt = events.includes('failed') ? lastJobTimestamp : null;
} else {
debug(`Job data not found for ${jobName}, using email_recipients data`);
logging.info(`Job data not found for ${jobName}, using email_recipients data`);
if (events.includes('opened')) {
maxOpenedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt;
}
if (events.includes('delivered')) {
maxDeliveredAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt;
}
if (events.includes('failed')) {
maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt;
}

// Insert a new job row if it doesn't exist
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
started_at: new Date(),
created_at: new Date(),
status: 'started'
}).onConflict('name').ignore();
}

// Convert string dates to Date objects for SQLite compatibility
[maxOpenedAt, maxDeliveredAt, maxFailedAt] = [maxOpenedAt, maxDeliveredAt, maxFailedAt].map(date => (
date && !(date instanceof Date) ? new Date(date) : date
));

const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
// three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};

return lastSeenEventTimestamp;
},
if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxDeliveredAt = new Date(maxDeliveredAt);
}

/**
* Sets the timestamp of the last seen event for the specified email analytics events.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {'completed'|'started'} field - The field to update.
* @param {Date} date - The timestamp of the last seen event.
* @returns {Promise<void>}
* @description
* Updates the `finished_at` or `started_at` column of the specified job in the `jobs` table with the provided timestamp.
* This is used to keep track of the last time the job was run to avoid expensive queries following reboot.
*/
async setJobTimestamp(jobName, field, date) {
// Convert string dates to Date objects for SQLite compatibility
try {
debug(`Setting ${field} timestamp for job ${jobName} to ${date}`);
const updateField = field === 'completed' ? 'finished_at' : 'started_at';
const status = field === 'completed' ? 'finished' : 'started';
const result = await db.knex('jobs').update({[updateField]: date, updated_at: new Date(), status: status}).where('name', jobName);
if (result === 0) {
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
[updateField]: date,
updated_at: date,
status: status
});
}
} catch (err) {
debug(`Error setting ${field} timestamp for job ${jobName}: ${err.message}`);
if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxOpenedAt = new Date(maxOpenedAt);
}
},

/**
* Sets the status of the specified email analytics job.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {'started'|'finished'|'failed'} status - The new status of the job.
* @returns {Promise<void>}
* @description
* Updates the `status` column of the specified job in the `jobs` table with the provided status.
* This is used to keep track of the current state of the job.
*/
async setJobStatus(jobName, status) {
debug(`Setting status for job ${jobName} to ${status}`);
try {
const result = await db.knex('jobs')
.update({
status: status,
updated_at: new Date()
})
.where('name', jobName);

if (result === 0) {
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
status: status,
created_at: new Date(),
updated_at: new Date()
});
}
} catch (err) {
debug(`Error setting status for job ${jobName}: ${err.message}`);
throw err;
if (maxFailedAt && !(maxFailedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxFailedAt = new Date(maxFailedAt);
}

const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);

return lastSeenEventTimestamp;
},

async aggregateEmailStats(emailId) {
Expand Down Expand Up @@ -174,4 +78,4 @@ module.exports = {
.update(updateQuery)
.where('id', memberId);
}
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('EmailEventStorage', function () {
before(async function () {
// Stub queries before boot
const queries = require('../../../../core/server/services/email-analytics/lib/queries');
sinon.stub(queries, 'getLastEventTimestamp').callsFake(async function () {
sinon.stub(queries, 'getLastSeenEventTimestamp').callsFake(async function () {
// This is required because otherwise the last event timestamp will be now, and that is too close to NOW to start fetching new events
return new Date(2000, 0, 1);
});
Expand Down Expand Up @@ -78,7 +78,7 @@ describe('EmailEventStorage', function () {

// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -125,7 +125,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -170,7 +170,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('opened_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -250,7 +250,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -346,7 +346,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('delivered_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -439,7 +439,7 @@ describe('EmailEventStorage', function () {
assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient');

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -529,7 +529,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -645,7 +645,7 @@ describe('EmailEventStorage', function () {
assert.equal(initialModel.get('failed_at'), null);

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -747,7 +747,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -849,7 +849,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -951,7 +951,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1015,7 +1015,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1074,7 +1074,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);

// Since this is all event based we should wait for all dispatched events to be completed.
Expand Down Expand Up @@ -1118,7 +1118,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 1);
});

Expand All @@ -1132,7 +1132,7 @@ describe('EmailEventStorage', function () {
}];

// Fire event processing
const result = await emailAnalytics.fetchLatestOpenedEvents();
const result = await emailAnalytics.fetchLatest();
assert.equal(result, 0);
});
});
Loading

0 comments on commit ae15e12

Please sign in to comment.