Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: flush pending GQL executor jobs before graceful shutdown #10821

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions packages/gql-executor/gqlExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import executeGraphQL from '../server/graphql/executeGraphQL'
import '../server/initSentry'
import '../server/monkeyPatchFetch'
import {GQLRequest} from '../server/types/custom'
import {Logger} from '../server/utils/Logger'
import RedisInstance from '../server/utils/RedisInstance'
import RedisStream from './RedisStream'
import {Logger} from '../server/utils/Logger'

tracer.init({
service: `gql`,
Expand All @@ -29,26 +29,43 @@ const run = async () => {
const publisher = new RedisInstance('gql_pub')
const subscriber = new RedisInstance('gql_sub')
const executorChannel = GQLExecutorChannelId.join(SERVER_ID!)
let activeJobCount = 0

// on shutdown, remove consumer from the group
// on shutdown, remove consumer from the group wait for current jobs to complete
process.on('SIGTERM', async (signal) => {
Logger.log(`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`)
const MAX_SHUTDOWN_TIME = 40000
const SHUTDOWN_CHECK_INTERVAL = 1000
let start = Date.now()
Logger.log(
`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`
)

await publisher.xgroup(
'DELCONSUMER',
ServerChannel.GQL_EXECUTOR_STREAM,
ServerChannel.GQL_EXECUTOR_CONSUMER_GROUP,
executorChannel
)
Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`)
process.exit()

setInterval(() => {
if (Date.now() - start >= MAX_SHUTDOWN_TIME) {
Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown timed out, exiting.`)
process.exit()
} else if (activeJobCount <= 0) {
Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`)
process.exit()
}
}, SHUTDOWN_CHECK_INTERVAL)
})

// subscribe to direct messages
const onMessage = async (_channel: string, message: string) => {
const {jobId, socketServerId, request} = JSON.parse(message) as PubSubPromiseMessage
activeJobCount++
const response = await executeGraphQL(request)
const channel = SocketServerChannelId.join(socketServerId)
publisher.publish(channel, JSON.stringify({response, jobId}))
activeJobCount--
}

subscriber.on('message', onMessage)
Expand Down
4 changes: 4 additions & 0 deletions packages/server/utils/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ function trace(level: LogLevel, message: any, ...optionalParameters: any[]) {

if (span) {
tracer.inject(span.context(), formats.LOG, record)
const tags = optionalParameters.find((param) => param.tags) as Record<string, any> | undefined
if (tags && typeof tags === 'object') {
span.addTags(tags)
}
}

LogFun[level](JSON.stringify(record))
Expand Down
4 changes: 3 additions & 1 deletion packages/server/utils/publishInternalGQL.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ const publishInternalGQL = async <NarrowResponse>(options: Options) => {
userId: getUserId(authToken),
tags: {
authToken: JSON.stringify(authToken),
query: query || ''
query: query || '',
variables: JSON.stringify(variables),
socketServerId: socketId
}
})
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/server/utils/sendToSentry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ export interface SentryOptions {

// Even though this is a promise we'll never need to await it, so we'll never need to worry about catching an error
const sendToSentry = async (error: Error, options: SentryOptions = {}) => {
Logger.log('SEND TO SENTRY', error || JSON.stringify(error))
const {sampleRate, tags, extras, userId, ip} = options
Logger.log('SEND TO SENTRY', error || JSON.stringify(error), {tags})
if (sampleRate && Math.random() > sampleRate) return
const fullUser = userId ? await getUserById(userId) : null
const user = fullUser ? {id: fullUser.id, email: fullUser.email} : null
Expand Down