Skip to content

Commit

Permalink
Merge tag '2024.5.0-io.5b' into bun
Browse files Browse the repository at this point in the history
  • Loading branch information
u1-liquid committed Dec 25, 2024
2 parents 1cde9b7 + 1c6829d commit bbf510b
Show file tree
Hide file tree
Showing 26 changed files with 139 additions and 106 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
DFLY_snapshot_cron: '* * * * *'
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test-backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
env:
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down Expand Up @@ -106,7 +106,7 @@ jobs:
env:
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
4 changes: 2 additions & 2 deletions chart/templates/Deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ spec:
value: false
- name: DFLY_tcp_backlog
value: 2048
- name: DFLY_default_lua_flags
value: allow-undeclared-keys
- name: DFLY_lock_on_hashtags
value: true
- name: DFLY_pipeline_squash
value: 0
- name: DFLY_multi_exec_squash
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.local-db.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ services:
DFLY_snapshot_cron: '* * * * *'
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
2 changes: 1 addition & 1 deletion docker-compose_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
DFLY_snapshot_cron: '* * * * *'
DFLY_version_check: false
DFLY_tcp_backlog: 2048
DFLY_default_lua_flags: allow-undeclared-keys
DFLY_lock_on_hashtags: true
DFLY_pipeline_squash: 0
DFLY_multi_exec_squash: false
DFLY_conn_io_threads: 4
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "misskey",
"version": "2024.5.0-io.5",
"version": "2024.5.0-io.5b",
"codename": "nasubi",
"repository": {
"type": "git",
Expand Down
1 change: 1 addition & 0 deletions packages/backend/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type RedisOptionsSource = Partial<RedisOptions> & {
pass: string;
db?: number;
prefix?: string;
queueNameSuffix?: string;
};

/**
Expand Down
14 changes: 9 additions & 5 deletions packages/backend/src/core/QueueModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
import type { Config, RedisOptionsSource } from '@/config.js';
import { QUEUE, baseQueueOptions, formatQueueName } from '@/queue/const.js';
import { allSettled } from '@/misc/promise-tracker.js';
import { Queues } from '@/misc/queues.js';
import type { Provider } from '@nestjs/common';
Expand Down Expand Up @@ -36,13 +36,13 @@ const $endedPollNotification: Provider = {

const $deliver: Provider = {
provide: 'queue:deliver',
useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map(queueConfig => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER)))),
useFactory: (config: Config) => createQueues(QUEUE.DELIVER, config.redisForDeliverQueues, config.bullmqQueueOptions),
inject: [DI.config],
};

const $inbox: Provider = {
provide: 'queue:inbox',
useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map(queueConfig => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX)))),
useFactory: (config: Config) => createQueues(QUEUE.INBOX, config.redisForInboxQueues, config.bullmqQueueOptions),
inject: [DI.config],
};

Expand All @@ -54,7 +54,7 @@ const $db: Provider = {

const $relationship: Provider = {
provide: 'queue:relationship',
useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map(queueConfig => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)))),
useFactory: (config: Config) => createQueues(QUEUE.RELATIONSHIP, config.redisForRelationshipQueues, config.bullmqQueueOptions),
inject: [DI.config],
};

Expand All @@ -70,6 +70,10 @@ const $webhookDeliver: Provider = {
inject: [DI.config],
};

function createQueues(name: typeof QUEUE[keyof typeof QUEUE], config: RedisOptionsSource[], queueOptions: Partial<Bull.QueueOptions>): Queues {
return new Queues(config.map(queueConfig => new Bull.Queue(formatQueueName(queueConfig, name), baseQueueOptions(queueConfig, queueOptions, name))));
}

@Module({
imports: [
],
Expand Down
8 changes: 4 additions & 4 deletions packages/backend/src/queue/QueueProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
import { QUEUE, baseWorkerOptions } from './const.js';
import { QUEUE, baseWorkerOptions, formatQueueName } from './const.js';

// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function httpRelatedBackoff(attemptsMade: number) {
Expand Down Expand Up @@ -208,7 +208,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region deliver
this.deliverQueueWorkers = this.config.redisForDeliverQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
.map(config => new Bull.Worker(formatQueueName(config, QUEUE.DELIVER), (job) => this.deliverProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
Expand Down Expand Up @@ -236,7 +236,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region inbox
this.inboxQueueWorkers = this.config.redisForInboxQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
.map(config => new Bull.Worker(formatQueueName(config, QUEUE.INBOX), (job) => this.inboxProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX),
autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16,
Expand Down Expand Up @@ -288,7 +288,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region relationship
this.relationshipQueueWorkers = this.config.redisForRelationshipQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.RELATIONSHIP, (job) => {
.map(config => new Bull.Worker(formatQueueName(config, QUEUE.RELATIONSHIP), (job) => {
switch (job.name) {
case 'follow': return this.relationshipProcessorService.processFollow(job);
case 'unfollow': return this.relationshipProcessorService.processUnfollow(job);
Expand Down
10 changes: 8 additions & 2 deletions packages/backend/src/queue/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ export const QUEUE = {
WEBHOOK_DELIVER: 'webhookDeliver',
};

export function formatQueueName(config: RedisOptionsSource, queueName: typeof QUEUE[keyof typeof QUEUE]): string {
return typeof config.queueNameSuffix === 'string' ? `${queueName}-${config.queueNameSuffix}` : queueName;
}

export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial<Bull.QueueOptions>, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
const name = formatQueueName(config, queueName);
return {
...queueOptions,
connection: {
Expand All @@ -33,11 +38,12 @@ export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queu
return 1;
},
},
prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`,
prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`,
};
}

export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial<Bull.WorkerOptions>, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions {
const name = formatQueueName(config, queueName);
return {
...workerOptions,
connection: {
Expand All @@ -52,6 +58,6 @@ export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, wor
return 1;
},
},
prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`,
prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`,
};
}
14 changes: 9 additions & 5 deletions packages/backend/src/server/api/stream/channels/antenna.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class AntennaChannel extends Channel {
public static readonly requireCredential = true as const;
public static readonly kind = 'read:account';
private antennaId: string;
private idOnly: boolean;
private minimize: boolean;

constructor(
private noteEntityService: NoteEntityService,
Expand All @@ -30,7 +30,7 @@ class AntennaChannel extends Channel {
@bindThis
public async init(params: any) {
this.antennaId = params.antennaId as string;
this.idOnly = params.idOnly ?? false;
this.minimize = params.minimize ?? false;

// Subscribe stream
this.subscriber.on(`antennaStream:${this.antennaId}`, this.onEvent);
Expand All @@ -51,9 +51,13 @@ class AntennaChannel extends Channel {

if (this.isNoteMutedOrBlocked(note)) return;

if (this.idOnly && ['public', 'home'].includes(note.visibility)) {
const idOnlyNote = { id: note.id };
this.send('note', idOnlyNote);
if (this.minimize && ['public', 'home'].includes(note.visibility)) {
this.send('note', {
id: note.id, myReaction: note.myReaction,
poll: note.poll ? { choices: note.poll.choices } : undefined,
reply: note.reply ? { myReaction: note.reply.myReaction } : undefined,
renote: note.renote ? { myReaction: note.renote.myReaction } : undefined,
});
} else {
this.connection.cacheNote(note);
this.send('note', note);
Expand Down
14 changes: 9 additions & 5 deletions packages/backend/src/server/api/stream/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ChannelChannel extends Channel {
public static readonly shouldShare = false;
public static readonly requireCredential = false as const;
private channelId: string;
private idOnly: boolean;
private minimize: boolean;

constructor(
private noteEntityService: NoteEntityService,
Expand All @@ -30,7 +30,7 @@ class ChannelChannel extends Channel {
@bindThis
public async init(params: any) {
this.channelId = params.channelId as string;
this.idOnly = params.idOnly ?? false;
this.minimize = params.minimize ?? false;

// Subscribe stream
this.subscriber.on('notesStream', this.onNote);
Expand All @@ -57,9 +57,13 @@ class ChannelChannel extends Channel {
}
}

if (this.idOnly && ['public', 'home'].includes(note.visibility)) {
const idOnlyNote = { id: note.id };
this.send('note', idOnlyNote);
if (this.minimize && ['public', 'home'].includes(note.visibility)) {
this.send('note', {
id: note.id, myReaction: note.myReaction,
poll: note.poll ? { choices: note.poll.choices } : undefined,
reply: note.reply ? { myReaction: note.reply.myReaction } : undefined,
renote: note.renote ? { myReaction: note.renote.myReaction } : undefined,
});
} else {
this.connection.cacheNote(note);
this.send('note', note);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class GlobalTimelineChannel extends Channel {
public static readonly requireCredential = false as const;
private withRenotes: boolean;
private withFiles: boolean;
private idOnly: boolean;
private minimize: boolean;

constructor(
private metaService: MetaService,
Expand All @@ -39,7 +39,7 @@ class GlobalTimelineChannel extends Channel {

this.withRenotes = params.withRenotes ?? true;
this.withFiles = params.withFiles ?? false;
this.idOnly = params.idOnly ?? false;
this.minimize = params.minimize ?? false;

// Subscribe events
this.subscriber.on('notesStream', this.onNote);
Expand Down Expand Up @@ -87,9 +87,13 @@ class GlobalTimelineChannel extends Channel {
}
}

if (this.idOnly && ['public', 'home'].includes(note.visibility)) {
const idOnlyNote = { id: note.id };
this.send('note', idOnlyNote);
if (this.minimize && ['public', 'home'].includes(note.visibility)) {
this.send('note', {
id: note.id, myReaction: note.myReaction,
poll: note.poll ? { choices: note.poll.choices } : undefined,
reply: note.reply ? { myReaction: note.reply.myReaction } : undefined,
renote: note.renote ? { myReaction: note.renote.myReaction } : undefined,
});
} else {
this.connection.cacheNote(note);
this.send('note', note);
Expand Down
14 changes: 9 additions & 5 deletions packages/backend/src/server/api/stream/channels/home-timeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class HomeTimelineChannel extends Channel {
public static readonly kind = 'read:account';
private withRenotes: boolean;
private withFiles: boolean;
private idOnly: boolean;
private minimize: boolean;

constructor(
private noteEntityService: NoteEntityService,
Expand All @@ -33,7 +33,7 @@ class HomeTimelineChannel extends Channel {
public async init(params: any) {
this.withRenotes = params.withRenotes ?? true;
this.withFiles = params.withFiles ?? false;
this.idOnly = params.idOnly ?? false;
this.minimize = params.minimize ?? false;

this.subscriber.on('notesStream', this.onNote);
}
Expand Down Expand Up @@ -91,9 +91,13 @@ class HomeTimelineChannel extends Channel {
}
}

if (this.idOnly && ['public', 'home'].includes(note.visibility)) {
const idOnlyNote = { id: note.id };
this.send('note', idOnlyNote);
if (this.minimize && ['public', 'home'].includes(note.visibility)) {
this.send('note', {
id: note.id, myReaction: note.myReaction,
poll: note.poll ? { choices: note.poll.choices } : undefined,
reply: note.reply ? { myReaction: note.reply.myReaction } : undefined,
renote: note.renote ? { myReaction: note.renote.myReaction } : undefined,
});
} else {
this.connection.cacheNote(note);
this.send('note', note);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class HybridTimelineChannel extends Channel {
private withRenotes: boolean;
private withReplies: boolean;
private withFiles: boolean;
private idOnly: boolean;
private minimize: boolean;

constructor(
private metaService: MetaService,
Expand All @@ -42,7 +42,7 @@ class HybridTimelineChannel extends Channel {
this.withRenotes = params.withRenotes ?? true;
this.withReplies = params.withReplies ?? false;
this.withFiles = params.withFiles ?? false;
this.idOnly = params.idOnly ?? false;
this.minimize = params.minimize ?? false;

// Subscribe events
this.subscriber.on('notesStream', this.onNote);
Expand Down Expand Up @@ -105,9 +105,13 @@ class HybridTimelineChannel extends Channel {
}
}

if (this.idOnly && ['public', 'home'].includes(note.visibility)) {
const idOnlyNote = { id: note.id };
this.send('note', idOnlyNote);
if (this.minimize && ['public', 'home'].includes(note.visibility)) {
this.send('note', {
id: note.id, myReaction: note.myReaction,
poll: note.poll ? { choices: note.poll.choices } : undefined,
reply: note.reply ? { myReaction: note.reply.myReaction } : undefined,
renote: note.renote ? { myReaction: note.renote.myReaction } : undefined,
});
} else {
this.connection.cacheNote(note);
this.send('note', note);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class LocalTimelineChannel extends Channel {
private withRenotes: boolean;
private withReplies: boolean;
private withFiles: boolean;
private idOnly: boolean;
private minimize: boolean;

constructor(
private metaService: MetaService,
Expand All @@ -41,7 +41,7 @@ class LocalTimelineChannel extends Channel {
this.withRenotes = params.withRenotes ?? true;
this.withReplies = params.withReplies ?? false;
this.withFiles = params.withFiles ?? false;
this.idOnly = params.idOnly ?? false;
this.minimize = params.minimize ?? false;

// Subscribe events
this.subscriber.on('notesStream', this.onNote);
Expand Down Expand Up @@ -90,9 +90,13 @@ class LocalTimelineChannel extends Channel {
}
}

if (this.idOnly && ['public', 'home'].includes(note.visibility)) {
const idOnlyNote = { id: note.id };
this.send('note', idOnlyNote);
if (this.minimize && ['public', 'home'].includes(note.visibility)) {
this.send('note', {
id: note.id, myReaction: note.myReaction,
poll: note.poll ? { choices: note.poll.choices } : undefined,
reply: note.reply ? { myReaction: note.reply.myReaction } : undefined,
renote: note.renote ? { myReaction: note.renote.myReaction } : undefined,
});
} else {
this.connection.cacheNote(note);
this.send('note', note);
Expand Down
Loading

0 comments on commit bbf510b

Please sign in to comment.