Skip to content

Commit

Permalink
Revert "Merge pull request #86 from MisskeyIO/revert-bullmq"
Browse files Browse the repository at this point in the history
This reverts commit e8cf53a.

Revert "外部サーバーへの配送が行われない問題を修正 (たぶん) (#98)"

This reverts commit 53931ec.

Revert "fix(backend): restore start method in QueueProcessorService (#87)"

This reverts commit e88617e.
  • Loading branch information
riku6460 authored and u1-liquid committed Jul 22, 2023
1 parent eca41ca commit 757eae2
Show file tree
Hide file tree
Showing 41 changed files with 524 additions and 494 deletions.
2 changes: 1 addition & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
"autwh": "0.1.0",
"bcryptjs": "2.4.3",
"blurhash": "2.0.5",
"bull": "4.10.4",
"bullmq": "3.15.0",
"cacheable-lookup": "6.1.0",
"cbor": "9.0.0",
"chalk": "5.2.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/core/NoteCreateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ export class NoteCreateService implements OnApplicationShutdown {

if (data.poll && data.poll.expiresAt) {
const delay = data.poll.expiresAt.getTime() - Date.now();
this.queueService.endedPollNotificationQueue.add({
this.queueService.endedPollNotificationQueue.add(note.id, {
noteId: note.id,
}, {
delay,
Expand Down
53 changes: 11 additions & 42 deletions packages/backend/src/core/QueueModule.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,11 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
import Bull from 'bull';
import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { Provider } from '@nestjs/common';
import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js';

function q<T>(config: Config, name: string, limitPerSec = -1) {
return new Bull<T>(name, {
redis: {
port: config.redisForJobQueue.port,
host: config.redisForJobQueue.host,
family: config.redisForJobQueue.family == null ? 0 : config.redisForJobQueue.family,
password: config.redisForJobQueue.pass,
db: config.redisForJobQueue.db ?? 0,
},
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec,
duration: 1000,
} : undefined,
settings: {
backoffStrategies: {
apBackoff,
},
},
});
}

// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function apBackoff(attemptsMade: number, err: Error) {
const baseDelay = 60 * 1000; // 1min
const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
backoff = Math.min(backoff, maxBackoff);
backoff += Math.round(backoff * Math.random() * 0.2);
return backoff;
}
import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';

export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
Expand All @@ -49,49 +18,49 @@ export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;

const $system: Provider = {
provide: 'queue:system',
useFactory: (config: Config) => q(config, 'system'),
useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)),
inject: [DI.config],
};

const $endedPollNotification: Provider = {
provide: 'queue:endedPollNotification',
useFactory: (config: Config) => q(config, 'endedPollNotification'),
useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)),
inject: [DI.config],
};

const $deliver: Provider = {
provide: 'queue:deliver',
useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128),
useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)),
inject: [DI.config],
};

const $inbox: Provider = {
provide: 'queue:inbox',
useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16),
useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)),
inject: [DI.config],
};

const $db: Provider = {
provide: 'queue:db',
useFactory: (config: Config) => q(config, 'db'),
useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)),
inject: [DI.config],
};

const $relationship: Provider = {
provide: 'queue:relationship',
useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64),
useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)),
inject: [DI.config],
};

const $objectStorage: Provider = {
provide: 'queue:objectStorage',
useFactory: (config: Config) => q(config, 'objectStorage'),
useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)),
inject: [DI.config],
};

const $webhookDeliver: Provider = {
provide: 'queue:webhookDeliver',
useFactory: (config: Config) => q(config, 'webhookDeliver', 64),
useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)),
inject: [DI.config],
};

Expand Down
66 changes: 50 additions & 16 deletions packages/backend/src/core/QueueService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
import { v4 as uuid } from 'uuid';
import Bull from 'bull';
import type { IActivity } from '@/core/activitypub/type.js';
import type { DriveFile } from '@/models/entities/DriveFile.js';
import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
Expand All @@ -11,6 +10,7 @@ import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
import type { DbJobData, DeliverJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
import type * as Bull from 'bullmq';

@Injectable()
export class QueueService {
Expand All @@ -26,7 +26,43 @@ export class QueueService {
@Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
) {}
) {
this.systemQueue.add('tickCharts', {
}, {
repeat: { pattern: '55 * * * *' },
removeOnComplete: true,
});

this.systemQueue.add('resyncCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('cleanCharts', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('aggregateRetention', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('clean', {
}, {
repeat: { pattern: '0 0 * * *' },
removeOnComplete: true,
});

this.systemQueue.add('checkExpiredMutings', {
}, {
repeat: { pattern: '*/5 * * * *' },
removeOnComplete: true,
});
}

