Skip to content

Commit

Permalink
chore(framework): Unified resource loading and exclude non js/ts fil…
Browse files Browse the repository at this point in the history
…es (#11707)

* chore(framework): Unified resource loading and exclude non js/ts files

* chore(framework): Unified resource loading and exclude non js/ts files

* chore(framework): Unified resource loading and exclude non js/ts files

* chore(framework): Unified resource loading and exclude non js/ts files

* chore(framework): Unified resource loading and exclude non js/ts files

* Create six-parrots-shave.md
  • Loading branch information
adrien2p authored Mar 4, 2025
1 parent aabbbb7 commit 5d184ba
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 225 deletions.
5 changes: 5 additions & 0 deletions .changeset/six-parrots-shave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@medusajs/framework": patch
---

chore(framework): Unified resource loading and exclude non js/ts files
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { MedusaContainer } from "@medusajs/types"

export default async function handler(container: MedusaContainer) {
console.log(`You have received 5 orders today`)
}

export const config = {
name: "summarize-orders",
schedule: "* * * * * *",
numberOfExecutions: 2,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { MedusaContainer } from "@medusajs/types"

export default async function handler(container: MedusaContainer) {
console.log(`You have received 5 orders today`)
}

export const config = {
name: "summarize-orders",
schedule: "* * * * * *",
numberOfExecutions: 2,
}
20 changes: 15 additions & 5 deletions packages/core/framework/src/jobs/__tests__/register-jobs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import { JobLoader } from "../job-loader"
describe("register jobs", () => {
WorkflowScheduler.setStorage(new MockSchedulerStorage())

let jobLoader!: JobLoader

beforeAll(() => {
jobLoader = new JobLoader(join(__dirname, "../__fixtures__/plugin/jobs"))
afterEach(() => {
WorkflowManager.unregisterAll()
})

it("registers jobs from plugins", async () => {
it("should registers jobs from plugins", async () => {
const jobLoader: JobLoader = new JobLoader(
join(__dirname, "../__fixtures__/plugin/jobs")
)
await jobLoader.load()
const workflow = WorkflowManager.getWorkflow("job-summarize-orders")
expect(workflow).toBeDefined()
Expand All @@ -21,4 +22,13 @@ describe("register jobs", () => {
numberOfExecutions: 2,
})
})

it("should not load non js/ts files", async () => {
const jobLoader: JobLoader = new JobLoader(
join(__dirname, "../__fixtures__/plugin/jobs-with-other-files")
)
await jobLoader.load()
const workflow = WorkflowManager.getWorkflow("job-summarize-orders")
expect(workflow).toBeUndefined()
})
})
105 changes: 23 additions & 82 deletions packages/core/framework/src/jobs/job-loader.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
import type { SchedulerOptions } from "@medusajs/orchestration"
import { MedusaContainer } from "@medusajs/types"
import {
dynamicImport,
isObject,
MedusaError,
promiseAll,
readDirRecursive,
} from "@medusajs/utils"
import { isObject, MedusaError } from "@medusajs/utils"
import {
createStep,
createWorkflow,
StepResponse,
} from "@medusajs/workflows-sdk"
import { Dirent } from "fs"
import { access } from "fs/promises"
import { join } from "path"
import { logger } from "../logger"
import { ResourceLoader } from "../utils/resource-loader"

type CronJobConfig = {
name: string
Expand All @@ -25,27 +17,26 @@ type CronJobConfig = {

type CronJobHandler = (container: MedusaContainer) => Promise<any>

export class JobLoader {
/**
* The directory from which to load the jobs
* @private
*/
#sourceDir: string | string[]

/**
* The list of file names to exclude from the subscriber scan
* @private
*/
#excludes: RegExp[] = [
/index\.js/,
/index\.ts/,
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts|\.md)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]
export class JobLoader extends ResourceLoader {
protected resourceName = "job"

constructor(sourceDir: string | string[]) {
this.#sourceDir = sourceDir
super(sourceDir)
}

protected async onFileLoaded(
path: string,
fileExports: {
default: CronJobHandler
config: CronJobConfig
}
) {
this.validateConfig(fileExports.config)
logger.debug(`Registering job from ${path}.`)
this.register({
config: fileExports.config,
handler: fileExports.default,
})
}

/**
Expand Down Expand Up @@ -85,7 +76,7 @@ export class JobLoader {
* @param handler
* @protected
*/
protected registerJob({
protected register({
config,
handler,
}: {
Expand Down Expand Up @@ -128,58 +119,8 @@ export class JobLoader {
* Load cron jobs from one or multiple source paths
*/
async load() {
const normalizedSourcePath = Array.isArray(this.#sourceDir)
? this.#sourceDir
: [this.#sourceDir]

const promises = normalizedSourcePath.map(async (sourcePath) => {
try {
await access(sourcePath)
} catch {
logger.info(`No job to load from ${sourcePath}. skipped.`)
return
}

return await readDirRecursive(sourcePath).then(async (entries) => {
const fileEntries = entries.filter((entry: Dirent) => {
return (
!entry.isDirectory() &&
!this.#excludes.some((exclude) => exclude.test(entry.name))
)
})

logger.debug(`Registering jobs from ${sourcePath}.`)

return await promiseAll(
fileEntries.map(async (entry: Dirent) => {
const fullPath = join(entry.path, entry.name)

const module_ = await dynamicImport(fullPath)

const input = {
config: module_.config,
handler: module_.default,
}

this.validateConfig(input.config)
return input
})
)
})
})

const jobsInputs = await promiseAll(promises)
const flatJobsInput = jobsInputs.flat(1).filter(
(
job
): job is {
config: CronJobConfig
handler: CronJobHandler
} => !!job
)

flatJobsInput.map(this.registerJob)
await super.discoverResources()

logger.debug(`Job registered.`)
logger.debug(`Jobs registered.`)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { SubscriberArgs, SubscriberConfig } from "../../types"

export default async function orderNotifier(\_: SubscriberArgs) {
return await Promise.resolve()
}

export const config: SubscriberConfig = {
event: ["order.placed", "order.canceled", "order.completed"],
context: { subscriberId: "order-notifier-md" },
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { SubscriberArgs, SubscriberConfig } from "../../types"

export default async function orderNotifier(_: SubscriberArgs) {
return await Promise.resolve()
}

export const config: SubscriberConfig = {
event: ["order.placed", "order.canceled", "order.completed"],
context: { subscriberId: "order-notifier-txt" },
}
110 changes: 27 additions & 83 deletions packages/core/framework/src/subscribers/subscriber-loader.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
import { Event, IEventBusModuleService, Subscriber } from "@medusajs/types"
import {
dynamicImport,
kebabCase,
Modules,
promiseAll,
readDirRecursive,
resolveExports,
} from "@medusajs/utils"
import { access } from "fs/promises"
import { join, parse } from "path"

import { Dirent } from "fs"
import { kebabCase, Modules } from "@medusajs/utils"
import { parse } from "path"

import { configManager } from "../config"
import { container } from "../container"
import { logger } from "../logger"
import { SubscriberArgs, SubscriberConfig } from "./types"
import { ResourceLoader } from "../utils/resource-loader"

type SubscriberHandler<T> = (args: SubscriberArgs<T>) => Promise<void>

Expand All @@ -23,31 +15,15 @@ type SubscriberModule<T> = {
handler: SubscriberHandler<T>
}

export class SubscriberLoader {
export class SubscriberLoader extends ResourceLoader {
protected resourceName = "subscriber"

/**
* The options of the plugin from which the subscribers are being loaded
* @private
*/
#pluginOptions: Record<string, unknown>

/**
* The base directory from which to scan for the subscribers
* @private
*/
#sourceDir: string | string[]

/**
* The list of file names to exclude from the subscriber scan
* @private
*/
#excludes: RegExp[] = [
/index\.js/,
/index\.ts/,
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts|\.md)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]

/**
* Map of subscribers descriptors to consume in the loader
* @private
Expand All @@ -58,10 +34,28 @@ export class SubscriberLoader {
sourceDir: string | string[],
options: Record<string, unknown> = {}
) {
this.#sourceDir = sourceDir
super(sourceDir)
this.#pluginOptions = options
}

protected async onFileLoaded(
path: string,
fileExports: Record<string, unknown>
) {
const isValid = this.validateSubscriber(fileExports, path)

logger.debug(`Registering subscribers from ${path}.`)

if (!isValid) {
return
}

this.#subscriberDescriptors.set(path, {
config: fileExports.config,
handler: fileExports.default,
})
}

private validateSubscriber(
subscriber: any,
path: string
Expand Down Expand Up @@ -122,42 +116,6 @@ export class SubscriberLoader {
return true
}

private async createDescriptor(absolutePath: string) {
return await dynamicImport(absolutePath).then((module_) => {
module_ = resolveExports(module_)
const isValid = this.validateSubscriber(module_, absolutePath)

if (!isValid) {
return
}

this.#subscriberDescriptors.set(absolutePath, {
config: module_.config,
handler: module_.default,
})
})
}

private async createMap(dirPath: string) {
const promises = await readDirRecursive(dirPath).then(async (entries) => {
const fileEntries = entries.filter((entry) => {
return (
!entry.isDirectory() &&
!this.#excludes.some((exclude) => exclude.test(entry.name))
)
})

logger.debug(`Registering subscribers from ${dirPath}.`)

return fileEntries.flatMap(async (entry: Dirent) => {
const fullPath = join(entry.path, entry.name)
return await this.createDescriptor(fullPath)
})
})

await promiseAll(promises)
}

private inferIdentifier<T>(
fileName: string,
{ context }: SubscriberConfig,
Expand Down Expand Up @@ -222,21 +180,7 @@ export class SubscriberLoader {
}

async load() {
const normalizeSourcePaths = Array.isArray(this.#sourceDir)
? this.#sourceDir
: [this.#sourceDir]
const promises = normalizeSourcePaths.map(async (sourcePath) => {
try {
await access(sourcePath)
} catch {
logger.info(`No subscribers to load from ${sourcePath}. skipped.`)
return
}

return await this.createMap(sourcePath)
})

await promiseAll(promises)
await super.discoverResources()

for (const [
fileName,
Expand Down
Loading

0 comments on commit 5d184ba

Please sign in to comment.