Skip to content

Commit

Permalink
fix(otlp-exporter-base)!: ensure we do not retry after the timeout ha…
Browse files Browse the repository at this point in the history
…s elapsed (open-telemetry#4889)
  • Loading branch information
pichlermarc authored Aug 9, 2024
1 parent 01cea7c commit 5cc3dc2
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
import { ExportResponse } from './export-response';

export interface IExporterTransport {
send(data: Uint8Array): Promise<ExportResponse>;
send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse>;
shutdown(): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { ISerializer } from '@opentelemetry/otlp-transformer';
import { IExporterTransport } from '../../exporter-transport';
import { createHttpExporterTransport } from './http-exporter-transport';
import { OTLPExporterError } from '../../types';
import { createRetryingTransport } from '../../retryable-transport';
import { createRetryingTransport } from '../../retrying-transport';

/**
* Collector Metric Exporter abstract base class
Expand Down Expand Up @@ -76,7 +76,6 @@ export abstract class OTLPExporterNodeBase<
signalSpecificHeaders
),
url: this.url,
timeoutMillis: this.timeoutMillis,
}),
});
}
Expand All @@ -100,16 +99,19 @@ export abstract class OTLPExporterNodeBase<
return;
}

const promise = this._transport.send(data).then(response => {
if (response.status === 'success') {
onSuccess();
return;
}
if (response.status === 'failure' && response.error) {
onError(response.error);
}
onError(new OTLPExporterError('Export failed with unknown error'));
}, onError);
const promise = this._transport
.send(data, this.timeoutMillis)
.then(response => {
if (response.status === 'success') {
onSuccess();
} else if (response.status === 'failure' && response.error) {
onError(response.error);
} else if (response.status === 'retryable') {
onError(new OTLPExporterError('Export failed with retryable status'));
} else {
onError(new OTLPExporterError('Export failed with unknown error'));
}
}, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HttpExporterTransport implements IExporterTransport {

constructor(private _parameters: HttpRequestParameters) {}

async send(data: Uint8Array): Promise<ExportResponse> {
async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
if (this._send == null) {
// Lazy require to ensure that http/https is not required before instrumentations can wrap it.
const {
Expand All @@ -50,9 +50,15 @@ class HttpExporterTransport implements IExporterTransport {
return new Promise<ExportResponse>(resolve => {
// this will always be defined
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._send?.(this._parameters, this._agent!, data, result => {
resolve(result);
});
this._send?.(
this._parameters,
this._agent!,
data,
result => {
resolve(result);
},
timeoutMillis
);
});
}
shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ export type sendWithHttp = (
params: HttpRequestParameters,
agent: http.Agent | https.Agent,
data: Uint8Array,
onDone: (response: ExportResponse) => void
onDone: (response: ExportResponse) => void,
timeoutMillis: number
) => void;

export interface HttpRequestParameters {
timeoutMillis: number;
url: string;
headers: Record<string, string>;
compression: 'gzip' | 'none';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export function sendWithHttp(
params: HttpRequestParameters,
agent: http.Agent | https.Agent,
data: Uint8Array,
onDone: (response: ExportResponse) => void
onDone: (response: ExportResponse) => void,
timeoutMillis: number
): void {
const parsedUrl = new URL(params.url);
const nodeVersion = Number(process.versions.node.split('.')[0]);
Expand Down Expand Up @@ -86,7 +87,7 @@ export function sendWithHttp(
});
});

req.setTimeout(params.timeoutMillis, () => {
req.setTimeout(timeoutMillis, () => {
req.destroy();
onDone({
status: 'failure',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,42 @@ function getJitter() {
class RetryingTransport implements IExporterTransport {
constructor(private _transport: IExporterTransport) {}

private retry(data: Uint8Array, inMillis: number): Promise<ExportResponse> {
private retry(
data: Uint8Array,
timeoutMillis: number,
inMillis: number
): Promise<ExportResponse> {
return new Promise((resolve, reject) => {
setTimeout(() => {
this._transport.send(data).then(resolve, reject);
this._transport.send(data, timeoutMillis).then(resolve, reject);
}, inMillis);
});
}

async send(data: Uint8Array): Promise<ExportResponse> {
let result = await this._transport.send(data);
async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
const deadline = Date.now() + timeoutMillis;
let result = await this._transport.send(data, timeoutMillis);
let attempts = MAX_ATTEMPTS;
let nextBackoff = INITIAL_BACKOFF;

while (result.status === 'retryable' && attempts > 0) {
attempts--;
const backoff = Math.min(nextBackoff, MAX_BACKOFF) + getJitter();

// use maximum of computed backoff and 0 to avoid negative timeouts
const backoff = Math.max(
Math.min(nextBackoff, MAX_BACKOFF) + getJitter(),
0
);
nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
result = await this.retry(data, result.retryInMillis ?? backoff);
const retryInMillis = result.retryInMillis ?? backoff;

// return when expected retry time is after the export deadline.
const remainingTimeoutMillis = deadline - Date.now();
if (retryInMillis > remainingTimeoutMillis) {
return result;
}

result = await this.retry(data, remainingTimeoutMillis, retryInMillis);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import * as sinon from 'sinon';
import * as assert from 'assert';
import { IExporterTransport } from '../../src';
import { createRetryingTransport } from '../../src/retryable-transport';
import { createRetryingTransport } from '../../src/retrying-transport';
import { ExportResponse } from '../../src';
import { assertRejects } from '../testHelper';

const timeoutMillis = 1000000;

describe('RetryingTransport', function () {
describe('send', function () {
it('does not retry when underlying transport succeeds', async function () {
Expand All @@ -39,10 +41,14 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const actualResponse = await transport.send(mockData);
const actualResponse = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
assert.deepEqual(actualResponse, expectedResponse);
});

Expand All @@ -63,10 +69,14 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const actualResponse = await transport.send(mockData);
const actualResponse = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
assert.deepEqual(actualResponse, expectedResponse);
});

Expand All @@ -84,10 +94,14 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
await assertRejects(() => transport.send(mockData));
await assertRejects(() => transport.send(mockData, timeoutMillis));

// assert
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
});

it('does retry when the underlying transport returns retryable', async function () {
Expand All @@ -113,11 +127,19 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const actualResponse = await transport.send(mockData);
const actualResponse = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.calledTwice(transportStubs.send);
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
sinon.assert.alwaysCalledWithMatch(
transportStubs.send,
mockData,
sinon.match.number.and(
sinon.match(value => {
return value <= timeoutMillis;
})
)
);
assert.deepEqual(actualResponse, successResponse);
});

Expand All @@ -143,11 +165,19 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
await assertRejects(() => transport.send(mockData));
await assertRejects(() => transport.send(mockData, timeoutMillis));

// assert
sinon.assert.calledTwice(transportStubs.send);
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
sinon.assert.alwaysCalledWithMatch(
transportStubs.send,
mockData,
sinon.match.number.and(
sinon.match(value => {
return value <= timeoutMillis;
})
)
);
});

it('does retry 5 times, then resolves as retryable', async function () {
Expand All @@ -169,11 +199,48 @@ describe('RetryingTransport', function () {
const transport = createRetryingTransport({ transport: mockTransport });

// act
const result = await transport.send(mockData);
const result = await transport.send(mockData, timeoutMillis);

// assert
sinon.assert.callCount(transportStubs.send, 6); // 1 initial try and 5 retries
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
sinon.assert.alwaysCalledWithMatch(
transportStubs.send,
mockData,
sinon.match.number.and(
sinon.match(value => {
return value <= timeoutMillis;
})
)
);
assert.strictEqual(result, retryResponse);
});

it('does not retry when retryInMillis takes place after timeoutMillis', async function () {
// arrange
const retryResponse: ExportResponse = {
status: 'retryable',
retryInMillis: timeoutMillis + 100,
};

const mockData = Uint8Array.from([1, 2, 3]);

const transportStubs = {
send: sinon.stub().resolves(retryResponse),
shutdown: sinon.stub(),
};
const mockTransport = <IExporterTransport>transportStubs;
const transport = createRetryingTransport({ transport: mockTransport });

// act
const result = await transport.send(mockData, timeoutMillis);

// assert
// initial try, no retries.
sinon.assert.calledOnceWithExactly(
transportStubs.send,
mockData,
timeoutMillis
);
assert.strictEqual(result, retryResponse);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ export abstract class OTLPGRPCExporterNodeBase<
grpcName: grpcName,
grpcPath: grpcPath,
metadata: metadataProvider,
timeoutMillis: this.timeoutMillis,
});
}

Expand Down Expand Up @@ -126,16 +125,19 @@ export abstract class OTLPGRPCExporterNodeBase<
return;
}

const promise = this._transport.send(data).then(response => {
if (response.status === 'success') {
onSuccess();
return;
}
if (response.status === 'failure' && response.error) {
onError(response.error);
}
onError(new OTLPExporterError('Export failed with unknown error'));
}, onError);
const promise = this._transport
.send(data, this.timeoutMillis)
.then(response => {
if (response.status === 'success') {
onSuccess();
} else if (response.status === 'failure' && response.error) {
onError(response.error);
} else if (response.status === 'retryable') {
onError(new OTLPExporterError('Export failed with retryable status'));
} else {
onError(new OTLPExporterError('Export failed with unknown error'));
}
}, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ export interface GrpcExporterTransportParameters {
*/
metadata: () => Metadata;
compression: 'gzip' | 'none';
timeoutMillis: number;
}

export class GrpcExporterTransport implements IExporterTransport {
Expand All @@ -101,7 +100,7 @@ export class GrpcExporterTransport implements IExporterTransport {
this._client?.close();
}

send(data: Uint8Array): Promise<ExportResponse> {
send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
// We need to make a for gRPC
const buffer = Buffer.from(data);

Expand Down Expand Up @@ -145,9 +144,7 @@ export class GrpcExporterTransport implements IExporterTransport {
}

return new Promise<ExportResponse>(resolve => {
// this will always be defined
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const deadline = Date.now() + this._parameters.timeoutMillis;
const deadline = Date.now() + timeoutMillis;

// this should never happen
if (this._metadata == null) {
Expand Down
Loading

0 comments on commit 5cc3dc2

Please sign in to comment.