Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(logs): unify HTTP logs and retry #3503

Merged
merged 3 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

_No pre-built integration yet (time to contribute: <48h)_

<Tip>Not seeing the integration you need? [Build your own](https://docs.nango.dev/guides/custom-integration-builder/overview) independently.</Tip>
<Tip>Not seeing the integration you need? [Build your own](https://docs.nango.dev/guides/custom-integrations/overview) independently.</Tip>
2 changes: 1 addition & 1 deletion docs-v2/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,7 @@ paths:
type: string
description: The number of retries in case of failure (with exponential back-off). Optional, default 0.
- in: header
name: Retry On
name: Retry-On
schema:
type: string
description: Comma separated status codes to explicitly retry on in addition to the default 5xx and 429.
Expand Down
31 changes: 16 additions & 15 deletions packages/logs/lib/client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { MessageRow, MessageRowInsert, MessageMeta, OperationRow } from '@nangohq/types';
import { setRunning, createMessage, setFailed, setCancelled, setTimeouted, setSuccess, update } from './models/messages.js';
import { getFormattedMessage } from './models/helpers.js';
import { errorToObject, metrics, stringifyError } from '@nangohq/utils';
import { isCli, logger } from './utils.js';
import { metrics, stringifyError } from '@nangohq/utils';
import { errorToDocument, isCli, logger, logLevelToLogger } from './utils.js';
import { envs } from './env.js';
import { OtlpSpan } from './otlp/otlpSpan.js';

Expand All @@ -26,11 +26,15 @@ export class LogContextStateless {
}

async log(data: MessageRowInsert): Promise<boolean> {
if (data.error && data.error.constructor.name !== 'Object') {
data.error = errorToDocument(data.error);
}

if (this.logToConsole) {
const obj: Record<string, any> = {};
if (data.error) obj['error'] = data.error;
if (data.meta) obj['meta'] = data.meta;
logger[data.level!](`${this.dryRun ? '[dry] ' : ''}log: ${data.message}`, Object.keys(obj).length > 0 ? obj : undefined);
logger[logLevelToLogger[data.level!]](`${this.dryRun ? '[dry] ' : ''}log: ${data.message}`, Object.keys(obj).length > 0 ? obj : undefined);
}
if (this.dryRun) {
return true;
Expand Down Expand Up @@ -63,34 +67,31 @@ export class LogContextStateless {

async error(message: string, meta: (MessageMeta & { error?: unknown; err?: never; e?: never }) | null = null): Promise<boolean> {
const { error, ...rest } = meta || {};
const err = error ? { name: 'Unknown Error', message: 'unknown error', ...errorToObject(error) } : null;
return await this.log({
type: 'log',
level: 'error',
message,
error: err
? {
name: error instanceof Error ? error.constructor.name : err.name,
message: err.message,
type: 'type' in err ? (err.type as string) : null,
payload: 'payload' in err ? err.payload : null
}
: null,
error: errorToDocument(error),
meta: Object.keys(rest).length > 0 ? rest : null,
source: 'internal'
});
}

async http(
message: string,
data: {
{
error,
...data
}: {
request: MessageRow['request'];
response: MessageRow['response'];
error?: unknown;
meta?: MessageRow['meta'];
level?: MessageRow['level'];
}
): Promise<boolean> {
const level: MessageRow['level'] = data.response && data.response.code >= 400 ? 'error' : 'info';
return await this.log({ type: 'http', level, message, ...data, source: 'internal' });
const level: MessageRow['level'] = data.level ?? (data.response && data.response.code >= 400 ? 'error' : 'info');
return await this.log({ type: 'http', level, message, ...data, error: errorToDocument(error), source: 'internal' });
}

/**
Expand Down
27 changes: 26 additions & 1 deletion packages/logs/lib/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { getLogger } from '@nangohq/utils';
import { errorToObject, getLogger } from '@nangohq/utils';
import { createKVStore } from '@nangohq/kvstore';
import type { KVStore } from '@nangohq/kvstore';
import type { MessageRow } from '@nangohq/types';

export const logger = getLogger('logs');

Expand All @@ -14,3 +15,27 @@ export async function getKVStore() {

return kvstore;
}

export const logLevelToLogger = {
info: 'info',
debug: 'debug',
error: 'error',
warn: 'warning',
http: 'info',
verbose: 'debug',
silly: 'debug'
} as const;

export function errorToDocument(error?: unknown): MessageRow['error'] {
if (!error) {
return null;
}

const err = { name: 'Unknown Error', message: 'unknown error', ...errorToObject(error) };
return {
name: error instanceof Error ? error.constructor.name : err.name,
message: err.message,
type: 'type' in err ? (err.type as string) : null,
payload: 'payload' in err ? err.payload : null
};
}
72 changes: 16 additions & 56 deletions packages/server/lib/controllers/proxy.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class ProxyController {
const valuesToFilter = Object.values(config.connection.credentials);
const safeHeaders = redactHeaders({ headers: requestConfig.headers, valuesToFilter });
const redactedURL = redactURL({ url: requestConfig.url!, valuesToFilter });
await logCtx.http(`${config.method} ${redactedURL} was successful`, {
await logCtx.http(`${config.method} ${redactedURL}`, {
request: {
method: config.method,
url: redactedURL,
Expand Down Expand Up @@ -321,19 +321,7 @@ class ProxyController {
});
}

private async handleErrorResponse({
res,
e,
config,
requestConfig,
logCtx
}: {
res: Response;
e: unknown;
config: ApplicationConstructedProxyConfiguration;
requestConfig: AxiosRequestConfig;
logCtx: LogContext;
}) {
private async handleErrorResponse({ res, e, requestConfig, logCtx }: { res: Response; e: unknown; requestConfig: AxiosRequestConfig; logCtx: LogContext }) {
const error = e as AxiosError;

if (!error.response?.data && error.toJSON) {
Expand All @@ -345,8 +333,6 @@ class ProxyController {
status
} = error.toJSON() as any;

await this.reportError({ error, config, requestConfig, errorContent: message, logCtx });

const errorObject = { message, stack, code, status, url: requestConfig.url, method };

const responseStatus = error.response?.status || 500;
Expand Down Expand Up @@ -388,11 +374,12 @@ class ProxyController {
// Intentionally left blank - errorData will be a string
}
}
void this.reportError({ error, config, requestConfig, errorContent: errorData, logCtx });
void logCtx.error('Failed with this body', { body: errorData });
});
} else {
await logCtx.error('Unknown error');
await logCtx.error('Unknown error', { error });
await logCtx.failed();
res.status(500).send();
}
}

Expand Down Expand Up @@ -434,54 +421,27 @@ class ProxyController {
requestConfig.data = data;
}
const responseStream: AxiosResponse = await backOff(
() => {
return axios(requestConfig);
async () => {
try {
return await axios(requestConfig);
} catch (err) {
const handling = proxyService.logErrorResponse({ error: err, requestConfig, config });
logs.push(...handling.logs);
throw err;
}
},
{ numOfAttempts: Number(config.retries), retry: proxyService.retry.bind(this, config, logs) }
);

await flushLogsBuffer(logs, logCtx);

await this.handleResponse({ res, responseStream, config, requestConfig, logCtx });
} catch (err) {
await this.handleErrorResponse({ res, e: err, requestConfig, config, logCtx });
await flushLogsBuffer(logs, logCtx);
await this.handleErrorResponse({ res, e: err, requestConfig, logCtx });
await logCtx.failed();
metrics.increment(metrics.Types.PROXY_FAILURE);
}
}

private async reportError({
error,
config,
requestConfig,
errorContent,
logCtx
}: {
error: AxiosError;
config: ApplicationConstructedProxyConfiguration;
requestConfig: AxiosRequestConfig;
errorContent: string | Record<string, string>;
logCtx: LogContext;
}) {
const valuesToFilter = Object.values(config.connection.credentials);
const safeHeaders = redactHeaders({ headers: requestConfig.headers, valuesToFilter });
const redactedURL = redactURL({ url: requestConfig.url!, valuesToFilter });

await logCtx.http(`${requestConfig.method} ${redactedURL} failed with status '${error.response?.status}'`, {
meta: {
content: errorContent
},
request: {
method: config.method,
url: redactedURL,
headers: safeHeaders
},
response: {
code: error.response?.status || 500,
headers: error.response?.headers ? redactHeaders({ headers: error.response.headers as Record<string, string> }) : {}
}
});
await logCtx.failed();
}
}

/**
Expand Down
Loading
Loading