@bindThis
public deliver(user: ThinUser, content: IActivity | null, to: string | null, isSharedInbox: boolean) {
Expand All @@ -42,11 +78,10 @@ export class QueueService {
isSharedInbox,
};

return this.deliverQueue.add(data, {
return this.deliverQueue.add(to, data, {
attempts: this.config.deliverJobMaxAttempts ?? 12,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: 'apBackoff',
type: 'custom',
},
removeOnComplete: true,
removeOnFail: true,
Expand Down Expand Up @@ -75,6 +110,7 @@ export class QueueService {
};

await this.deliverQueue.addBulk(Array.from(inboxes.entries()).map(d => ({
name: d[0],
data: {
user,
content,
Expand All @@ -94,11 +130,10 @@ export class QueueService {
signature,
};

return this.inboxQueue.add(data, {
return this.inboxQueue.add('', data, {
attempts: this.config.inboxJobMaxAttempts ?? 8,
timeout: 5 * 60 * 1000, // 5min
backoff: {
type: 'apBackoff',
type: 'custom',
},
removeOnComplete: true,
removeOnFail: true,
Expand Down Expand Up @@ -246,7 +281,7 @@ export class QueueService {
private generateToDbJobData<T extends 'importFollowingToDb' | 'importBlockingToDb', D extends DbJobData<T>>(name: T, data: D): {
name: string,
data: D,
opts: Bull.JobOptions,
opts: Bull.JobsOptions,
} {
return {
name,
Expand Down Expand Up @@ -333,10 +368,10 @@ export class QueueService {
}

@bindThis
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobOptions = {}): {
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobsOptions = {}): {
name: string,
data: RelationshipJobData,
opts: Bull.JobOptions,
opts: Bull.JobsOptions,
} {
return {
name,
Expand Down Expand Up @@ -385,11 +420,10 @@ export class QueueService {
eventId: uuid(),
};

return this.webhookDeliverQueue.add(data, {
return this.webhookDeliverQueue.add(webhook.id, data, {
attempts: 4,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: 'apBackoff',
type: 'custom',
},
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -401,11 +435,11 @@ export class QueueService {
this.deliverQueue.once('cleaned', (jobs, status) => {
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.deliverQueue.clean(0, 'delayed');
this.deliverQueue.clean(0, Infinity, 'delayed');

this.inboxQueue.once('cleaned', (jobs, status) => {
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.inboxQueue.clean(0, 'delayed');
this.inboxQueue.clean(0, Infinity, 'delayed');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { PollService } from '@/core/PollService.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { bindThis } from '@/decorators.js';
import { checkHttps } from '@/misc/check-https.js';
import { getOneApId, getApId, getOneApHrefNullable, validPost, isEmoji, getApType } from '../type.js';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports
import { ApLoggerService } from '../ApLoggerService.js';
Expand All @@ -32,7 +33,6 @@ import { ApQuestionService } from './ApQuestionService.js';
import { ApImageService } from './ApImageService.js';
import type { Resolver } from '../ApResolverService.js';
import type { IObject, IPost } from '../type.js';
import { checkHttps } from '@/misc/check-https.js';

@Injectable()
export class ApNoteService {
Expand Down
16 changes: 13 additions & 3 deletions packages/backend/src/daemons/QueueStatsService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { Injectable } from '@nestjs/common';
import { Inject, Injectable } from '@nestjs/common';
import Xev from 'xev';
import * as Bull from 'bullmq';
import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { OnApplicationShutdown } from '@nestjs/common';

const ev = new Xev();
Expand All @@ -13,6 +17,9 @@ export class QueueStatsService implements OnApplicationShutdown {
private intervalId: NodeJS.Timer;

constructor(
@Inject(DI.config)
private config: Config,

private queueService: QueueService,
) {
}
Expand All @@ -31,11 +38,14 @@ export class QueueStatsService implements OnApplicationShutdown {
let activeDeliverJobs = 0;
let activeInboxJobs = 0;

this.queueService.deliverQueue.on('global:active', () => {
const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));

deliverQueueEvents.on('active', () => {
activeDeliverJobs++;
});

this.queueService.inboxQueue.on('global:active', () => {
inboxQueueEvents.on('active', () => {
activeInboxJobs++;
});

Expand Down
15 changes: 8 additions & 7 deletions packages/backend/src/misc/prelude/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ const dateTimeIntervals = {
};

export function dateUTC(time: number[]): Date {
const d = time.length === 2 ? Date.UTC(time[0], time[1])
: time.length === 3 ? Date.UTC(time[0], time[1], time[2])
: time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3])
: time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4])
: time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5])
: time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6])
: null;
const d =
time.length === 2 ? Date.UTC(time[0], time[1])
: time.length === 3 ? Date.UTC(time[0], time[1], time[2])
: time.length === 4 ? Date.UTC(time[0], time[1], time[2], time[3])
: time.length === 5 ? Date.UTC(time[0], time[1], time[2], time[3], time[4])
: time.length === 6 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5])
: time.length === 7 ? Date.UTC(time[0], time[1], time[2], time[3], time[4], time[5], time[6])
: null;

if (!d) throw new Error('wrong number of arguments');

Expand Down
Loading

0 comments on commit 757eae2

Please sign in to comment.