Skip to content

Commit

Permalink
Implement toggle notification queue mode (#1950)
Browse files Browse the repository at this point in the history
  • Loading branch information
IhostVlad authored Jul 14, 2021
1 parent 54c4e67 commit 38805a3
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,15 @@ const build: ExternalMethods['build'] = async (
)
const trxTableNameAsId = escapeId(`${tablePrefix}__${schemaName}__TRX__`)

const xaKey = generateGuid(`${Date.now()}${Math.random()}${process.pid}`)
const firstRandom = Math.random()
let lastRandom: number | null = null
// More entropy via branch misprediction and more context changes
for (let index = 0; index < Math.floor(firstRandom * 50) + 1; index++) {
lastRandom = Math.random()
}
const xaKey = generateGuid(
`${Date.now()}${firstRandom}${lastRandom}${process.pid}`
)

const rows = (await inlineLedgerRunQuery(
`WITH "MaybeAcquireLock" AS (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
.join('\n')
}
if (pool.connection === connection) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pool.connection = null!
}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import type { GenerateGuidMethod } from './types'

const GUID_BYTES = 32

const generateGuid: GenerateGuidMethod = (...args) => {
const baseBuffer = Buffer.from(`${args.map(String).join('')}`)
const resultBuffer = Buffer.alloc(8)
const resultBuffer = Buffer.alloc(GUID_BYTES)

for (let index = 0; index < baseBuffer.length; index++) {
resultBuffer[index % 8] = resultBuffer[index % 8] ^ baseBuffer[index]
resultBuffer[index % GUID_BYTES] =
resultBuffer[index % GUID_BYTES] ^ baseBuffer[index]
}

const result = `e${resultBuffer.toString('hex').toLowerCase()}`
Expand Down
64 changes: 36 additions & 28 deletions packages/runtime/runtime/src/cloud/init-subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
setFunctionTags,
deleteEventSourceMapping,
getFunctionTags,
invokeFunction,
} from 'resolve-cloud-common/lambda'
import { getCallerIdentity } from 'resolve-cloud-common/sts'

Expand All @@ -34,46 +35,50 @@ const initSubscriber = (resolve, lambdaContext) => {
const region = process.env.AWS_REGION
const userId = process.env.RESOLVE_USER_ID
const functionArn = `arn:aws:lambda:${region}:${accountId}:function:${functionName}`
const useSqs = !!process.env.EXPERIMENTAL_SQS_TRANSPORT

resolve.getEventSubscriberDestination = (eventSubscriber) =>
`arn:aws:sqs:${region}:${accountId}:${userId}-${resolve.eventSubscriberScope}-${eventSubscriber}`
useSqs
? `arn:aws:sqs:${region}:${accountId}:${userId}-${resolve.eventSubscriberScope}-${eventSubscriber}`
: functionArn

resolve.subscriptionsCredentials = {
applicationLambdaArn: lambdaContext.invokedFunctionArn,
}

resolve.sendSqsMessage = async (destination, parameters) => {
// Send SQS messages within 1 minute with exponential jitter from 64 ms
const getRemainingSendingTime = ((endTime) => endTime - Date.now()).bind(
null,
Date.now() + 60 * 1000
)
const getJitterTime = (attempt) => Math.pow(2, attempt + 7)
resolve.invokeLambdaAsync = async (destination, parameters) => {
await invokeFunction({
Region: region,
FunctionName: destination,
Payload: parameters,
InvocationType: 'Event',
})
}

resolve.sendSqsMessage = async (destination, parameters) => {
const queueUrl = `https://sqs.${region}.amazonaws.com/${accountId}/${destination}`
for (let attempt = 0; getRemainingSendingTime() > 0; attempt++) {
try {
await sendMessage({
Region: region,
QueueUrl: queueUrl,
MessageBody: JSON.stringify(parameters),
})
break
} catch (err) {
await new Promise((resolve) =>
setTimeout(resolve, getJitterTime(attempt))
)
}
}
await sendMessage({
Region: region,
QueueUrl: queueUrl,
MessageBody: JSON.stringify(parameters),
})
}

resolve.invokeBuildAsync = async (parameters) => {
await resolve.sendSqsMessage(
`${userId}-${resolve.eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
}
resolve.invokeBuildAsync = async (parameters) =>
useSqs
? await resolve.sendSqsMessage(
`${userId}-${resolve.eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
: await resolve.invokeLambdaAsync(functionName, {
resolveSource: 'BuildEventSubscriber',
...parameters,
})

resolve.ensureQueue = async (name) => {
if (!useSqs) {
return
}
const getTags = () => {
const tags = {
'resolve-deployment-id': resolve.eventSubscriberScope,
Expand Down Expand Up @@ -206,6 +211,9 @@ const initSubscriber = (resolve, lambdaContext) => {
}

resolve.deleteQueue = async (name) => {
if (!useSqs) {
return
}
const errors = []
let functionTags = null
let UUID = null
Expand Down
20 changes: 20 additions & 0 deletions packages/runtime/runtime/src/cloud/lambda-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ const lambdaWorker = async (resolveBase, lambdaEvent, lambdaContext) => {

log.verbose(`executorResult: ${JSON.stringify(executorResult)}`)

return executorResult
} else if (lambdaEvent.resolveSource === 'BuildEventSubscriber') {
initSubscriber(resolveBase, lambdaContext)
initScheduler(resolve)

log.debug('initializing reSolve framework')
await initResolve(resolve)
log.debug('reSolve framework initialized')

log.debug('identified event source: event-subscriber-direct')
const { resolveSource, ...buildParameters } = lambdaEvent
void resolveSource

const executorResult = await resolve.eventSubscriber.build({
...buildParameters,
coldStart,
})

log.verbose(`executorResult: ${JSON.stringify(executorResult)}`)

return executorResult
} else if (
Array.isArray(lambdaEvent.Records) &&
Expand Down
43 changes: 26 additions & 17 deletions packages/runtime/runtime/src/common/notify-event-subscribers.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
const getNotificationObject = (
eventSubscriber,
eventWithCursor,
isForeign
) => ({
eventSubscriber,
initiator: isForeign ? 'command-foreign' : 'command',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
...(eventWithCursor != null ? eventWithCursor : {}),
})

const notifyEventSubscriber = async (
resolveBase,
destination,
Expand All @@ -22,14 +34,17 @@ const notifyEventSubscriber = async (
}
case /^arn:aws:sqs:/.test(destination): {
const queueFullName = destination.split(':')[5]
await resolveBase.sendSqsMessage(queueFullName, {
eventSubscriber,
initiator: 'command-foreign',
notificationId: `NT-${Date.now()}${Math.floor(
Math.random() * 1000000
)}`,
sendTime: Date.now(),
...(eventWithCursor != null ? eventWithCursor : {}),
await resolveBase.sendSqsMessage(
queueFullName,
getNotificationObject(eventSubscriber, eventWithCursor, true)
)
break
}
case /^arn:aws:lambda:/.test(destination): {
const lambdaFullName = destination.split(':')[6]
await resolveBase.invokeLambdaAsync(lambdaFullName, {
resolveSource: 'BuildEventSubscriber',
...getNotificationObject(eventSubscriber, eventWithCursor, true),
})
break
}
Expand All @@ -54,15 +69,9 @@ const notifyEventSubscribers = async (resolve, eventWithCursor) => {
const promises = []
for (const { name: eventSubscriber } of resolve.eventListeners.values()) {
promises.push(
resolve.invokeBuildAsync({
eventSubscriber,
initiator: 'command',
notificationId: `NT-${Date.now()}${Math.floor(
Math.random() * 1000000
)}`,
sendTime: Date.now(),
...(eventWithCursor != null ? eventWithCursor : {}),
})
resolve.invokeBuildAsync(
getNotificationObject(eventSubscriber, eventWithCursor)
)
)
}

Expand Down

0 comments on commit 38805a3

Please sign in to comment.