diff --git a/dev-tools/migrate-to-db-managed-messaging-services.js b/dev-tools/migrate-to-db-managed-messaging-services.js
new file mode 100644
index 000000000..5c0faa8f1
--- /dev/null
+++ b/dev-tools/migrate-to-db-managed-messaging-services.js
@@ -0,0 +1,125 @@
+require("dotenv").config();
+const _ = require("lodash");
+const knex = require("knex");
+
+// One-off backfill: gives every campaign_contact cell that has no
+// messaging_service_stick row for ORGANIZATION_ID a messaging service SID taken
+// from the TWILIO_MESSAGE_SERVICE_SID / TWILIO_MESSAGE_SERVICE_SIDS* env vars.
+
+// Environment variables are strings; the pool bounds are passed as integers
+const ROW_CONCURRENCY = parseInt(process.env.ROW_CONCURRENCY, 10) || 1;
+
+const config = {
+  client: "postgresql",
+  connection: process.env.DATABASE_URL,
+  pool: {
+    min: ROW_CONCURRENCY,
+    max: ROW_CONCURRENCY
+  }
+};
+
+const db = knex(config);
+const BATCH_SIZE = 5000;
+// Every stick written by this script is attributed to this organization
+const ORGANIZATION_ID = 1;
+
+const getMessagingServiceSIDs = () => {
+  // Gather multiple messaging service SIDs (may be split across multiple env vars)
+  const envVarKeys = Object.keys(process.env).filter(key =>
+    key.startsWith(`TWILIO_MESSAGE_SERVICE_SIDS`)
+  );
+  envVarKeys.sort();
+
+  let messagingServiceIds = [];
+  for (const envVarKey of envVarKeys) {
+    const envVarValue = process.env[envVarKey];
+    const newServiceIds = envVarValue
+      .split(",")
+      .map(serviceSid => serviceSid.trim());
+    messagingServiceIds = messagingServiceIds.concat(newServiceIds);
+  }
+
+  return messagingServiceIds;
+};
+
+const MESSAGING_SERVICE_SIDS = getMessagingServiceSIDs();
+
+const getMessageServiceSID = cell => {
+  // Check for single message service
+  if (!!process.env.TWILIO_MESSAGE_SERVICE_SID) {
+    return process.env.TWILIO_MESSAGE_SERVICE_SID;
+  }
+
+  // FIXME(review): deterministicIntWithinRange is not defined or imported in this
+  // script, so this branch throws a ReferenceError; bring in the same helper the
+  // server used so cells keep the service they were previously sent from.
+  const messagingServiceIndex = deterministicIntWithinRange(
+    cell,
+    MESSAGING_SERVICE_SIDS.length
+  );
+  const messagingServiceId = MESSAGING_SERVICE_SIDS[messagingServiceIndex];
+
+  if (!messagingServiceId)
+    throw new Error(`Could not find Twilio message service SID for ${cell}!`);
+
+  return messagingServiceId;
+};
+
+// Assigns messaging services to up to BATCH_SIZE unassigned cells and resolves to
+// the number of cells processed (0 once nothing is left to do)
+const doBatch = async () => {
+  const { rows } = await db.raw(
+    `
+    select distinct campaign_contact.cell
+    from campaign_contact
+    left join messaging_service_stick
+      on messaging_service_stick.cell = campaign_contact.cell
+        and messaging_service_stick.organization_id = ?
+    where messaging_service_stick.messaging_service_sid is null
+    limit ?
+  `,
+    [ORGANIZATION_ID, BATCH_SIZE]
+  );
+
+  const cells = rows.map(r => r.cell);
+
+  console.log("Doing ", cells.length);
+
+  if (cells.length === 0) {
+    return 0;
+  }
+
+  const toInsert = cells.map(c => ({
+    cell: c,
+    organization_id: ORGANIZATION_ID,
+    messaging_service_sid: getMessageServiceSID(c)
+  }));
+
+  await db("messaging_service_stick").insert(toInsert);
+
+  console.log("Did ", cells.length);
+
+  return cells.length;
+};
+
+// Runs batches until no unassigned cells remain; resolves to the total processed
+async function main() {
+  let done = 0;
+  let did = 0;
+  while ((did = await doBatch()) > 0) {
+    console.log("Did ", did);
+    done = done + did;
+    console.log("Done ", done);
+  }
+
+  return done;
+}
+
+main()
+  .then(console.log)
+  .catch(err => {
+    console.error(err);
+    process.exitCode = 1;
+  })
+  // Close the pool so the process can exit
+  .then(() => db.destroy());
diff --git a/migrations/20190523154700_add_index_campaign_contact_get_assignment.js b/migrations/20190523154700_add_index_campaign_contact_get_assignment.1.js
similarity index 88%
rename from migrations/20190523154700_add_index_campaign_contact_get_assignment.js
rename to migrations/20190523154700_add_index_campaign_contact_get_assignment.1.js
index 328ba3e6c..b3da53be1 100644
--- a/migrations/20190523154700_add_index_campaign_contact_get_assignment.js
+++ b/migrations/20190523154700_add_index_campaign_contact_get_assignment.1.js
@@ -2,7 +2,7 @@
 exports.up = function(knex, Promise) {
   return knex.schema.alterTable("campaign_contact", table => {
     table.index(
-      ["campaign_id, assignment_id, message_status, is_opted_out"],
+      ["campaign_id", "assignment_id", "message_status", "is_opted_out"],
       "campaign_contact_get_current_assignment_index"
     );
   });
diff --git a/migrations/20190604154700_add_messaging_service_tables.js b/migrations/20190604154700_add_messaging_service_tables.js
new file mode 100644
index 000000000..f691cc56d
--- /dev/null
+++ b/migrations/20190604154700_add_messaging_service_tables.js
@@ -0,0 +1,38 @@
+// Add messaging_service and messaging_service_stick tables
+exports.up = function(knex, Promise) {
+  // messaging_service_stick references messaging_service, so create the tables
+  // one after the other instead of racing them with Promise.all
+  return knex.schema
+    .createTable("messaging_service", t => {
+      t.text("messaging_service_sid").primary(); // if we choose not to have the foreign key on sticks, we won't need the index here
+      // NOTE(review): knex documents references() as "table.column"; confirm that
+      // "organization(id)" actually produces the foreign key
+      t.integer("organization_id").references("organization(id)");
+      t.index("organization_id");
+    })
+    .then(() =>
+      knex.schema.createTable("messaging_service_stick", t => {
+        t.text("cell");
+        t.index("cell");
+        t.integer("organization_id").references("organization(id)");
+        t.text("messaging_service_sid").references(
+          "messaging_service(messaging_service_sid)"
+        ); // for performance, we may want to skip the foreign key
+        t.index(
+          ["cell", "organization_id"],
+          "messaging_service_stick_cell_organization_index"
+        );
+        t.unique(
+          ["cell", "organization_id"],
+          "messaging_service_stick_cell_organization_unique_constraint"
+        );
+      })
+    );
+};
+
+// Drop the messaging service tables, dependent table first
+exports.down = function(knex, Promise) {
+  return knex.schema
+    .dropTable("messaging_service_stick")
+    .then(() => knex.schema.dropTable("messaging_service"));
+};
diff --git a/src/server/api/lib/message-sending.js b/src/server/api/lib/message-sending.js
index 920dd527d..120077125 100644
--- a/src/server/api/lib/message-sending.js
+++ b/src/server/api/lib/message-sending.js
@@ -1,18 +1,98 @@
 import { r } from "../../models";
 
-export async function getLastMessage({ contactNumber, service }) {
-  const lastMessage = await r
-    .knex("message")
-    .where({
-      contact_number: contactNumber,
-      is_from_contact: false,
-      service
-    })
-    .orderBy("created_at", "desc")
-    .limit(1)
-    .first("assignment_id", "campaign_contact_id");
-
-  return lastMessage;
+/*
+  This was changed to accommodate multiple organizationIds. There were two potential approaches:
+  - option one: with campaign_id_options as select campaigns from organizationId, where campaign_id = campaign.id
+  -----------------------------------
+    with chosen_organization as (
+      select organization_id
+      from messaging_service
+      where messaging_service_sid = ?
+    ),
+    campaign_contact_option as (
+      select campaign_contact.id
+      from campaign_contact
+      join campaign
+        on campaign_contact.campaign_id = campaign.id
+      where
+        campaign.organization_id in (
+          select organization_id from chosen_organization
+        )
+        and campaign_contact.cell = ?
+    )
+    select campaign_contact_id, assignment_id
+    from message
+    join campaign_contact_option
+      on message.campaign_contact_id = campaign_contact_option.id
+    where
+      message.is_from_contact = false
+    order by created_at desc
+    limit 1
+  -----------------------------------
+
+  - option two: join campaign_contact, join campaign, where campaign.org_id = org_id
+  -----------------------------------
+    select campaign_contact_id, assignment_id
+    from message
+    join campaign_contact
+      on message.campaign_contact_id = campaign_contact.id
+    join campaign
+      on campaign.id = campaign_contact.campaign_id
+    where
+      campaign.organization_id = ?
+      and campaign_contact.cell = ?
+      and message.is_from_contact = false
+    order by created_at desc
+    limit 1
+  -----------------------------------
+
+  - must do explain analyze
+  - both query options were pretty good – the campaign_contact.cell and message.campaign_contact_id
+    index filters are fast enough and the result set to filter through small enough that the rest doesn't
+    really matter
+  - first one was much easier to plan, so going with that one
+ */
+
+/**
+ * Finds the campaign contact and assignment of the most recent outbound message
+ * to contactNumber within the organization that owns messaging_service_sid.
+ * Resolves to undefined when no such message exists. `service` is currently unused.
+ */
+export async function getCampaignContactAndAssignmentForIncomingMessage({
+  contactNumber,
+  service,
+  messaging_service_sid
+}) {
+  const { rows } = await r.knex.raw(
+    `
+    with chosen_organization as (
+      select organization_id
+      from messaging_service
+      where messaging_service_sid = ?
+    ),
+    campaign_contact_option as (
+      select campaign_contact.id
+      from campaign_contact
+      join campaign
+        on campaign_contact.campaign_id = campaign.id
+      where
+        campaign.organization_id in (
+          select organization_id from chosen_organization
+        )
+        and campaign_contact.cell = ?
+    )
+    select campaign_contact_id, assignment_id
+    from message
+    join campaign_contact_option
+      on message.campaign_contact_id = campaign_contact_option.id
+    where
+      message.is_from_contact = false
+    order by created_at desc
+    limit 1`,
+    [messaging_service_sid, contactNumber]
+  );
+
+  return rows[0];
 }
 
 export async function saveNewIncomingMessage(messageInstance) {
diff --git a/src/server/api/lib/twilio.js b/src/server/api/lib/twilio.js
index 60a03c03f..da3796460 100644
--- a/src/server/api/lib/twilio.js
+++ b/src/server/api/lib/twilio.js
@@ -1,8 +1,12 @@
 import Twilio from "twilio";
+import _ from "lodash";
 import { getFormattedPhoneNumber } from "../../../lib/phone-format";
 import { Log, Message, PendingMessagePart, r } from "../../models";
 import { log } from "../../../lib";
-import { getLastMessage, saveNewIncomingMessage } from "./message-sending";
+import {
+  getCampaignContactAndAssignmentForIncomingMessage,
+  saveNewIncomingMessage
+} from "./message-sending";
 import faker from "faker";
 
 let twilio = null;
@@ -70,21 +74,22 @@ async function convertMessagePartsToMessage(messageParts) {
     .join("")
     .replace(/\0/g, ""); // strip all UTF-8 null characters (0x00)
 
-  // TODO: this could be a slow query
-  const lastMessage = await getLastMessage({
+  const ccInfo = await getCampaignContactAndAssignmentForIncomingMessage({
     service: "twilio",
-    contactNumber
+    contactNumber,
+    messaging_service_sid: serviceMessages[0].MessagingServiceSid
   });
+
   return (
-    lastMessage && {
-      campaign_contact_id: lastMessage && lastMessage.campaign_contact_id,
+    ccInfo && {
+      campaign_contact_id: ccInfo && ccInfo.campaign_contact_id,
       contact_number: contactNumber,
       user_number: userNumber,
       is_from_contact: true,
       text: textIncludingMms(text, serviceMessages),
       service_response: JSON.stringify(serviceMessages),
       service_id: serviceMessages[0].MessagingServiceSid,
-      assignment_id: lastMessage && lastMessage.assignment_id,
+      assignment_id: ccInfo && ccInfo.assignment_id,
       service: "twilio",
       send_status: "DELIVERED"
     }
@@ -153,46 +158,61 @@ function parseMessageText(message) {
   return params;
 }
 
-const getMessagingServiceSIDs = () => {
-  // Gather multiple messaging service SIDs (may be split across multiple env vars)
-  const envVarKeys = Object.keys(process.env).filter(key =>
-    key.startsWith(`TWILIO_MESSAGE_SERVICE_SIDS`)
+const assignMessagingServiceSID = async (cell, organizationId) => {
+  const result = await r.knex.raw(
+    `
+    with chosen_messaging_service_sid as (
+      select messaging_service_sid, count(*) as count
+      from messaging_service_stick
+      where organization_id = ?
+      group by messaging_service_sid
+      union
+      select messaging_service_sid, 0
+      from messaging_service
+      where organization_id = ?
+        and not exists (
+          select 1
+          from messaging_service_stick
+          where
+            messaging_service_stick.messaging_service_sid = messaging_service.messaging_service_sid
+        )
+      order by count asc
+      limit 1
+    )
+    insert into messaging_service_stick (cell, organization_id, messaging_service_sid)
+    values (?, ?, (select messaging_service_sid from chosen_messaging_service_sid))
+    returning messaging_service_sid;
+  `,
+    [organizationId, organizationId, cell, organizationId]
   );
-  envVarKeys.sort();
-
-  let messagingServiceIds = [];
-  for (const envVarKey of envVarKeys) {
-    const envVarValue = process.env[envVarKey];
-    const newServiceIds = envVarValue
-      .split(",")
-      .map(serviceSid => serviceSid.trim());
-    messagingServiceIds = messagingServiceIds.concat(newServiceIds);
-  }
 
-  return messagingServiceIds;
+  const chosen = result.rows[0].messaging_service_sid;
+  return chosen;
 };
 
-const MESSAGING_SERVICE_SIDS = getMessagingServiceSIDs();
-
-const getMessageServiceSID = cell => {
-  // Check for single message service
-  if (!!process.env.TWILIO_MESSAGE_SERVICE_SID) {
-    return process.env.TWILIO_MESSAGE_SERVICE_SID;
-  }
-
-  const messagingServiceIndex = deterministicIntWithinRange(
-    cell,
-    MESSAGING_SERVICE_SIDS.length
+const getMessageServiceSID = async (cell, organizationId) => {
+  const { rows: existingStick } = await r.knex.raw(
+    `
+    select messaging_service_sid
+    from messaging_service_stick
+    where
+      cell = ?
+      and organization_id = ?
+  `,
+    [cell, organizationId]
   );
-  const messagingServiceId = MESSAGING_SERVICE_SIDS[messagingServiceIndex];
 
-  if (!messagingServiceId)
-    throw new Error(`Could not find Twilio message service SID for ${cell}!`);
+  const existingMessageServiceSid =
+    existingStick[0] && existingStick[0].messaging_service_sid;
 
-  return messagingServiceId;
+  if (existingMessageServiceSid) {
+    return existingMessageServiceSid;
+  }
+
+  return await assignMessagingServiceSID(cell, organizationId);
 };
 
-async function sendMessage(message, trx) {
+async function sendMessage(message, organizationId, trx) {
   if (!twilio) {
     log.warn(
       "cannot actually send SMS message -- twilio is not fully configured:",
@@ -208,12 +228,17 @@ async function sendMessage(message, trx) {
     return "test_message_uuid";
   }
 
+  // Look up the sticky messaging service before entering the Promise executor;
+  // a rejection thrown inside an async executor would never settle the promise
+  const messagingServiceSid = await getMessageServiceSID(
+    message.contact_number,
+    organizationId
+  );
+
   return new Promise((resolve, reject) => {
     if (message.service !== "twilio") {
       log.warn("Message not marked as a twilio message", message.id);
     }
 
-    const messagingServiceSid = getMessageServiceSID(message.contact_number);
-
     const messageParams = Object.assign(
       {
@@ -402,6 +427,120 @@ function hashStr(str) {
   return hash;
 }
 
+// Assigns a sticky messaging service to every cell in the campaign that has none for
+// the organization: the least-used services are topped up first, then the remaining
+// cells are spread evenly. Every query runs on trx so it sees the contacts inserted
+// earlier in the same transaction and is rolled back together with them.
+// NOTE: This does not chunk inserts so make sure this is run only when you are sure the specified campaign
+// has a reasonable size (< 1000) of cells without sticky messaging services.
+const ensureAllNumbersHaveMessagingServiceSIDs = async (
+  trx,
+  campaignId,
+  organizationId
+) => {
+  const { rows } = await trx.raw(
+    `
+    select distinct campaign_contact.cell
+    from campaign_contact
+    left join messaging_service_stick
+      on messaging_service_stick.cell = campaign_contact.cell
+        and messaging_service_stick.organization_id = ?
+    where campaign_id = ?
+      and messaging_service_stick.messaging_service_sid is null
+  `,
+    [organizationId, campaignId]
+  );
+
+  const cells = rows.map(row => row.cell);
+
+  if (cells.length === 0) {
+    return;
+  }
+
+  const { rows: candidateRows } = await trx.raw(
+    `
+    select messaging_service_sid, count(*) as count
+    from messaging_service_stick
+    where organization_id = ?
+    group by messaging_service_sid
+    union
+    select messaging_service_sid, 0
+    from messaging_service
+    where organization_id = ?
+      and not exists (
+        select 1
+        from messaging_service_stick
+        where
+          messaging_service_stick.messaging_service_sid = messaging_service.messaging_service_sid
+      )
+    order by count desc
+  `,
+    [organizationId, organizationId]
+  );
+
+  if (candidateRows.length === 0) {
+    throw new Error(
+      `No messaging services found for organization ${organizationId}`
+    );
+  }
+
+  // count(*) is a bigint and may come back from the driver as a string
+  const messagingServiceCandidates = candidateRows.map(ms => ({
+    messaging_service_sid: ms.messaging_service_sid,
+    count: Number(ms.count)
+  }));
+
+  const mostAssignedNumbers = messagingServiceCandidates[0].count;
+
+  const gapToMakeUp = messagingServiceCandidates.slice(1).reduce((acc, ms) => {
+    return acc + (mostAssignedNumbers - ms.count);
+  }, 0);
+
+  let cellsUsedForMakingUpGap = cells.slice(0, gapToMakeUp);
+  const additionalCells = cells.slice(gapToMakeUp);
+
+  const reversedMessagingServicesToAddMakeUpCellsTo = messagingServiceCandidates.slice(
+    0
+  );
+  reversedMessagingServicesToAddMakeUpCellsTo.reverse();
+
+  let rowsToInsert = [];
+
+  for (let ms of reversedMessagingServicesToAddMakeUpCellsTo) {
+    const gap = mostAssignedNumbers - ms.count;
+
+    rowsToInsert = rowsToInsert.concat(
+      cellsUsedForMakingUpGap.slice(0, gap).map(cell => ({
+        cell,
+        organization_id: organizationId,
+        messaging_service_sid: ms.messaging_service_sid
+      }))
+    );
+
+    cellsUsedForMakingUpGap = cellsUsedForMakingUpGap.slice(gap);
+  }
+
+  const chunkSize = Math.ceil(
+    additionalCells.length / messagingServiceCandidates.length
+  );
+  const chunks = _.chunk(additionalCells, chunkSize);
+
+  messagingServiceCandidates.forEach((ms, idx) => {
+    // Fewer chunks than services exist when the leftover cells run out early
+    const chunk = chunks[idx] || [];
+
+    rowsToInsert = rowsToInsert.concat(
+      chunk.map(cell => ({
+        cell,
+        organization_id: organizationId,
+        messaging_service_sid: ms.messaging_service_sid
+      }))
+    );
+  });
+
+  return await trx("messaging_service_stick").insert(rowsToInsert);
+};
+
 export default {
   syncMessagePartProcessing: !!process.env.JOBS_SAME_PROCESS,
   webhook,
@@ -412,5 +551,6 @@ export default {
   saveNewIncomingMessage,
   handleDeliveryReport,
   handleIncomingMessage,
-  parseMessageText
+  parseMessageText,
+  ensureAllNumbersHaveMessagingServiceSIDs
 };
diff --git a/src/server/api/schema.js b/src/server/api/schema.js
index 00766e909..96165c2ae 100644
--- a/src/server/api/schema.js
+++ b/src/server/api/schema.js
@@ -563,7 +563,7 @@ async function sendMessage(
   // Send message after we are sure messageInstance has been persisted
   const service =
     serviceMap[messageInstance.service || process.env.DEFAULT_SERVICE];
-  service.sendMessage(toInsert);
+  service.sendMessage(toInsert, record.organization_id);
 
   // Send message to BernieSMS to be checked for bad words
   const badWordUrl = process.env.TFB_BAD_WORD_URL;
diff --git a/src/workers/jobs.js b/src/workers/jobs.js
index a8d71ef43..83525e25b 100644
--- a/src/workers/jobs.js
+++ b/src/workers/jobs.js
@@ -232,6 +232,14 @@ export async function uploadContacts(job) {
 
       try {
         await trx("campaign_contact").insert(chunk);
+        const service = serviceMap[process.env.DEFAULT_SERVICE];
+        if (service && service.ensureAllNumbersHaveMessagingServiceSIDs) {
+          await service.ensureAllNumbersHaveMessagingServiceSIDs(
+            trx,
+            campaignId,
+            campaign.organization_id
+          );
+        }
       } catch (exc) {
         console.error("Error inserting contacts:", exc);
         throw exc;