Skip to content

Commit

Permalink
mostly graceful resuming of missed _notifications (when dmt-proc is o…
Browse files Browse the repository at this point in the history
…ffline) with deduplication
  • Loading branch information
davidhq committed Feb 4, 2025
1 parent c7619de commit 2c14c89
Show file tree
Hide file tree
Showing 33 changed files with 805 additions and 514 deletions.
2 changes: 1 addition & 1 deletion .version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.222 · 2025-01-20
1.2.231 · 2025-02-03
1 change: 0 additions & 1 deletion core/node/common/lib/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ class Logger {
if (this.buffer.length > 0) {
const prev = this.buffer[this.buffer.length - 1];
const diff = meta.timestamp - prev.meta.timestamp;

diffStr = ` (+${formatMilliseconds(diff)})`;

if (diff > 1000) {
Expand Down
2 changes: 2 additions & 0 deletions core/node/common/lib/timeutils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export const ONE_MINUTE = 60 * ONE_SECOND;
export const ONE_HOUR = 60 * ONE_MINUTE;
export const ONE_DAY = 24 * ONE_HOUR;
export const ONE_WEEK = 7 * ONE_DAY;
export const ONE_MONTH = 30 * ONE_DAY;
export const ONE_YEAR = 365 * ONE_DAY;

export function formatSeconds(seconds) {
return formatMilliseconds(1000 * seconds);
Expand Down

This file was deleted.

This file was deleted.

1 change: 1 addition & 0 deletions core/node/controller/processes/dmt-proc.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mids.push({ gui: { condition: deviceDef => deviceDef.try('service[gui].disable')
mids.push('nearby/lanbus');
mids.push('nearby/nearby');
mids.push('iot');
mids.push('notify');

mids.push('content/samba');

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { log, dateFns, timeutils, loop, colors, isMainDevice, isMainServer, isDevUser } from 'dmt/common';

const { ONE_SECOND, ONE_MINUTE, ONE_HOUR, ONE_DAY } = timeutils;

const { format } = dateFns;
const INITIAL_DELAY = 3000;

const MAX_LOOKBACK = ONE_DAY;

export default function init(program) {
if (!isMainServer()) return;

setTimeout(() => {
const aliveTimestamp = program.slot('aliveTimestamp').get();

if (aliveTimestamp && JSON.stringify(aliveTimestamp) != '{}') {
const aliveDate = new Date(aliveTimestamp);

log.green(
`${colors.cyan('dmt-proc')} started after being offline for ${colors.cyan(timeutils.prettyTime(aliveTimestamp))} ${colors.gray(
`(since ${aliveDate.toISOString()})`
)}`
);

aliveDate.setSeconds(0);
aliveDate.setMilliseconds(0);

const _now = new Date();
_now.setSeconds(0);
_now.setMilliseconds(0);

const now = _now.getTime();

let timepoint = aliveDate.getTime();

if (now - aliveDate.getTime() > MAX_LOOKBACK) {
timepoint = now - MAX_LOOKBACK;
}

while (timepoint < now) {
if (isDevUser()) {
log.red(`Checking delayed messages at ${colors.white(format(timepoint, 'H:mm:ss'))} ...`);
}
program.simulateNotifiersTimepoint(timepoint);
timepoint += ONE_MINUTE;
}
}
setTimeout(() => {
loop(() => {
if (!program.isStopping()) {
program.slot('aliveTimestamp', { announce: false }).set(Date.now());
}
}, 2000);
}, 10 * ONE_SECOND);
}, INITIAL_DELAY);
}
13 changes: 7 additions & 6 deletions core/node/controller/program/boot/setupGlobalErrorHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ function terminateProgram(err, reason, program) {
log.red(err);
const msg = `${reason}: ${err.stack || err}`;

program.stopping();

if (!terminationInProgress) {
terminationInProgress = true;

Expand All @@ -34,23 +36,22 @@ function terminateProgram(err, reason, program) {
}
}

function reportStopping(program) {
program.sendABC({ message: 'stopping' });
}

export default function setupGlobalErrorHandler(program) {
process.on('uncaughtException', (err, origin) => terminateProgram(err, origin, program));

process.on('SIGTERM', signal => {
reportStopping(program);
program.stopping({ notifyABC: true });
log.yellow(`Process received a ${signal} signal (usually because of normal stop/restart)`);

process.exit(0);
});

process.on('SIGINT', signal => {
reportStopping(program);
program.stopping({ notifyABC: true });
log.yellow(`Process has been interrupted: ${signal}`);
setTimeout(() => {
log.gray('If exiting is taking a while, it is possibly because of connectivity issues and we are waiting for dmt connections to close');
}, 1000);

if (log.isProfiling()) {
log.green('Memory usage:');
Expand Down
26 changes: 23 additions & 3 deletions core/node/controller/program/program.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import EventEmitter from 'events';

import * as dmt from 'dmt/common';
const { log, colors, colors2, dateFns } = dmt;

const { format, addMinutes, isBefore } = dateFns;
const { log, colors, colors2 } = dmt;

import { desktop, apn, push } from 'dmt/notify';

Expand Down Expand Up @@ -32,6 +30,7 @@ import { createStore, getStore } from './createStore/createStore.js';

import setupGlobalErrorHandler from './boot/setupGlobalErrorHandler.js';
import loadMiddleware from './boot/loadMiddleware.js';
import retrogradePushMessagesRecovery from './boot/retrogradePushMessagesRecovery.js';
import osUptime from './interval/osUptime.js';

import generateKeypair from './generateKeypair.js';
Expand Down Expand Up @@ -229,6 +228,8 @@ class Program extends EventEmitter {
this.on('ready', () => {
log.green(`✓ ${colors.cyan('dmt-proc')} ${colors.bold().white(`v${dmt.dmtVersion()}`)} ${colors.cyan('ready')}`);

retrogradePushMessagesRecovery(this);

startTicker(this);

const debugInstructions = dmt.debugMode()
Expand Down Expand Up @@ -280,6 +281,13 @@ class Program extends EventEmitter {

registerNotifier(notifier) {
this.notifiers.push(notifier);
return notifier;
}

simulateNotifiersTimepoint(timepoint, minutesLate) {
for (const n of this.notifiers) {
n.simulateTimepoint?.(timepoint, minutesLate);
}
}

decommissionNotifiers() {
Expand All @@ -290,6 +298,18 @@ class Program extends EventEmitter {
this.notifiers = [];
}

stopping({ notifyABC = false } = {}) {
this._stopping = true;

if (notifyABC) {
this.sendABC({ message: 'stopping' });
}
}

isStopping() {
return this._stopping;
}

isHub() {
if (!dmt.apMode()) {
const definedIp = this.device.try('network.ip');
Expand Down
8 changes: 6 additions & 2 deletions core/node/iot/lib/onOffMonitor/index.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import { log } from 'dmt/common';
import { log, timeutils } from 'dmt/common';
import { push } from 'dmt/notify';

const { ONE_DAY } = timeutils;

import { PowerMonitor, powerLog } from '../powerline/index.js';

function notify({ msg, program, onlyAdmin, pushoverApp, pushoverUser, pushoverUsers }) {
let users;
if (pushoverUser || pushoverUsers) {
users = (pushoverUser || pushoverUsers).split(',').map(u => u.trim());
}

if (program.isHub()) {
if (onlyAdmin) {
push
.optionalApp(pushoverApp)
.user(users)
.ttl(2 * ONE_DAY)
.omitDeviceName()
.notify(msg);
} else {
push
.optionalApp(pushoverApp)
.ttl(2 * ONE_DAY)
.user(users)
.omitDeviceName()
.notifyAll(msg);
Expand Down
34 changes: 28 additions & 6 deletions core/node/notify/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { log, isMainServer, everyHour, timeutils } from 'dmt/common';
const { ONE_MINUTE, ONE_DAY } = timeutils;

import {
app,
optionalApp,
Expand All @@ -12,8 +15,10 @@ import {
user,
userKey,
bigMessage,
ttl,
notify,
notifyAll
notifyAll,
store
} from './lib/pushover/index.js';
import * as apn from './lib/apn.js';
import * as desktop from './lib/desktop.js';
Expand All @@ -37,10 +42,10 @@ function initABC(networkId) {
const _push = {
app: appName => app({ isABC, abcNetworkID }, appName),
optionalApp: appName => optionalApp({ isABC, abcNetworkID }, appName),
group: groupName => group({ isABC, abcNetworkID }, groupName),
user: _user => user({ isABC, abcNetworkID }, _user),
users: _user => user({ isABC, abcNetworkID }, _user),
userKey: _userKey => userKey({ isABC, abcNetworkID }, _userKey),
group: (...groups) => group({ isABC, abcNetworkID }, groups.length === 1 ? groups[0] : groups),
user: (...users) => user({ isABC, abcNetworkID }, users.length === 1 ? users[0] : users),
users: (...users) => user({ isABC, abcNetworkID }, users.length === 1 ? users[0] : users),
userKey: (...keys) => userKey({ isABC, abcNetworkID }, keys.length === 1 ? keys[0] : keys),
title: _title => title({ isABC, abcNetworkID }, _title),
omitAppName: () => omitAppName(),
omitDeviceName: () => omitDeviceName({ isABC, abcNetworkID }),
Expand All @@ -49,10 +54,27 @@ const _push = {
highPriority: (high = true) => highPriority({ isABC, abcNetworkID }, high),
enableHtml: (enable = true) => enableHtml({ isABC, abcNetworkID }, enable),
bigMessage: () => bigMessage({ isABC, abcNetworkID }),
ttl: _ttl => ttl({ isABC, abcNetworkID }, _ttl),
notify: (...options) => notify({ isABC, abcNetworkID }, ...options),
notifyAll: (...options) => notifyAll({ isABC, abcNetworkID }, ...options),
notifyRaw,
initABC
};

export { _push as push, apn, desktop, notifier, dailyNotifier, weeklyNotifier, dateNotifier, holidayNotifier, trashTakeoutNotifier };
function init(program) {
if (isMainServer()) {
const slot = store.slot('pushMessages');
log.cyan('✉️ Deduplicating push messages on main server');

setTimeout(
() =>
everyHour(() => {
const now = Date.now();
slot.removeArrayElements(({ timestamp }) => now - timestamp >= ONE_DAY);
}),
10 * ONE_MINUTE
);
}
}

export { init, _push as push, apn, desktop, notifier, dailyNotifier, weeklyNotifier, dateNotifier, holidayNotifier, trashTakeoutNotifier };
15 changes: 15 additions & 0 deletions core/node/notify/lib/pushover/dedupStore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import path from 'path';

import { dmtStateDir } from 'dmt/common';
import { SyncStore } from 'dmt/connectome-stores';

const store = new SyncStore(
{},
{
stateFilePath: path.join(dmtStateDir, 'push_messages_dedup.json')
}
);

store.slot('pushMessages').makeArray();

export { store };
9 changes: 9 additions & 0 deletions core/node/notify/lib/pushover/getDedupKey.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { dateFns } from 'dmt/common';

const { format } = dateFns;

export default function getDedupKey(dateTime) {
const dt = new Date(dateTime.getTime());
dt.setSeconds(0);
return format(dt, 'yyyy-MM-dd H:mm');
}
14 changes: 13 additions & 1 deletion core/node/notify/lib/pushover/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import MessageSender from './messageSender.js';
import { store } from './dedupStore.js';

function notify(constructorOptions, ...options) {
return new MessageSender(constructorOptions).notify(...options);
Expand Down Expand Up @@ -52,6 +53,10 @@ function omitAppName(constructorOptions) {
return new MessageSender(constructorOptions).omitAppName();
}

function ttl(constructorOptions) {
return new MessageSender(constructorOptions).ttl();
}

function bigMessage(constructorOptions) {
return new MessageSender(constructorOptions).bigMessage();
}
Expand All @@ -64,6 +69,10 @@ function enableHtml(constructorOptions, enable) {
return new MessageSender(constructorOptions).enableHtml(enable);
}

function dedup(constructorOptions, key) {
return new MessageSender(constructorOptions).dedup(key);
}

export {
notify,
notifyAll,
Expand All @@ -78,7 +87,10 @@ export {
highPriority,
enableHtml,
bigMessage,
ttl,
network,
url,
urlTitle
urlTitle,
dedup,
store
};
Loading

0 comments on commit 2c14c89

Please sign in to comment.