Use promises for setup/writing for logging backend
The balena logging backend now uses async functions to set up the
connection and write messages to the request stream. This adds some
backpressure on `log` calls by the log monitor module, to prevent a
very aggressive container from causing the supervisor to waste CPU
cycles just dropping messages.

Change-type: patch
pipex committed Jul 30, 2024
1 parent f3fcb0d commit 8bc0875
Showing 2 changed files with 108 additions and 83 deletions.
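
As context for the diff below, here is a minimal, self-contained sketch of the backpressure pattern the commit message describes: an awaited write either enqueues the message or, when the stream is saturated, counts a drop and yields to the event loop with a zero-delay timer. The SketchBackend class and its log method are illustrative names, not the supervisor's actual API.

    import stream from 'stream';
    import { setTimeout } from 'timers/promises';

    // Minimal sketch of the backpressure pattern from the commit message: when
    // the underlying stream reports it is full, count the drop and yield to the
    // event loop so a chatty producer cannot monopolise the CPU.
    class SketchBackend {
        private writable = true;
        private dropCount = 0;
        private stream = new stream.PassThrough();

        public async log(message: object): Promise<void> {
            if (this.writable) {
                // write() returns false once the internal buffer passes the high
                // watermark; remember that so the next call backs off
                this.writable = this.stream.write(JSON.stringify(message) + '\n');
            } else {
                this.dropCount += 1;
                // Zero-delay sleep: hands control back to the event loop before
                // the caller can issue the next log call
                await setTimeout(0);
            }
        }
    }

In the real backend the writable flag is reset by the stream's drain handler, which also reports how many messages were suppressed, as the first hunks of balena-backend.ts below show.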
184 changes: 105 additions & 79 deletions src/logging/balena-backend.ts
@@ -4,6 +4,7 @@ import _ from 'lodash';
import stream from 'stream';
import url from 'url';
import zlib from 'zlib';
import { setTimeout } from 'timers/promises';

import type { LogMessage } from './log-backend';
import { LogBackend } from './log-backend';
@@ -15,7 +16,6 @@ const MIN_COOLDOWN_PERIOD = 5 * 1000; // 5 seconds
const MAX_COOLDOWN_PERIOD = 300 * 1000; // 5 minutes
const KEEPALIVE_TIMEOUT = 60 * 1000;
const RESPONSE_GRACE_PERIOD = 5 * 1000;

const MAX_LOG_LENGTH = 10 * 1000;
const MAX_PENDING_BYTES = 256 * 1024;

@@ -31,7 +31,6 @@ export class BalenaLogBackend extends LogBackend {
private gzip: zlib.Gzip | null = null;
private opts: Options;
private stream: stream.PassThrough;
private timeout: NodeJS.Timeout;

public initialised = false;

@@ -61,7 +60,7 @@ export class BalenaLogBackend extends LogBackend {
this.writable = true;
this.flush();
if (this.dropCount > 0) {
this.write({
this.tryWrite({
message: `Warning: Suppressed ${this.dropCount} message(s) due to high load`,
timestamp: Date.now(),
isSystem: true,
@@ -82,10 +81,8 @@ export class BalenaLogBackend extends LogBackend {
// been provisioned)
// TODO: the backend should not be aware of unmanaged or publish state
if (this.unmanaged || !this.publishEnabled || !this.initialised) {
return;
}

if (!_.isObject(message)) {
// Yield control to the event loop
await setTimeout(0);
return;
}

@@ -98,7 +95,7 @@ export class BalenaLogBackend extends LogBackend {
omission: '[...]',
});

this.write(message);
await this.write(message);
}

public assignFields(endpoint: string, uuid: string, deviceApiKey: string) {
@@ -115,14 +112,9 @@ export class BalenaLogBackend extends LogBackend {

private lastSetupAttempt = 0;
private setupFailures = 0;
private setupPending = false;
private setup() {
if (this.setupPending || this.req != null) {
// If we already have a setup pending, or we are already setup, then do nothing
return;
}
this.setupPending = true;
private setupPromise: Promise<void> | null = null;

private async trySetup() {
// Work out the total delay we need
const totalDelay = Math.min(
2 ** this.setupFailures * MIN_COOLDOWN_PERIOD,
@@ -133,62 +125,83 @@ export class BalenaLogBackend extends LogBackend {
// The difference between the two is the actual delay we want
const delay = Math.max(totalDelay - alreadyDelayedBy, 0);

setTimeout(() => {
this.setupPending = false;
this.lastSetupAttempt = Date.now();

const setupFailed = () => {
this.setupFailures++;
this.teardown();
};

this.req = https.request(this.opts);

// Since we haven't sent the request body yet, and never will, the
// only reason for the server to prematurely respond is to
// communicate an error. So teardown the connection immediately
this.req.on('response', (res) => {
log.error(
'LogBackend: server responded with status code:',
res.statusCode,
);
setupFailed();
});
await setTimeout(delay);

this.req.on('timeout', setupFailed);
this.req.on('close', setupFailed);
this.req.on('error', (err) => {
log.error('LogBackend: unexpected error:', err);
setupFailed();
});
this.lastSetupAttempt = Date.now();

// Immediately flush the headers. This gives a chance to the server to
// respond with potential errors such as 401 authentication error
this.req.flushHeaders();

// We want a very low writable high watermark to prevent having many
// chunks stored in the writable queue of @_gzip and have them in
// @_stream instead. This is desirable because once @_gzip.flush() is
// called it will do all pending writes with that flush flag. This is
// not what we want though. If there are 100 items in the queue we want
// to write all of them with Z_NO_FLUSH and only afterwards do a
// Z_SYNC_FLUSH to maximize compression
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', setupFailed);
this.gzip.pipe(this.req);

// Only start piping if there has been no error after the header flush.
// Doing it immediately would potentially lose logs if it turned out that
// the server is unavailable because @_req stream would consume our
// passthrough buffer
this.timeout = setTimeout(() => {
if (this.gzip != null) {
this.setupFailures = 0;
this.stream.pipe(this.gzip);
setImmediate(this.flush);
}
}, RESPONSE_GRACE_PERIOD);
}, delay);
const setupFailed = () => {
this.setupFailures++;
this.teardown();
};

this.req = https.request(this.opts);

// Since we haven't sent the request body yet, and never will, the
// only reason for the server to prematurely respond is to
// communicate an error. So teardown the connection immediately
this.req.on('response', (res) => {
log.error(
'LogBackend: server responded with status code:',
res.statusCode,
);
setupFailed();
});

this.req.on('timeout', setupFailed);
this.req.on('close', setupFailed);
this.req.on('error', (err) => {
log.error('LogBackend: unexpected error:', err);
setupFailed();
});

// Immediately flush the headers. This gives a chance to the server to
// respond with potential errors such as 401 authentication error
this.req.flushHeaders();

// We want a very low writable high watermark to prevent having many
// chunks stored in the writable queue of @_gzip and have them in
// @_stream instead. This is desirable because once @_gzip.flush() is
// called it will do all pending writes with that flush flag. This is
// not what we want though. If there are 100 items in the queue we want
// to write all of them with Z_NO_FLUSH and only afterwards do a
// Z_SYNC_FLUSH to maximize compression
this.gzip = zlib.createGzip({ writableHighWaterMark: 1024 });
this.gzip.on('error', setupFailed);
this.gzip.pipe(this.req);

// Only start piping if there has been no error after the header flush.
// Doing it immediately would potentially lose logs if it turned out that
// the server is unavailable because @_req stream would consume our
// passthrough buffer
await setTimeout(RESPONSE_GRACE_PERIOD);

// a teardown could happen while we wait for the grace period so we check
// that gzip is still valid
if (this.gzip != null) {
this.setupFailures = 0;
this.stream.pipe(this.gzip);
setImmediate(this.flush);
}
}

private async setup() {
if (this.req != null) {
// If we are already setup, then do nothing
return;
}

// If the setup is in progress, let callers wait for the existing promise
if (this.setupPromise != null) {
return this.setupPromise;
}

// Store the setup promise in case there are concurrent calls to
// the setup
this.setupPromise = this.trySetup().finally(() => {
this.setupPromise = null;
});

return this.setupPromise;
}

private snooze = _.debounce(this.teardown, KEEPALIVE_TIMEOUT);
@@ -208,30 +221,43 @@

private teardown() {
if (this.req != null) {
clearTimeout(this.timeout);
this.req.removeAllListeners();
this.req.on('error', _.noop);
this.req.on('error', () => {
/* noop */
});
if (this.gzip != null) {
this.stream.unpipe(this.gzip);
this.gzip.end();
this.gzip = null;
}
this.req = null;
}
}

private write(message: LogMessage) {
private tryWrite(message: LogMessage) {
try {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} catch (e) {
log.error('Failed to write to logging stream, dropping message.', e);
}
}

private async write(message: LogMessage) {
this.snooze();
this.setup();

// Setup could terminate unsuccessfully, in which case messages
// will keep being added to the stream until it fills up
await this.setup();

if (this.writable) {
try {
this.writable = this.stream.write(JSON.stringify(message) + '\n');
this.flush();
} catch (e) {
log.error('Failed to write to logging stream, dropping message.', e);
}
this.tryWrite(message);
} else {
this.dropCount += 1;

// Yield execution to the event loop to avoid an
// aggressive logger overwhelming the process
await setTimeout(0);
}
}
}
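
The heart of the change above is that setup() now memoises a single in-flight attempt while trySetup() spaces retries with capped exponential backoff. Below is a condensed sketch of that pattern using a hypothetical ConnectionManager; the actual HTTPS/gzip wiring is elided and the constants simply mirror the ones in the diff.

    import { setTimeout } from 'timers/promises';

    // Sketch of the single in-flight setup used above: concurrent callers share
    // one promise, and retries are spaced with capped exponential backoff.
    const MIN_COOLDOWN_PERIOD = 5 * 1000;
    const MAX_COOLDOWN_PERIOD = 300 * 1000;

    class ConnectionManager {
        private setupFailures = 0;
        private lastSetupAttempt = 0;
        private setupPromise: Promise<void> | null = null;
        private connected = false;

        private async trySetup(): Promise<void> {
            // Exponential backoff, capped, minus any time already spent waiting
            const totalDelay = Math.min(
                2 ** this.setupFailures * MIN_COOLDOWN_PERIOD,
                MAX_COOLDOWN_PERIOD,
            );
            const alreadyDelayedBy = Date.now() - this.lastSetupAttempt;
            await setTimeout(Math.max(totalDelay - alreadyDelayedBy, 0));
            this.lastSetupAttempt = Date.now();

            // A real implementation would open the connection here and bump
            // setupFailures in its error handlers; this sketch just succeeds.
            this.connected = true;
        }

        public async setup(): Promise<void> {
            if (this.connected) {
                // Already set up, nothing to do
                return;
            }
            if (this.setupPromise == null) {
                // Memoise the in-flight attempt so concurrent writers await the same one
                this.setupPromise = this.trySetup().finally(() => {
                    this.setupPromise = null;
                });
            }
            return this.setupPromise;
        }
    }

Storing the promise and clearing it in finally() means every concurrent write awaits the same attempt instead of racing to open duplicate connections, which is what the old setupPending flag used to guard against.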
7 changes: 3 additions & 4 deletions src/logging/monitor.ts
@@ -42,7 +42,7 @@ async function* splitStream(chunkIterable: AsyncIterable<any>) {
for await (const chunk of chunkIterable) {
previous += chunk;
const lines = previous.split(/\r?\n/);
previous = lines.pop() || '';
previous = lines.pop() ?? '';
yield* lines;
}

@@ -68,14 +68,13 @@ class LogMonitor {
public async start(): Promise<void> {
try {
// TODO: do not spawn journalctl if logging is not enabled
const journalctl = spawnJournalctl({
const { stdout, stderr } = spawnJournalctl({
all: true,
follow: true,
format: 'json',
filterString: '_SYSTEMD_UNIT=balena.service',
since: toJournalDate(this.lastSentTimestamp),
});
const { stdout, stderr } = journalctl;
if (!stdout) {
// this will be caught below
throw new Error('failed to open process stream');
@@ -88,14 +87,14 @@ class LogMonitor {
const self = this;

await pipeline(stdout, splitStream, async function (lines) {
self.setupAttempts = 0;
for await (const line of lines) {
try {
const row = JSON.parse(line);
if (
row.CONTAINER_ID_FULL &&
self.containers[row.CONTAINER_ID_FULL]
) {
self.setupAttempts = 0;
await self.handleRow(row);
}
} catch {
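
For reference, the splitStream helper touched in the hunk above buffers partial chunks and yields only complete lines, which pipeline() from stream/promises then hands to an async consumer. Here is a self-contained sketch of that pattern; the PassThrough source and the demo consumer are illustrative stand-ins for the journalctl stdout stream and handleRow.

    import { PassThrough } from 'stream';
    import { pipeline } from 'stream/promises';

    // Buffer partial chunks and emit only complete lines, same shape as the
    // helper in the hunk above
    async function* splitStream(chunkIterable: AsyncIterable<Buffer | string>) {
        let previous = '';
        for await (const chunk of chunkIterable) {
            previous += chunk;
            const lines = previous.split(/\r?\n/);
            previous = lines.pop() ?? '';
            yield* lines;
        }
        if (previous.length > 0) {
            yield previous;
        }
    }

    // Illustrative usage: push a few JSON records through the splitter
    async function demo() {
        const source = new PassThrough();
        source.end('{"a":1}\n{"b":2}\n{"c":3}\n');
        await pipeline(source, splitStream, async function (lines) {
            for await (const line of lines) {
                console.log(JSON.parse(line));
            }
        });
    }

    void demo();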
