fix(server/diffdownload): fix the bug where client connection close events were not propagated to the database stream connection (#3921)

* WIP: trying to catch a bug

* great success

* reinstate gzip

* Remove feature flag

* remove stream-chain dependency

* remove superfluous logging lines

* re-align with original where possible

* re-align package.json

* More re-alignment with main branch before the previous 'fix'

* A smaller failing example
iainsproat authored Feb 4, 2025
1 parent 272c136 commit 55315fe
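
The heart of the change is easier to see outside the diff. Below is a minimal sketch of the pattern this commit lands on, assuming an Express route and a gzip pipeline; the route path and the async generator standing in for the knex database cursor are illustrative, not the real Speckle code:

```ts
import { PassThrough, Readable, pipeline } from 'stream'
import zlib from 'zlib'
import express from 'express'

const app = express()

app.get('/api/getobjects/:streamId', (_req, res) => {
  // Stand-in for the knex query stream; in the real handler this is a
  // database cursor whose connection is only released on end/destroy.
  const dbStream = Readable.from(
    (async function* () {
      for (let i = 0; i < 1_000_000; i++) yield `{"id":${i}}\n`
    })()
  )
  const out = new PassThrough()

  pipeline(out, zlib.createGzip(), res, (err) => {
    if (err) console.error('App error streaming objects', err)
  })

  // The fix: listen on `res` (not `req`) and actually *call* the teardown
  // methods, so a client disconnect propagates to the database cursor.
  res.on('close', () => dbStream.destroy())

  dbStream.pipe(out, { end: false }) // keep `out` open for further batches
  dbStream.once('end', () => out.end())
})
```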
Showing 9 changed files with 256 additions and 141 deletions.
80 changes: 17 additions & 63 deletions packages/server/modules/core/rest/diffDownload.ts
@@ -2,7 +2,7 @@ import zlib from 'zlib'
 import { corsMiddleware } from '@/modules/core/configs/cors'
 import type { Application } from 'express'
 import { SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream'
-import { Duplex, PassThrough, pipeline } from 'stream'
+import { pipeline, PassThrough } from 'stream'
 import { getObjectsStreamFactory } from '@/modules/core/repositories/objects'
 import { db } from '@/db/knex'
 import { validatePermissionsReadStreamFactory } from '@/modules/core/services/streams/auth'
@@ -11,13 +11,8 @@ import { authorizeResolver, validateScopes } from '@/modules/shared'
 import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
 import { UserInputError } from '@/modules/core/errors/userinput'
 import { ensureError } from '@speckle/shared'
-import chain from 'stream-chain'
-import { get } from 'lodash'
-import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
 import { DatabaseError } from '@/modules/shared/errors'
 
-const { FF_OBJECTS_STREAMING_FIX } = getFeatureFlags()
-
 export default (app: Application) => {
   const validatePermissionsReadStream = validatePermissionsReadStreamFactory({
     getStream: getStreamFactory({ db }),
@@ -32,6 +27,7 @@ export default (app: Application) => {
       userId: req.context.userId || '-',
       streamId: req.params.streamId
     })
+
     const hasStreamAccess = await validatePermissionsReadStream(
       req.params.streamId,
       req
@@ -61,70 +57,32 @@ export default (app: Application) => {
     // "output" stream, connected to res with `pipeline` (auto-closing res)
     const speckleObjStream = new SpeckleObjectsStream(simpleText)
     const gzipStream = zlib.createGzip()
-
-    let chainPipeline: Duplex
-
-    if (FF_OBJECTS_STREAMING_FIX) {
-      // From node documentation: https://nodejs.org/docs/latest-v18.x/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback
-      // > stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
-      // As workaround, we are using chain from 'stream-chain'
-      // Some more conversation around this: https://stackoverflow.com/questions/61072482/node-closing-streams-properly-after-pipeline
-      chainPipeline = chain([
-        speckleObjStream,
-        gzipStream,
-        new PassThrough({ highWaterMark: 16384 * 31 }),
-        res
-      ])
-      chainPipeline.on('error', (err) => {
+
+    pipeline(
+      speckleObjStream,
+      gzipStream,
+      new PassThrough({ highWaterMark: 16384 * 31 }),
+      res,
+      (err) => {
         if (err) {
-          switch (get(err, 'code')) {
+          switch (err.code) {
            case 'ERR_STREAM_PREMATURE_CLOSE':
-             req.log.info({ err }, 'Stream to client has prematurely closed')
+             req.log.debug({ err }, 'Stream to client has prematurely closed')
              break
            default:
              req.log.error(err, 'App error streaming objects')
              break
          }
          return
         }
 
         req.log.info(
           {
             childCount: childrenList.length,
             mbWritten: gzipStream.bytesWritten / 1000000
           },
-          'Encountered error. Prior to error, we streamed {childCount} objects (size: {mbWritten} MB)'
+          'Streamed {childCount} objects (size: {mbWritten} MB)'
         )
-      })
-    } else {
-      pipeline(
-        speckleObjStream,
-        gzipStream,
-        new PassThrough({ highWaterMark: 16384 * 31 }),
-        res,
-        (err) => {
-          if (err) {
-            switch (err.code) {
-              case 'ERR_STREAM_PREMATURE_CLOSE':
-                req.log.info({ err }, 'Stream to client has prematurely closed')
-                break
-              default:
-                req.log.error(err, 'App error streaming objects')
-                break
-            }
-            return
-          }
-
-          req.log.info(
-            {
-              childCount: childrenList.length,
-              mbWritten: gzipStream.bytesWritten / 1000000
-            },
-            'Streamed {childCount} objects (size: {mbWritten} MB)'
-          )
-        }
-      )
-    }
+      }
+    )
 
     const cSize = 1000
     try {
@@ -138,19 +96,15 @@ export default (app: Application) => {
         })
         // https://knexjs.org/faq/recipes.html#manually-closing-streams
        // https://github.com/knex/knex/issues/2324
-        req.on('close', () => {
-          dbStream.end.bind(dbStream)
-          dbStream.destroy.bind(dbStream)
+        res.on('close', () => {
+          dbStream.end()
+          dbStream.destroy()
         })
 
         await new Promise((resolve, reject) => {
-          if (FF_OBJECTS_STREAMING_FIX) {
-            dbStream.pipe(chainPipeline, { end: false })
-          } else {
-            dbStream.pipe(speckleObjStream, { end: false })
-          }
           dbStream.once('end', resolve)
           dbStream.once('error', reject)
+          dbStream.pipe(speckleObjStream, { end: false }) // will not call end on the speckleObjStream, so it remains open for the next batch of objects
         })
       }
     } catch (ex) {
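
Two details in the hunk above carry the fix. First, the old `req.on('close', ...)` handler only created bound functions (`dbStream.end.bind(dbStream)`) and never invoked them, so a client disconnect was a no-op as far as the database cursor was concerned; the new code listens on `res` and actually calls `end()` and `destroy()`. Second, `dbStream.pipe(speckleObjStream, { end: false })` lets one long-lived output stream serve several short-lived per-batch cursors. A self-contained sketch of that batching pattern, with `Readable.from` standing in for the per-batch knex cursor:

```ts
import { PassThrough, Readable } from 'stream'

// Feed one shared output stream from a sequence of short-lived sources.
// `{ end: false }` stops each source's 'end' event from closing the output.
async function streamBatches(batches: string[][], out: PassThrough): Promise<void> {
  for (const batch of batches) {
    const source = Readable.from(batch) // one cursor per batch in the real code
    await new Promise<void>((resolve, reject) => {
      source.once('end', () => resolve())
      source.once('error', reject)
      source.pipe(out, { end: false }) // keep `out` open for the next batch
    })
  }
  out.end() // close the shared stream only after the final batch
}
```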
@@ -48,7 +48,8 @@ import {
   storeObjectsIfNotFoundFactory
 } from '@/modules/core/repositories/objects'
 import { expect } from 'chai'
-import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
+import { parse, Parser } from 'csv-parse'
+import { createReadStream } from 'fs'
 
 const getServerInfo = getServerInfoFactory({ db })
 const getUser = legacyGetUserFactory({ db })
@@ -96,68 +97,149 @@ const createObjectsBatched = createObjectsBatchedFactory({
   storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db })
 })
 
-const { FF_OBJECTS_STREAMING_FIX } = getFeatureFlags()
-
-describe('Objects REST @core', () => {
+describe('Objects streaming REST @core', () => {
   let serverAddress: string
   before(async () => {
     const ctx = await beforeEachContext()
     ;({ serverAddress } = await initializeTestServer(ctx))
   })
-  ;(FF_OBJECTS_STREAMING_FIX ? it : it.skip)(
-    'should close database connections if client connection is prematurely closed',
-    async () => {
-      const userId = await createUser({
-        name: 'emails user',
-        email: createRandomEmail(),
-        password: createRandomPassword()
-      })
-      const user = await getUser(userId)
-
-      const project = {
-        id: '',
-        name: 'test project',
-        ownerId: userId
-      }
-      await createTestStream(project as unknown as BasicTestStream, user)
-
-      const token = `Bearer ${await createPersonalAccessToken(
-        user.id,
-        'test token user A',
-        [
-          Scopes.Streams.Read,
-          Scopes.Streams.Write,
-          Scopes.Users.Read,
-          Scopes.Users.Email,
-          Scopes.Tokens.Write,
-          Scopes.Tokens.Read,
-          Scopes.Profile.Read,
-          Scopes.Profile.Email
-        ]
-      )}`
-
-      const manyObjs: { commit: RawSpeckleObject; objs: RawSpeckleObject[] } =
-        generateManyObjects(3333, 'perlin merlin magic')
-      const objsIds = manyObjs.objs.map((o) => o.id)
-
-      await createObjectsBatched({ streamId: project.id, objects: manyObjs.objs })
-      for (let i = 0; i < 4; i++) {
-        forceCloseStreamingConnection({
-          serverAddress,
-          projectId: project.id,
-          token,
-          objsIds
-        })
-      }
-
-      //sleep for a bit to allow the server to close the connections
-      await new Promise((r) => setTimeout(r, 3000))
-      const gaugeContents = await determineRemainingDatabaseConnectionCapacity({
-        serverAddress
-      })
-      expect(parseInt(gaugeContents), gaugeContents).to.gte(4) //expect all connections to become available again after the client closes them
-    }
-  )
+
+  it('should close database connections if client connection is prematurely closed', async () => {
+    const userId = await createUser({
+      name: 'emails user',
+      email: createRandomEmail(),
+      password: createRandomPassword()
+    })
+    const user = await getUser(userId)
+
+    const project = {
+      id: '',
+      name: 'test project',
+      ownerId: userId
+    }
+    await createTestStream(project as unknown as BasicTestStream, user)
+
+    const token = `Bearer ${await createPersonalAccessToken(
+      user.id,
+      'test token user A',
+      [
+        Scopes.Streams.Read,
+        Scopes.Streams.Write,
+        Scopes.Users.Read,
+        Scopes.Users.Email,
+        Scopes.Tokens.Write,
+        Scopes.Tokens.Read,
+        Scopes.Profile.Read,
+        Scopes.Profile.Email
+      ]
+    )}`
+
+    const manyObjs: { commit: RawSpeckleObject; objs: RawSpeckleObject[] } =
+      generateManyObjects(3333, 'perlin merlin magic')
+    const objsIds = manyObjs.objs.map((o) => o.id)
+
+    await createObjectsBatched({ streamId: project.id, objects: manyObjs.objs })
+    for (let i = 0; i < 4; i++) {
+      forceCloseStreamingConnection({
+        serverAddress,
+        projectId: project.id,
+        token,
+        objsIds
+      })
+    }
+
+    //sleep for a bit to allow the server to close the connections
+    await new Promise((r) => setTimeout(r, 3000))
+    const gaugeContents = await determineRemainingDatabaseConnectionCapacity({
+      serverAddress
+    })
+    expect(parseInt(gaugeContents), gaugeContents).to.gte(4) //expect all connections to become available again after the client closes them
+  })
+
+  it('should stream model with some failing feature', async () => {
+    const userId = await createUser({
+      name: 'emails user',
+      email: createRandomEmail(),
+      password: createRandomPassword()
+    })
+    const user = await getUser(userId)
+
+    const project = {
+      id: '',
+      name: 'test project',
+      ownerId: userId
+    }
+    await createTestStream(project as unknown as BasicTestStream, user)
+
+    const token = `Bearer ${await createPersonalAccessToken(
+      user.id,
+      'test token user A',
+      [
+        Scopes.Streams.Read,
+        Scopes.Streams.Write,
+        Scopes.Users.Read,
+        Scopes.Users.Email,
+        Scopes.Tokens.Write,
+        Scopes.Tokens.Read,
+        Scopes.Profile.Read,
+        Scopes.Profile.Email
+      ]
+    )}`
+
+    // import CSV file
+    const csvStream = createReadStream(
+      //FIXME this relies on running this test from `packages/server` directory
+      `${process.cwd()}/test/assets/failing-streaming-model-f547dc4e88.csv`
+    )
+      // eslint-disable-next-line camelcase
+      .pipe(parse({ delimiter: ',', from_line: 2 }))
+
+    function csvParserAsPromise(
+      stream: Parser
+    ): Promise<{ manyObjs: RawSpeckleObject[]; objsIds: string[] }> {
+      const manyObjs: RawSpeckleObject[] = []
+      const objsIds: string[] = []
+      return new Promise((resolve, reject) => {
+        stream.on('data', (row: string[]) => {
+          const obj = JSON.parse(row[1])
+          manyObjs.push(obj)
+          objsIds.push(row[0])
+        })
+        stream.on('end', () => resolve({ manyObjs, objsIds }))
+        stream.on('error', (error: unknown) => reject(error))
+      })
+    }
+
+    const { manyObjs, objsIds } = await csvParserAsPromise(csvStream)
+
+    const preGaugeContents = await determineRemainingDatabaseConnectionCapacity({
+      serverAddress
+    })
+    expect(
+      parseInt(preGaugeContents),
+      `Prior to test, we did not have sufficient DB connections free: ${preGaugeContents}`
+    ).to.gte(4) // all connections are available before the test
+
+    await createObjectsBatched({ streamId: project.id, objects: manyObjs })
+    for (let i = 0; i < 1; i++) {
+      forceCloseStreamingConnection({
+        serverAddress,
+        projectId: project.id,
+        token,
+        objsIds
+      })
+    }
+
+    //sleep for a bit to allow the server to close the connections
+    await new Promise((r) => setTimeout(r, 3000))
+    const postGaugeContents = await determineRemainingDatabaseConnectionCapacity({
+      serverAddress
+    })
+    expect(
+      parseInt(postGaugeContents),
+      `After the test, we did not have sufficient DB connections free: ${postGaugeContents}`
+    ).to.gte(4) //expect all connections to become available again after the client closes them
+  }).timeout(50000)
 })

const forceCloseStreamingConnection = async (params: {
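
The body of `forceCloseStreamingConnection` is truncated above. As a rough guess at the shape of such a helper (the function below is hypothetical, not the actual test code): start the download, then destroy the socket after the first chunk arrives, so the server observes a premature client disconnect while the database stream is still busy:

```ts
import http from 'http'

// Hypothetical client-abort helper: request the object stream, then kill
// the socket mid-download to exercise the server's `res.on('close')` path.
function forceClose(url: string, token: string): Promise<void> {
  return new Promise((resolve) => {
    const req = http.get(url, { headers: { Authorization: token } }, (res) => {
      res.once('data', () => {
        req.destroy() // abort mid-stream; the server should release its DB cursor
        resolve()
      })
    })
    req.on('error', () => resolve()) // socket errors are expected after destroy
  })
}
```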
1 change: 0 additions & 1 deletion packages/server/package.json
@@ -115,7 +115,6 @@
     "response-time": "^2.3.2",
     "sanitize-html": "^2.7.1",
     "sharp": "^0.32.6",
-    "stream-chain": "^3.4.0",
     "string-pixel-width": "^1.10.0",
     "stripe": "^17.1.0",
     "subscriptions-transport-ws": "^0.11.0",
104 changes: 104 additions & 0 deletions packages/server/test/assets/failing-streaming-model-f547dc4e88.csv

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions packages/shared/src/environment/index.ts
@@ -65,11 +65,6 @@ const parseFeatureFlags = () => {
     FF_FORCE_ONBOARDING: {
       schema: z.boolean(),
       defaults: { production: false, _: false }
-    },
-    // Fixes the streaming of objects by ensuring that the database stream is closed properly
-    FF_OBJECTS_STREAMING_FIX: {
-      schema: z.boolean(),
-      defaults: { production: false, _: false }
     }
   })
 
@@ -97,7 +92,6 @@ export function getFeatureFlags(): {
   FF_FILEIMPORT_IFC_DOTNET_ENABLED: boolean
   FF_FORCE_EMAIL_VERIFICATION: boolean
   FF_FORCE_ONBOARDING: boolean
-  FF_OBJECTS_STREAMING_FIX: boolean
 } {
   if (!parsedFlags) parsedFlags = parseFeatureFlags()
   return parsedFlags
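
For context, `parseFeatureFlags` pairs each flag with a zod schema and per-environment defaults, and deleting `FF_OBJECTS_STREAMING_FIX` from that table is what makes the fix unconditional. A rough sketch of the parsing shape (an assumption about @speckle/shared, not its actual implementation):

```ts
import { z } from 'zod'

// Assumed shape: each flag declares a zod schema plus per-environment defaults.
const flagSpecs = {
  FF_FORCE_ONBOARDING: {
    schema: z.boolean(),
    defaults: { production: false, _: false }
  }
} as const

function parseFlag(name: keyof typeof flagSpecs): boolean {
  const spec = flagSpecs[name]
  const raw = process.env[name]
  const fallback =
    process.env.NODE_ENV === 'production' ? spec.defaults.production : spec.defaults._
  if (raw === undefined) return fallback
  return spec.schema.parse(raw === 'true') // env vars arrive as strings
}
```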
3 changes: 0 additions & 3 deletions utils/helm/speckle-server/templates/_helpers.tpl
@@ -595,9 +595,6 @@ Generate the environment variables for Speckle server and Speckle objects deploy
 - name: FF_FORCE_ONBOARDING
   value: {{ .Values.featureFlags.forceOnboarding | quote }}
 
-- name: FF_OBJECTS_STREAMING_FIX
-  value: {{ .Values.featureFlags.objectsStreamingFix | quote }}
-
 {{- if .Values.featureFlags.billingIntegrationEnabled }}
 - name: STRIPE_API_KEY
   valueFrom:
5 changes: 0 additions & 5 deletions utils/helm/speckle-server/values.schema.json
@@ -94,11 +94,6 @@
       "type": "boolean",
       "description": "Forces onboarding for all users",
       "default": false
-    },
-    "objectsStreamingFix": {
-      "type": "boolean",
-      "description": "Enables the fix for the objects streaming issue when client prematurely closes the connection",
-      "default": false
     }
   }
 },