Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming: refactor to custom Error classes #28632

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions streaming/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// @ts-check

/**
* Typed as a string because otherwise it's a const string, which means we can't
* override it in let statements.
* @type {string}
*/
const UNEXPECTED_ERROR_MESSAGE = 'An unexpected error occurred';
exports.UNKNOWN_ERROR_MESSAGE = UNEXPECTED_ERROR_MESSAGE;

/**
* Extracts the status and message properties from the error object, if
* available for public use. The `unknown` is for catch statements
* @param {Error | AuthenticationError | RequestError | unknown} err
*/
exports.extractStatusAndMessage = function(err) {
let statusCode = 500;
let errorMessage = UNEXPECTED_ERROR_MESSAGE;
if (err instanceof AuthenticationError || err instanceof RequestError) {
statusCode = err.status;
errorMessage = err.message;
}

return { statusCode, errorMessage };
};

class RequestError extends Error {
/**
* @param {string} message
*/
constructor(message) {
super(message);
this.name = "RequestError";
this.status = 400;
}
}

exports.RequestError = RequestError;

class AuthenticationError extends Error {
/**
* @param {string} message
*/
constructor(message) {
super(message);
this.name = "AuthenticationError";
this.status = 401;
}
}

exports.AuthenticationError = AuthenticationError;
94 changes: 52 additions & 42 deletions streaming/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const pg = require('pg');
const dbUrlToConfig = require('pg-connection-string').parse;
const WebSocket = require('ws');

const errors = require('./errors');
const { AuthenticationError, RequestError } = require('./errors');
const { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } = require('./logging');
const { setupMetrics } = require('./metrics');
const { isTruthy } = require("./utils");
Expand Down Expand Up @@ -241,15 +243,15 @@ const startServer = async () => {
// Unfortunately for using the on('upgrade') setup, we need to manually
// write a HTTP Response to the Socket to close the connection upgrade
// attempt, so the following code is to handle all of that.
const statusCode = err.status ?? 401;
const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);

/** @type {Record<string, string | number | import('pino-http').ReqId>} */
const headers = {
'Connection': 'close',
'Content-Type': 'text/plain',
'Content-Length': 0,
'X-Request-Id': request.id,
'X-Error-Message': err.status ? err.toString() : 'An unexpected error occurred'
'X-Error-Message': errorMessage
};

// Ensure the socket is closed once we've finished writing to it:
Expand All @@ -267,7 +269,7 @@ const startServer = async () => {
statusCode,
headers
}
}, err.toString());
}, errorMessage);

return;
}
Expand Down Expand Up @@ -452,11 +454,7 @@ const startServer = async () => {
}

if (result.rows.length === 0) {
err = new Error('Invalid access token');
// @ts-ignore
err.status = 401;

reject(err);
reject(new AuthenticationError('Invalid access token'));
return;
}

Expand Down Expand Up @@ -487,11 +485,7 @@ const startServer = async () => {
const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];

if (!authorization && !accessToken) {
const err = new Error('Missing access token');
// @ts-ignore
err.status = 401;

reject(err);
reject(new AuthenticationError('Missing access token'));
return;
}

Expand Down Expand Up @@ -568,11 +562,7 @@ const startServer = async () => {
return;
}

const err = new Error('Access token does not cover required scopes');
// @ts-ignore
err.status = 401;

reject(err);
reject(new AuthenticationError('Access token does not have the required scopes'));
});

/**
Expand Down Expand Up @@ -648,11 +638,7 @@ const startServer = async () => {
// If no channelName can be found for the request, then we should terminate
// the connection, as there's nothing to stream back
if (!channelName) {
const err = new Error('Unknown channel requested');
// @ts-ignore
err.status = 400;

next(err);
next(new RequestError('Unknown channel requested'));
return;
}

Expand All @@ -679,10 +665,7 @@ const startServer = async () => {
return;
}

const hasStatusCode = Object.hasOwnProperty.call(err, 'status');
// @ts-ignore
const statusCode = hasStatusCode ? err.status : 500;
const errorMessage = hasStatusCode ? err.toString() : 'An unexpected error occurred';
const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);

res.writeHead(statusCode, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: errorMessage }));
Expand Down Expand Up @@ -1057,7 +1040,7 @@ const startServer = async () => {
};

/**
* @param {any} res
* @param {http.ServerResponse} res
*/
const httpNotFound = res => {
res.writeHead(404, { 'Content-Type': 'application/json' });
Expand All @@ -1072,16 +1055,29 @@ const startServer = async () => {
api.use(errorMiddleware);

api.get('/api/v1/streaming/*', (req, res) => {
// @ts-ignore
channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
const channelName = channelNameFromPath(req);

// FIXME: In theory we'd never actually reach here due to
// authenticationMiddleware catching this case, however, we need to refactor
// how those middlewares work, so I'm adding the extra check in here.
if (!channelName) {
httpNotFound(res);
return;
}

channelNameToIds(req, channelName, req.query).then(({ channelIds, options }) => {
const onSend = streamToHttp(req, res);
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));

// @ts-ignore
streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering);
}).catch(err => {
res.log.info({ err }, 'Subscription error:', err.toString());
httpNotFound(res);
const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);

res.log.info({ err }, 'Eventsource subscription error');

res.writeHead(statusCode, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: errorMessage }));
});
});

Expand Down Expand Up @@ -1210,8 +1206,8 @@ const startServer = async () => {

break;
case 'hashtag':
if (!params.tag || params.tag.length === 0) {
reject('No tag for stream provided');
if (!params.tag) {
reject(new RequestError('Missing tag name parameter'));
} else {
resolve({
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`],
Expand All @@ -1221,8 +1217,8 @@ const startServer = async () => {

break;
case 'hashtag:local':
if (!params.tag || params.tag.length === 0) {
reject('No tag for stream provided');
if (!params.tag) {
reject(new RequestError('Missing tag name parameter'));
} else {
resolve({
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`],
Expand All @@ -1232,19 +1228,23 @@ const startServer = async () => {

break;
case 'list':
// @ts-ignore
if (!params.list) {
reject(new RequestError('Missing list name parameter'));
return;
}

authorizeListAccess(params.list, req).then(() => {
resolve({
channelIds: [`timeline:list:${params.list}`],
options: { needsFiltering: false },
});
}).catch(() => {
reject('Not authorized to stream this list');
reject(new AuthenticationError('Not authorized to stream this list'));
});

break;
default:
reject('Unknown stream type');
reject(new RequestError('Unknown stream type'));
}
});

Expand Down Expand Up @@ -1298,8 +1298,17 @@ const startServer = async () => {
stopHeartbeat,
};
}).catch(err => {
logger.error({ err }, 'Subscription error');
websocket.send(JSON.stringify({ error: err.toString() }));
const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);

logger.error({ err }, 'Websocket subscription error');

// If we have a socket that is alive and open still, send the error back to the client:
if (websocket.isAlive && websocket.readyState === websocket.OPEN) {
websocket.send(JSON.stringify({
error: errorMessage,
status: statusCode
}));
}
});
};

Expand Down Expand Up @@ -1338,10 +1347,11 @@ const startServer = async () => {
channelNameToIds(request, channelName, params).then(({ channelIds }) => {
removeSubscription(session, channelIds);
}).catch(err => {
logger.error({err}, 'Unsubscribe error');
logger.error({err}, 'Websocket unsubscribe error');

// If we have a socket that is alive and open still, send the error back to the client:
if (websocket.isAlive && websocket.readyState === websocket.OPEN) {
// TODO: Use a better error response here
websocket.send(JSON.stringify({ error: "Error unsubscribing from channel" }));
}
});
Expand Down