diff --git a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js index 823281f68c8..c8892f5809b 100644 --- a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js +++ b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js @@ -57,22 +57,11 @@ class EmailAnalyticsServiceWrapper { }); } - async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) { - logging.info('[EmailAnalytics] Fetch latest opened events started'); + async fetchLatest({maxEvents} = {maxEvents: Infinity}) { + logging.info('[EmailAnalytics] Fetch latest started'); const fetchStartDate = new Date(); - const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents}); - const fetchEndDate = new Date(); - - logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`); - return totalEvents; - } - - async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) { - logging.info('[EmailAnalytics] Fetch latest non-opened events started'); - - const fetchStartDate = new Date(); - const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents}); + const totalEvents = await this.service.fetchLatest({maxEvents}); const fetchEndDate = new Date(); logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`); @@ -80,7 +69,7 @@ class EmailAnalyticsServiceWrapper { } async fetchMissing({maxEvents} = {maxEvents: Infinity}) { - logging.info('[EmailAnalytics] Fetch missing events started'); + logging.info('[EmailAnalytics] Fetch missing started'); const fetchStartDate = new Date(); const totalEvents = await this.service.fetchMissing({maxEvents}); @@ -94,7 +83,7 @@ class EmailAnalyticsServiceWrapper { if (maxEvents < 300) { return 0; } - logging.info('[EmailAnalytics] Fetch scheduled events started'); + logging.info('[EmailAnalytics] Fetch scheduled started'); const fetchStartDate = new Date(); const totalEvents = await this.service.fetchScheduled({maxEvents}); @@ -111,31 +100,13 @@ class EmailAnalyticsServiceWrapper { } this.fetching = true; - // NOTE: Data shows we can process ~7500 events per minute on Pro; this can vary locally try { - // Prioritize opens since they are the most important (only data directly displayed to users) - await this.fetchLatestOpenedEvents({maxEvents: Infinity}); - - // Set limits on how much we fetch without checkings for opened events. During surge events (following newsletter send) - // we want to make sure we don't spend too much time collecting delivery data. 
- const c1 = await this.fetchLatestNonOpenedEvents({maxEvents: 20000}); - if (c1 > 15000) { - this.fetching = false; - logging.info('[EmailAnalytics] Restarting fetch due to high event count'); - this.startFetch(); - return; - } - const c2 = await this.fetchMissing({maxEvents: 20000}); - if ((c1 + c2) > 15000) { - this.fetching = false; - logging.info('[EmailAnalytics] Restarting fetch due to high event count'); - this.startFetch(); - return; - } + const c1 = await this.fetchLatest({maxEvents: Infinity}); + const c2 = await this.fetchMissing({maxEvents: Infinity}); // Only fetch scheduled if we didn't fetch a lot of normal events await this.fetchScheduled({maxEvents: 20000 - c1 - c2}); - + this.fetching = false; } catch (e) { logging.error(e, 'Error while fetching email analytics'); diff --git a/ghost/core/core/server/services/email-analytics/lib/queries.js b/ghost/core/core/server/services/email-analytics/lib/queries.js index 93185f62aaf..fbe2019fdc3 100644 --- a/ghost/core/core/server/services/email-analytics/lib/queries.js +++ b/ghost/core/core/server/services/email-analytics/lib/queries.js @@ -1,14 +1,9 @@ const _ = require('lodash'); const debug = require('@tryghost/debug')('services:email-analytics'); const db = require('../../../data/db'); -const logging = require('@tryghost/logging'); -const {default: ObjectID} = require('bson-objectid'); const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5; -/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */ -/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */ - module.exports = { async shouldFetchStats() { // don't fetch stats from Mailgun if we haven't sent any emails @@ -16,124 +11,33 @@ module.exports = { return emailCount && emailCount.count > 0; }, - /** - * Retrieves the timestamp of the last seen event for the specified email analytics events. - * @param {EmailAnalyticsJobName} jobName - The name of the job to update. - * @param {EmailAnalyticsEvent[]} [events=['delivered', 'opened', 'failed']] - The email analytics events to consider. - * @returns {Promise} The timestamp of the last seen event, or null if no events are found. - */ - async getLastEventTimestamp(jobName, events = ['delivered', 'opened', 'failed']) { + async getLastSeenEventTimestamp() { const startDate = new Date(); - - let maxOpenedAt; - let maxDeliveredAt; - let maxFailedAt; - - const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first(); - - if (jobData) { - debug(`Using job data for ${jobName}`); - const lastJobTimestamp = jobData.finished_at || jobData.started_at; - maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null; - maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null; - maxFailedAt = events.includes('failed') ? 
lastJobTimestamp : null; - } else { - debug(`Job data not found for ${jobName}, using email_recipients data`); - logging.info(`Job data not found for ${jobName}, using email_recipients data`); - if (events.includes('opened')) { - maxOpenedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt; - } - if (events.includes('delivered')) { - maxDeliveredAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt; - } - if (events.includes('failed')) { - maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt; - } - - // Insert a new job row if it doesn't exist - await db.knex('jobs').insert({ - id: new ObjectID().toHexString(), - name: jobName, - started_at: new Date(), - created_at: new Date(), - status: 'started' - }).onConflict('name').ignore(); - } - - // Convert string dates to Date objects for SQLite compatibility - [maxOpenedAt, maxDeliveredAt, maxFailedAt] = [maxOpenedAt, maxDeliveredAt, maxFailedAt].map(date => ( - date && !(date instanceof Date) ? new Date(date) : date - )); - const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]); - debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`); + // three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns + let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {}; + let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {}; + let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {}; - return lastSeenEventTimestamp; - }, + if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) { + // SQLite returns a string instead of a Date + maxDeliveredAt = new Date(maxDeliveredAt); + } - /** - * Sets the timestamp of the last seen event for the specified email analytics events. - * @param {EmailAnalyticsJobName} jobName - The name of the job to update. - * @param {'completed'|'started'} field - The field to update. - * @param {Date} date - The timestamp of the last seen event. - * @returns {Promise} - * @description - * Updates the `finished_at` or `started_at` column of the specified job in the `jobs` table with the provided timestamp. - * This is used to keep track of the last time the job was run to avoid expensive queries following reboot. - */ - async setJobTimestamp(jobName, field, date) { - // Convert string dates to Date objects for SQLite compatibility - try { - debug(`Setting ${field} timestamp for job ${jobName} to ${date}`); - const updateField = field === 'completed' ? 'finished_at' : 'started_at'; - const status = field === 'completed' ? 
'finished' : 'started'; - const result = await db.knex('jobs').update({[updateField]: date, updated_at: new Date(), status: status}).where('name', jobName); - if (result === 0) { - await db.knex('jobs').insert({ - id: new ObjectID().toHexString(), - name: jobName, - [updateField]: date, - updated_at: date, - status: status - }); - } - } catch (err) { - debug(`Error setting ${field} timestamp for job ${jobName}: ${err.message}`); + if (maxOpenedAt && !(maxOpenedAt instanceof Date)) { + // SQLite returns a string instead of a Date + maxOpenedAt = new Date(maxOpenedAt); } - }, - /** - * Sets the status of the specified email analytics job. - * @param {EmailAnalyticsJobName} jobName - The name of the job to update. - * @param {'started'|'finished'|'failed'} status - The new status of the job. - * @returns {Promise} - * @description - * Updates the `status` column of the specified job in the `jobs` table with the provided status. - * This is used to keep track of the current state of the job. - */ - async setJobStatus(jobName, status) { - debug(`Setting status for job ${jobName} to ${status}`); - try { - const result = await db.knex('jobs') - .update({ - status: status, - updated_at: new Date() - }) - .where('name', jobName); - - if (result === 0) { - await db.knex('jobs').insert({ - id: new ObjectID().toHexString(), - name: jobName, - status: status, - created_at: new Date(), - updated_at: new Date() - }); - } - } catch (err) { - debug(`Error setting status for job ${jobName}: ${err.message}`); - throw err; + if (maxFailedAt && !(maxFailedAt instanceof Date)) { + // SQLite returns a string instead of a Date + maxFailedAt = new Date(maxFailedAt); } + + const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]); + debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`); + + return lastSeenEventTimestamp; }, async aggregateEmailStats(emailId) { @@ -174,4 +78,4 @@ module.exports = { .update(updateQuery) .where('id', memberId); } -}; \ No newline at end of file +}; diff --git a/ghost/core/test/integration/services/email-service/email-event-storage.test.js b/ghost/core/test/integration/services/email-service/email-event-storage.test.js index d88a5455bfb..e712f68a83c 100644 --- a/ghost/core/test/integration/services/email-service/email-event-storage.test.js +++ b/ghost/core/test/integration/services/email-service/email-event-storage.test.js @@ -23,7 +23,7 @@ describe('EmailEventStorage', function () { before(async function () { // Stub queries before boot const queries = require('../../../../core/server/services/email-analytics/lib/queries'); - sinon.stub(queries, 'getLastEventTimestamp').callsFake(async function () { + sinon.stub(queries, 'getLastSeenEventTimestamp').callsFake(async function () { // This is required because otherwise the last event timestamp will be now, and that is too close to NOW to start fetching new events return new Date(2000, 0, 1); }); @@ -78,7 +78,7 @@ describe('EmailEventStorage', function () { // Fire event processing // We use offloading to have correct coverage and usage of worker thread - const result = await emailAnalytics.fetchLatestNonOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. 
@@ -125,7 +125,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('delivered_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatestNonOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -170,7 +170,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('opened_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -250,7 +250,7 @@ describe('EmailEventStorage', function () { assert.notEqual(initialModel.get('delivered_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -346,7 +346,7 @@ describe('EmailEventStorage', function () { assert.notEqual(initialModel.get('delivered_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -439,7 +439,7 @@ describe('EmailEventStorage', function () { assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient'); // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -529,7 +529,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('failed_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -645,7 +645,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('failed_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -747,7 +747,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -849,7 +849,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. 
@@ -951,7 +951,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -1015,7 +1015,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -1074,7 +1074,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -1118,7 +1118,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 1); }); @@ -1132,7 +1132,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatestOpenedEvents(); + const result = await emailAnalytics.fetchLatest(); assert.equal(result, 0); }); }); diff --git a/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js b/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js index 2fa02c76ec8..6d6addb6aab 100644 --- a/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js +++ b/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js @@ -44,7 +44,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatestOpenedEvents(); + await emailAnalytics.fetchLatest(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -72,7 +72,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatestOpenedEvents(); + await emailAnalytics.fetchLatest(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -100,7 +100,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatestOpenedEvents(); + await emailAnalytics.fetchLatest(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -128,7 +128,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatestOpenedEvents(); + await emailAnalytics.fetchLatest(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -163,7 +163,7 @@ describe('MailgunEmailSuppressionList', function () { timestamp: Math.round(timestamp.getTime() / 1000) }]; - await emailAnalytics.fetchLatestOpenedEvents(); + await emailAnalytics.fetchLatest(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); diff --git a/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js b/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js index cb748b5dbcc..4bcfacf3a4a 100644 --- 
a/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js +++ b/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js @@ -1,6 +1,6 @@ const MailgunClient = require('@tryghost/mailgun-client'); -const DEFAULT_EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained'; +const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained'; const PAGE_LIMIT = 300; const DEFAULT_TAGS = ['bulk-email']; @@ -26,12 +26,11 @@ class EmailAnalyticsProviderMailgun { * @param {Number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. * @param {Date} [options.begin] * @param {Date} [options.end] - * @param {String[]} [options.events] */ fetchLatest(batchHandler, options) { const mailgunOptions = { limit: PAGE_LIMIT, - event: options?.events ? options.events.join(' OR ') : DEFAULT_EVENT_FILTER, + event: EVENT_FILTER, tags: this.tags.join(' AND '), begin: options.begin ? options.begin.getTime() / 1000 : undefined, end: options.end ? options.end.getTime() / 1000 : undefined, diff --git a/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js b/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js index ca99240eada..c0cba21531c 100644 --- a/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js +++ b/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js @@ -155,28 +155,5 @@ describe('EmailAnalyticsProviderMailgun', function () { tags: 'bulk-email AND custom-tag' }, batchHandler, {maxEvents: undefined}); }); - - it('uses provided events when supplied', async function () { - const configStub = sinon.stub(config, 'get'); - configStub.withArgs('bulkEmail').returns({ - mailgun: { - apiKey: 'apiKey', - domain: 'domain.com', - baseUrl: 'https://api.mailgun.net/v3' - } - }); - const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings}); - - const batchHandler = sinon.spy(); - const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS); - - await mailgunProvider.fetchLatest(batchHandler, {events: ['delivered'], begin: LATEST_TIMESTAMP}); - - sinon.assert.calledWithExactly(mailgunFetchEventsStub, { - ...MAILGUN_OPTIONS, - event: 'delivered', - tags: 'bulk-email' - }, batchHandler, {maxEvents: undefined}); - }); }); }); diff --git a/ghost/email-analytics-service/lib/EmailAnalyticsService.js b/ghost/email-analytics-service/lib/EmailAnalyticsService.js index 589080b5cc2..32262b6e559 100644 --- a/ghost/email-analytics-service/lib/EmailAnalyticsService.js +++ b/ghost/email-analytics-service/lib/EmailAnalyticsService.js @@ -9,7 +9,6 @@ const errors = require('@tryghost/errors'); /** * @typedef {object} FetchData * @property {boolean} running - * @property {('email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-latest-opened'|'email-analytics-scheduled')} jobName Name of the job that is running * @property {Date} [lastStarted] Date the last fetch started on * @property {Date} [lastBegin] The begin time used during the last fetch * @property {Date} [lastEventTimestamp] @@ -17,11 +16,7 @@ const errors = require('@tryghost/errors'); */ /** - * @typedef {FetchData & {schedule?: {begin: Date, end: Date}}} FetchDataScheduled - */ - -/** - * @typedef {'delivered' | 'opened' | 'failed' | 'unsubscribed' | 'complained'} EmailAnalyticsEvent + * @typedef {FetchData & {schedule: {begin: Date, end: Date}}} 
FetchDataScheduled */ const TRUST_THRESHOLD_MS = 30 * 60 * 1000; // 30 minutes @@ -37,42 +32,21 @@ module.exports = class EmailAnalyticsService { /** * @type {FetchData} */ - #fetchLatestNonOpenedData = { - running: false, - jobName: 'email-analytics-latest-others' - }; + #fetchLatestData = null; /** * @type {FetchData} */ - #fetchMissingData = { - running: false, - jobName: 'email-analytics-missing' - }; - - /** - * @type {FetchData} - */ - #fetchLatestOpenedData = { - running: false, - jobName: 'email-analytics-latest-opened' - }; + #fetchMissingData = null; /** * @type {FetchDataScheduled} */ - #fetchScheduledData = { - running: false, - jobName: 'email-analytics-scheduled' - }; + #fetchScheduledData = null; /** * @param {object} dependencies - * @param {object} dependencies.config - * @param {object} dependencies.settings - * @param {object} dependencies.queries * @param {EmailEventProcessor} dependencies.eventProcessor - * @param {object} dependencies.providers */ constructor({config, settings, queries, eventProcessor, providers}) { this.config = config; @@ -84,65 +58,38 @@ module.exports = class EmailAnalyticsService { getStatus() { return { - latest: this.#fetchLatestNonOpenedData, + latest: this.#fetchLatestData, missing: this.#fetchMissingData, - scheduled: this.#fetchScheduledData, - latestOpened: this.#fetchLatestOpenedData + scheduled: this.#fetchScheduledData }; } /** - * Returns the timestamp of the last non-opened event we processed. Defaults to now minus 30 minutes if we have no data yet. - */ - async getLastNonOpenedEventTimestamp() { - return this.#fetchLatestNonOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestNonOpenedData.jobName,['delivered','failed'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); - } - - /** - * Returns the timestamp of the last opened event we processed. Defaults to now minus 30 minutes if we have no data yet. + * Returns the timestamp of the last event we processed. Defaults to now minus 30 minutes if we have no data yet. */ - async getLastOpenedEventTimestamp() { - return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestOpenedData.jobName,['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); + async getLastEventTimestamp() { + return this.#fetchLatestData?.lastEventTimestamp ?? (await this.queries.getLastSeenEventTimestamp()) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); } - /** - * Fetches the latest opened events. - * @param {Object} options - The options for fetching events. - * @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch. - * @returns {Promise} The total number of events fetched. 
- */ - async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) { + async fetchLatest({maxEvents = Infinity} = {}) { // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available - const begin = await this.getLastOpenedEventTimestamp(); - const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage + const begin = await this.getLastEventTimestamp(); + const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // ALways stop at x minutes ago to give Mailgun a bit more time to stabilize storage if (end <= begin) { // Skip for now - logging.info('[EmailAnalytics] Skipping fetchLatestOpenedEvents because end (' + end + ') is before begin (' + begin + ')'); + logging.info('[EmailAnalytics] Skipping fetchLatest because end (' + end + ') is before begin (' + begin + ')'); return 0; } - return await this.#fetchEvents(this.#fetchLatestOpenedData, {begin, end, maxEvents, eventTypes: ['opened']}); - } - - /** - * Fetches the latest non-opened events. - * @param {Object} options - The options for fetching events. - * @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch. - * @returns {Promise} The total number of events fetched. - */ - async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) { - // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available - const begin = await this.getLastNonOpenedEventTimestamp(); - const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage - - if (end <= begin) { - // Skip for now - logging.info('[EmailAnalytics] Skipping fetchLatestNonOpenedEvents because end (' + end + ') is before begin (' + begin + ')'); - return 0; + // Create the fetch data object if it doesn't exist yet + if (!this.#fetchLatestData) { + this.#fetchLatestData = { + running: false + }; } - return await this.#fetchEvents(this.#fetchLatestNonOpenedData, {begin, end, maxEvents, eventTypes: ['delivered', 'failed', 'unsubscribed', 'complained']}); + return await this.#fetchEvents(this.#fetchLatestData, {begin, end, maxEvents}); } /** @@ -154,11 +101,11 @@ module.exports = class EmailAnalyticsService { // We start where we left of, or 1,5h ago after a server restart const begin = this.#fetchMissingData?.lastEventTimestamp ?? this.#fetchMissingData?.lastBegin ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3); - // Always stop at the earlier of the time the fetchLatest started fetching on or 30 minutes ago + // Always stop at the time the fetchLatest started fetching on, or maximum 30 minutes ago const end = new Date( Math.min( Date.now() - TRUST_THRESHOLD_MS, - this.#fetchLatestNonOpenedData?.lastBegin?.getTime() || Date.now() // Fallback to now if the previous job didn't run, for whatever reason, prevents catastrophic error + this.#fetchLatestData?.lastBegin?.getTime() ) ); @@ -168,15 +115,18 @@ module.exports = class EmailAnalyticsService { return 0; } + // Create the fetch data object if it doesn't exist yet + if (!this.#fetchMissingData) { + this.#fetchMissingData = { + running: false + }; + } + return await this.#fetchEvents(this.#fetchMissingData, {begin, end, maxEvents}); } /** - * Schedule a new fetch for email analytics events. - * @param {Object} options - The options for scheduling the fetch. 
- * @param {Date} options.begin - The start date for the scheduled fetch. - * @param {Date} options.end - The end date for the scheduled fetch. - * @throws {errors.ValidationError} Throws an error if a fetch is already in progress. + * Schedule a new fetch that should happen */ schedule({begin, end}) { if (this.#fetchScheduledData && this.#fetchScheduledData.running) { @@ -187,7 +137,6 @@ module.exports = class EmailAnalyticsService { logging.info('[EmailAnalytics] Scheduling fetch from ' + begin.toISOString() + ' until ' + end.toISOString()); this.#fetchScheduledData = { running: false, - jobName: 'email-analytics-scheduled', schedule: { begin, end @@ -195,32 +144,19 @@ module.exports = class EmailAnalyticsService { }; } - /** - * Cancels the scheduled fetch of email analytics events. - * If a fetch is currently running, it marks it for cancellation. - * If no fetch is running, it clears the scheduled fetch data. - * @method cancelScheduled - */ cancelScheduled() { if (this.#fetchScheduledData) { if (this.#fetchScheduledData.running) { // Cancel the running fetch this.#fetchScheduledData.canceled = true; } else { - this.#fetchScheduledData = { - running: false, - jobName: 'email-analytics-scheduled' - }; + this.#fetchScheduledData = null; } } } /** * Continues fetching the scheduled events (does not start one). Resets the scheduled event when received 0 events. - * @method fetchScheduled - * @param {Object} [options] - The options for fetching scheduled events. - * @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch. - * @returns {Promise} The number of events fetched. */ async fetchScheduled({maxEvents = Infinity} = {}) { if (!this.#fetchScheduledData || !this.#fetchScheduledData.schedule) { @@ -245,36 +181,27 @@ module.exports = class EmailAnalyticsService { if (end <= begin) { // Skip for now logging.info('[EmailAnalytics] Ending fetchScheduled because end is before begin'); - this.#fetchScheduledData = { - running: false, - jobName: 'email-analytics-scheduled' - }; + this.#fetchScheduledData = null; return 0; } const count = await this.#fetchEvents(this.#fetchScheduledData, {begin, end, maxEvents}); if (count === 0 || this.#fetchScheduledData.canceled) { // Reset the scheduled fetch - this.#fetchScheduledData = { - running: false, - jobName: 'email-analytics-scheduled' - }; + this.#fetchScheduledData = null; } - - this.queries.setJobTimestamp(this.#fetchScheduledData.jobName, 'completed', this.#fetchScheduledData.lastEventTimestamp); return count; } + /** * Start fetching analytics and store the data of the progress inside fetchData - * @param {FetchData} fetchData - Object to store the progress of the fetch operation - * @param {object} options - Options for fetching events - * @param {Date} options.begin - Start date for fetching events - * @param {Date} options.end - End date for fetching events - * @param {number} [options.maxEvents=Infinity] - Maximum number of events to fetch. Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. - * @param {EmailAnalyticsEvent[]} [options.eventTypes] - Array of event types to fetch. If not provided, Mailgun will return all event types. - * @returns {Promise} The number of events fetched + * @param {FetchData} fetchData + * @param {object} options + * @param {Date} options.begin + * @param {Date} options.end + * @param {number} [options.maxEvents] Not a strict maximum. 
We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. */ - async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity, eventTypes = null}) { + async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity}) { // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available logging.info('[EmailAnalytics] Fetching from ' + begin.toISOString() + ' until ' + end.toISOString() + ' (maxEvents: ' + maxEvents + ')'); @@ -282,7 +209,6 @@ module.exports = class EmailAnalyticsService { fetchData.running = true; fetchData.lastStarted = new Date(); fetchData.lastBegin = begin; - this.queries.setJobTimestamp(fetchData.jobName, 'started', begin); let lastAggregation = Date.now(); let eventCount = 0; @@ -291,13 +217,6 @@ module.exports = class EmailAnalyticsService { let processingResult = new EventProcessingResult(); let error = null; - /** - * Process a batch of events - * @param {Array} events - Array of event objects to process - * @param {EventProcessingResult} processingResult - Object to store the processing results - * @param {FetchData} fetchData - Object containing fetch operation data - * @returns {Promise} - */ const processBatch = async (events) => { // Even if the fetching is interrupted because of an error, we still store the last event timestamp await this.processEventBatch(events, processingResult, fetchData); @@ -327,7 +246,7 @@ module.exports = class EmailAnalyticsService { try { for (const provider of this.providers) { - await provider.fetchLatest(processBatch, {begin, end, maxEvents, events: eventTypes}); + await provider.fetchLatest(processBatch, {begin, end, maxEvents}); } logging.info('[EmailAnalytics] Fetching finished'); @@ -358,14 +277,7 @@ module.exports = class EmailAnalyticsService { // So if we didn't have errors while fetching, and total events < maxEvents, increase lastEventTimestamp with one second if (!error && eventCount > 0 && eventCount < maxEvents && fetchData.lastEventTimestamp && fetchData.lastEventTimestamp.getTime() < Date.now() - 2000) { logging.info('[EmailAnalytics] Reached end of new events, increasing lastEventTimestamp with one second'); - // set the data on the db so we can store it for fetching after reboot - await this.queries.setJobTimestamp(fetchData.jobName, 'completed', new Date(fetchData.lastEventTimestamp.getTime())); - // increment and store in local memory fetchData.lastEventTimestamp = new Date(fetchData.lastEventTimestamp.getTime() + 1000); - } else { - logging.info('[EmailAnalytics] No new events found'); - // set job status to finished - await this.queries.setJobStatus(fetchData.jobName, 'completed'); } fetchData.running = false; @@ -377,11 +289,8 @@ module.exports = class EmailAnalyticsService { } /** - * Process a batch of email analytics events. - * @param {any[]} events - An array of email analytics events to process. - * @param {Object} result - The result object to merge batch processing results into. - * @param {FetchData} fetchData - Data related to the current fetch operation. 
- * @returns {Promise} + * @param {any[]} events + * @param {FetchData} fetchData */ async processEventBatch(events, result, fetchData) { const processStart = Date.now(); @@ -390,7 +299,7 @@ module.exports = class EmailAnalyticsService { // Save last event timestamp if (!fetchData.lastEventTimestamp || (event.timestamp && event.timestamp > fetchData.lastEventTimestamp)) { - fetchData.lastEventTimestamp = event.timestamp; // don't need to keep db in sync; it'll fall back to last completed timestamp anyways + fetchData.lastEventTimestamp = event.timestamp; } result.merge(batchResult); @@ -496,10 +405,8 @@ module.exports = class EmailAnalyticsService { return new EventProcessingResult({unhandled: 1}); } - /** - * @param {{emailIds?: string[], memberIds?: string[]}} stats - */ async aggregateStats({emailIds = [], memberIds = []}) { + logging.info(`[EmailAnalytics] Aggregating for ${emailIds.length} emails`); for (const emailId of emailIds) { await this.aggregateEmailStats(emailId); } @@ -510,20 +417,10 @@ module.exports = class EmailAnalyticsService { } } - /** - * Aggregate email stats for a given email ID. - * @param {string} emailId - The ID of the email to aggregate stats for. - * @returns {Promise} - */ async aggregateEmailStats(emailId) { return this.queries.aggregateEmailStats(emailId); } - /** - * Aggregate member stats for a given member ID. - * @param {string} memberId - The ID of the member to aggregate stats for. - * @returns {Promise} - */ async aggregateMemberStats(memberId) { return this.queries.aggregateMemberStats(memberId); } diff --git a/ghost/email-analytics-service/test/email-analytics-service.test.js b/ghost/email-analytics-service/test/email-analytics-service.test.js index 229e9712adc..1e59d9ceb29 100644 --- a/ghost/email-analytics-service/test/email-analytics-service.test.js +++ b/ghost/email-analytics-service/test/email-analytics-service.test.js @@ -10,681 +10,69 @@ const { const EventProcessingResult = require('../lib/EventProcessingResult'); describe('EmailAnalyticsService', function () { - describe('getStatus', function () { - it('returns status object', function () { - // these are null because we're not running them before calling this - const service = new EmailAnalyticsService({}); - const result = service.getStatus(); - result.should.deepEqual({ - latest: { - jobName: 'email-analytics-latest-others', - running: false - }, - latestOpened: { - jobName: 'email-analytics-latest-opened', - running: false - }, - missing: { - jobName: 'email-analytics-missing', - running: false - }, - scheduled: { - jobName: 'email-analytics-scheduled', - running: false - } - }); + let eventProcessor; + beforeEach(function () { + eventProcessor = {}; + eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; }); }); - describe('getLastNonOpenedEventTimestamp', function () { - it('returns the queried timestamp before the fallback', async function () { - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: sinon.stub().resolves(new Date(1)) - } - }); - - const result = await service.getLastNonOpenedEventTimestamp(); - result.should.eql(new Date(1)); - }); - - it('returns the fallback if nothing is found', async function () { - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: 
sinon.stub().resolves(null) - } - }); - - const result = await service.getLastNonOpenedEventTimestamp(); - result.should.eql(new Date(Date.now() - 30 * 60 * 1000)); // should be 30 mins prior - }); - }); - - describe('getLastSeenOpenedEventTimestamp', function () { - it('returns the queried timestamp before the fallback', async function () { - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: sinon.stub().resolves(new Date(1)) - } - }); - - const result = await service.getLastOpenedEventTimestamp(); - result.should.eql(new Date(1)); - }); + describe('fetchLatest', function () { - it('returns the fallback if nothing is found', async function () { - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: sinon.stub().resolves(null) - } - }); - - const result = await service.getLastOpenedEventTimestamp(); - result.should.eql(new Date(Date.now() - 30 * 60 * 1000)); // should be 30 mins prior - }); - - it.skip('returns the cached value before the fallback', async function () { - }); - }); - - describe('Fetching events', function () { - afterEach(function () { - sinon.restore(); - }); - describe('fetchLatestOpenedEvents', function () { - it('fetches only opened events', async function () { - const fetchLatestSpy = sinon.spy(); - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: sinon.stub().resolves(), - setJobTimestamp: sinon.stub().resolves(), - setJobStatus: sinon.stub().resolves() - }, - providers: [{ - fetchLatest: fetchLatestSpy - }] - }); - await service.fetchLatestOpenedEvents(); - fetchLatestSpy.calledOnce.should.be.true(); - fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['opened']); - }); - - it('quits if the end is before the begin', async function () { - const fetchLatestSpy = sinon.spy(); - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: sinon.stub().resolves(new Date(Date.now() + 24 * 60 * 60 * 1000)), // 24 hours in the future - setJobTimestamp: sinon.stub().resolves(), - setJobStatus: sinon.stub().resolves() - }, - providers: [{ - fetchLatest: fetchLatestSpy - }] - }); - await service.fetchLatestOpenedEvents(); - fetchLatestSpy.calledOnce.should.be.false(); - }); - }); - - describe('fetchLatestNonOpenedEvents', function () { - it('fetches only non-opened events', async function () { - const fetchLatestSpy = sinon.spy(); - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: sinon.stub().resolves(), - setJobTimestamp: sinon.stub().resolves(), - setJobStatus: sinon.stub().resolves() - }, - providers: [{ - fetchLatest: fetchLatestSpy - }] - }); - await service.fetchLatestNonOpenedEvents(); - fetchLatestSpy.calledOnce.should.be.true(); - fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['delivered', 'failed', 'unsubscribed', 'complained']); - }); - - it('quits if the end is before the begin', async function () { - const fetchLatestSpy = sinon.spy(); - const service = new EmailAnalyticsService({ - queries: { - getLastEventTimestamp: sinon.stub().resolves(new Date(Date.now() + 24 * 60 * 60 * 1000)), // 24 hours in the future - setJobTimestamp: sinon.stub().resolves(), - setJobStatus: sinon.stub().resolves() - }, - providers: [{ - fetchLatest: fetchLatestSpy - }] - }); - await service.fetchLatestNonOpenedEvents(); - fetchLatestSpy.calledOnce.should.be.false(); - }); - }); - describe('fetchScheduled', function () { - let service; - let processEventBatchStub; - let aggregateStatsStub; - - 
beforeEach(function () { - service = new EmailAnalyticsService({ - queries: { - setJobTimestamp: sinon.stub().resolves(), - setJobStatus: sinon.stub().resolves() - }, - providers: [{ - fetchLatest: (fn) => { - const events = [1,2,3,4,5,6,7,8,9,10]; - fn(events); - } - }] - }); - processEventBatchStub = sinon.stub(service, 'processEventBatch').resolves(); - aggregateStatsStub = sinon.stub(service, 'aggregateStats').resolves(); - }); - - afterEach(function () { - sinon.restore(); - }); - - it('returns 0 when nothing is scheduled', async function () { - const result = await service.fetchScheduled(); - result.should.equal(0); - processEventBatchStub.called.should.be.false(); - aggregateStatsStub.called.should.be.false(); - }); - - it('returns 0 when fetch is canceled', async function () { - service.schedule({ - begin: new Date(2023, 0, 1), - end: new Date(2023, 0, 2) - }); - service.cancelScheduled(); - const result = await service.fetchScheduled(); - result.should.equal(0); - processEventBatchStub.called.should.be.false(); - aggregateStatsStub.called.should.be.false(); - }); - - it('fetches events with correct parameters', async function () { - service.schedule({ - begin: new Date(2023, 0, 1), - end: new Date(2023, 0, 2) - }); - - const result = await service.fetchScheduled({maxEvents: 100}); - - result.should.equal(10); - aggregateStatsStub.calledOnce.should.be.true(); - processEventBatchStub.calledOnce.should.be.true(); - }); - - it('bails when end date is before begin date', async function () { - service.schedule({ - begin: new Date(2023, 0, 2), - end: new Date(2023, 0, 1) - }); - const result = await service.fetchScheduled({maxEvents: 100}); - result.should.equal(0); - }); - - it('resets fetchScheduledData when no events are fetched', async function () { - service = new EmailAnalyticsService({ - queries: { - setJobTimestamp: sinon.stub().resolves(), - setJobStatus: sinon.stub().resolves() - }, - providers: [{ - fetchLatest: (fn) => { - fn([]); - } - }] - }); - - service.schedule({ - begin: new Date(2023, 0, 1), - end: new Date(2023, 0, 2) - }); - const result = await service.fetchScheduled({maxEvents: 100}); - result.should.equal(0); - }); - }); - - describe('fetchMissing', function () { - it('fetches missing events', async function () { - const fetchLatestSpy = sinon.spy(); - const service = new EmailAnalyticsService({ - queries: { - setJobTimestamp: sinon.stub().resolves(), - setJobStatus: sinon.stub().resolves() - }, - providers: [{ - fetchLatest: fetchLatestSpy - }] - }); - await service.fetchMissing(); - fetchLatestSpy.calledOnce.should.be.true(); - }); - }); }); describe('processEventBatch', function () { - describe('with functional processor', function () { - let eventProcessor; - beforeEach(function () { - eventProcessor = {}; - eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handlePermanentFailed = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handleTemporaryFailed = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handleUnsubscribed = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - 
eventProcessor.handleComplained = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - }); - - it('uses passed-in event processor', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - await service.processEventBatch([{ - type: 'delivered', - emailId: 1, - timestamp: new Date(1) - }, { - type: 'delivered', - emailId: 2, - timestamp: new Date(2) - }, { - type: 'opened', - emailId: 1, - timestamp: new Date(3) - }], result, fetchData); - - eventProcessor.handleDelivered.callCount.should.eql(2); - eventProcessor.handleOpened.callCount.should.eql(1); - - result.should.deepEqual(new EventProcessingResult({ - delivered: 2, - opened: 1, - unprocessable: 0, - emailIds: [1, 2], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(3) - }); - }); - - it('handles opened', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'opened', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handleOpened.calledOnce.should.be.true(); - - result.should.deepEqual(new EventProcessingResult({ - delivered: 0, - opened: 1, - unprocessable: 0, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles delivered', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'delivered', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handleDelivered.calledOnce.should.be.true(); - - result.should.deepEqual(new EventProcessingResult({ - delivered: 1, - opened: 0, - unprocessable: 0, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles failed (permanent)', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'failed', - severity: 'permanent', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handlePermanentFailed.calledOnce.should.be.true(); - - result.should.deepEqual(new EventProcessingResult({ - permanentFailed: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles failed (temporary)', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'failed', - severity: 'temporary', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handleTemporaryFailed.calledOnce.should.be.true(); - - result.should.deepEqual(new EventProcessingResult({ - temporaryFailed: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles unsubscribed', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const 
fetchData = {}; - - await service.processEventBatch([{ - type: 'unsubscribed', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handleUnsubscribed.calledOnce.should.be.true(); - eventProcessor.handleDelivered.called.should.be.false(); - eventProcessor.handleOpened.called.should.be.false(); - - result.should.deepEqual(new EventProcessingResult({ - unsubscribed: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles complained', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'complained', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handleComplained.calledOnce.should.be.true(); - eventProcessor.handleDelivered.called.should.be.false(); - eventProcessor.handleOpened.called.should.be.false(); - - result.should.deepEqual(new EventProcessingResult({ - complained: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it(`doens't handle other event types`, async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'notstandard', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handleDelivered.called.should.be.false(); - eventProcessor.handleOpened.called.should.be.false(); - - result.should.deepEqual(new EventProcessingResult({ - unhandled: 1 - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - }); - - describe('with null processor results', function () { - let eventProcessor; - beforeEach(function () { - eventProcessor = {}; - eventProcessor.handleDelivered = sinon.stub().returns(null); - eventProcessor.handleOpened = sinon.stub().returns(null); - eventProcessor.handlePermanentFailed = sinon.stub().returns(null); - eventProcessor.handleTemporaryFailed = sinon.stub().returns(null); - eventProcessor.handleUnsubscribed = sinon.stub().returns(null); - eventProcessor.handleComplained = sinon.stub().returns(null); - }); - - it('delivered returns unprocessable', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'delivered', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - result.should.deepEqual(new EventProcessingResult({ - unprocessable: 1 - })); - }); - - it('opened returns unprocessable', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'opened', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - result.should.deepEqual(new EventProcessingResult({ - unprocessable: 1 - })); - }); - - it('failed (permanent) returns unprocessable', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'failed', - emailId: 1, - timestamp: new Date(1), - severity: 'permanent' - }], result, fetchData); - - 
result.should.deepEqual(new EventProcessingResult({ - unprocessable: 1 - })); - }); - - it('failed (temporary) returns unprocessable', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'failed', - emailId: 1, - timestamp: new Date(1), - severity: 'temporary' - }], result, fetchData); - - result.should.deepEqual(new EventProcessingResult({ - unprocessable: 1 - })); + it('uses passed-in event processor', async function () { + const service = new EmailAnalyticsService({ + eventProcessor }); - it('unsubscribed returns unprocessable', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'unsubscribed', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - result.should.deepEqual(new EventProcessingResult({ - unprocessable: 1 - })); - }); + const result = new EventProcessingResult(); + const fetchData = { - it('complained returns unprocessable', async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); + }; + await service.processEventBatch([{ + type: 'delivered', + emailId: 1, + timestamp: new Date(1) + }, { + type: 'delivered', + emailId: 2, + timestamp: new Date(2) + }, { + type: 'opened', + emailId: 1, + timestamp: new Date(3) + }], result, fetchData); - const result = new EventProcessingResult(); - const fetchData = {}; + eventProcessor.handleDelivered.callCount.should.eql(2); - await service.processEventBatch([{ - type: 'complained', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); + result.should.deepEqual(new EventProcessingResult({ + delivered: 2, + opened: 1, + unprocessable: 0, + emailIds: [1, 2], + memberIds: [1] + })); - result.should.deepEqual(new EventProcessingResult({ - unprocessable: 1 - })); + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(3) }); }); }); - describe('processEvent', function () { - }); - describe('aggregateStats', function () { let service; @@ -712,34 +100,4 @@ describe('EmailAnalyticsService', function () { service.queries.aggregateMemberStats.calledWith('m-2').should.be.true(); }); }); - - describe('aggregateEmailStats', function () { - it('returns the query result', async function () { - const service = new EmailAnalyticsService({ - queries: { - aggregateEmailStats: sinon.stub().resolves() - } - }); - - await service.aggregateEmailStats('memberId'); - - service.queries.aggregateEmailStats.calledOnce.should.be.true(); - service.queries.aggregateEmailStats.calledWith('memberId').should.be.true; - }); - }); - - describe('aggregateMemberStats', function () { - it('returns the query result', async function () { - const service = new EmailAnalyticsService({ - queries: { - aggregateMemberStats: sinon.stub().resolves() - } - }); - - await service.aggregateMemberStats('memberId'); - - service.queries.aggregateMemberStats.calledOnce.should.be.true(); - service.queries.aggregateMemberStats.calledWith('memberId').should.be.true; - }); - }); });
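For reference, the lookup that this patch restores in queries.js (and that EmailAnalyticsService falls back to when it has no in-memory lastEventTimestamp, before defaulting to "30 minutes ago") reduces to the sketch below. This is illustrative only: the `knex` instance is assumed to be Ghost's configured database connection, and only the table/column names, the lodash usage, and the three-query pattern are taken from the code above.

const _ = require('lodash');

// Sketch: most recent delivered/opened/failed timestamp across email_recipients.
// Pass in any configured knex instance bound to Ghost's schema (assumption).
async function getLastSeenEventTimestamp(knex) {
    // SQLite returns timestamps as strings; normalise to Date so they compare correctly.
    const toDate = value => (value && !(value instanceof Date) ? new Date(value) : value);

    // Three separate MAX() queries rather than one MAX/GREATEST(+COALESCE) across
    // columns, per the comment restored in queries.js.
    const {maxDeliveredAt} = await knex('email_recipients').select(knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
    const {maxOpenedAt} = await knex('email_recipients').select(knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
    const {maxFailedAt} = await knex('email_recipients').select(knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};

    // _.max ignores null/undefined entries, so missing columns simply drop out.
    return _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt].map(toDate));
}

The returned Date becomes the `begin` bound for the next fetchLatest pass, with `end` held a few minutes behind now (FETCH_LATEST_END_MARGIN_MS) to give Mailgun time to stabilise its event storage.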