Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.12] Handle content stream errors in report pre-deletion (#173792) #174138

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,43 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => {
return jobManagementPreRouting(reporting, res, docId, user, counters, async (doc) => {
const docIndex = doc.index;
const stream = await getContentStream(reporting, { id: docId, index: docIndex });
/** @note Overwriting existing content with an empty buffer to remove all the chunks. */
await new Promise<void>((resolve) => {
stream.end('', 'utf8', () => {
resolve();
});
const reportingSetup = reporting.getPluginSetupDeps();
const logger = reportingSetup.logger.get('delete-report');

// An "error" event is emitted if an error is
// passed to the `stream.end` callback from
// the _final method of the ContentStream.
// This event must be handled.
stream.on('error', (err) => {
logger.error(err);
});
await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
try {
// Overwriting existing content with an
// empty buffer to remove all the chunks.
await new Promise<void>((resolve, reject) => {
stream.end('', 'utf8', (error?: Error) => {
if (error) {
// handle error that could be thrown
// from the _write method of the ContentStream
reject(error);
} else {
resolve();
}
});
});

await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
} catch (error) {
logger.error(error);
return res.customError({
statusCode: 500,
});
}
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { registerJobInfoRoutesInternal as registerJobInfoRoutes } from '../jobs'

type SetupServerReturn = Awaited<ReturnType<typeof setupServer>>;

describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
describe(`Reporting Job Management Routes: Internal`, () => {
const reportingSymbol = Symbol('reporting');
let server: SetupServerReturn['server'];
let usageCounter: IUsageCounter;
Expand Down Expand Up @@ -144,148 +144,148 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
await server.stop();
});

it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
describe('download report', () => {
it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});

it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
},
security: { authc: { getCurrentUser: () => undefined } },
},
security: { authc: { getCurrentUser: () => undefined } },
},
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});

it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});

it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);
it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});

it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);
it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});

it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);
it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});

it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});
it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
});
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
);
});

describe('successful downloads', () => {
it('when a known job-type is complete', async () => {
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);
Expand Down Expand Up @@ -483,4 +483,28 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
});
});
});

describe('delete report', () => {
it('handles content stream errors', async () => {
stream = new Readable({
read() {
this.push('test');
this.push(null);
},
}) as typeof stream;
stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => {
callback(new Error('An error occurred in ending the content stream'));
});

(getContentStream as jest.MockedFunction<typeof getContentStream>).mockResolvedValue(stream);
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.delete(`${INTERNAL_ROUTES.JOBS.DELETE_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8');
});
});
});
Loading