diff --git a/experimental/bench.ts b/experimental/bench.ts new file mode 100644 index 0000000..f360910 --- /dev/null +++ b/experimental/bench.ts @@ -0,0 +1,32 @@ +import { bench, boxplot, group, run, summary } from "mitata"; +import { compose } from "./fixpoint.ts"; +import { aaa } from "./functions.ts"; + +const EMPTYUI8 = new Uint8Array([1, 2, 3]); + +const f = aaa.f; + +const { termminate, resolver } = compose({ + threads: 1, +})({ + aaa, +}); + +boxplot(async () => { + group("5", () => { + summary(() => { + bench("main * 5", async () => { + await f(EMPTYUI8), await f(EMPTYUI8); + }); + + bench(" 5 thread ", async () => { + await resolver.aaa(EMPTYUI8), await resolver.aaa(EMPTYUI8); + }); + }); + }); +}); +await run(); + +await resolver.aaa(new Uint8Array([5, 3, 2, 1, 0])).then(console.log); + +termminate(); diff --git a/experimental/fixpoint.ts b/experimental/fixpoint.ts new file mode 100644 index 0000000..f38ab29 --- /dev/null +++ b/experimental/fixpoint.ts @@ -0,0 +1,189 @@ +import { getCallerFile } from "./helpers.ts"; +import { genTaskID } from "./helpers.ts"; +import { createContext } from "./main.ts"; +import type { PromiseMap } from "./mainQueue.ts"; + +type Args = "void" | "uint8"; +const symbol = Symbol.for("FIXEDPOINT"); + +type FixPoint = { + args: A; + f: ( + args: A extends "void" ? void : Uint8Array, + ) => Promise; +}; + +type SecondPart = { + statusSignal: 224 | 192; + [symbol]: string; + id: number; + importedFrom: string; +}; + +type Composed = { + args: Args; + f: Function; +} & SecondPart; + +type ReturnFixed = FixPoint & SecondPart; + +export const fixedPoint = ( + I: FixPoint, +): ReturnFixed => { + return ({ + ...I, + statusSignal: I.args === "void" ? 224 : 192, + id: genTaskID(), + importedFrom: new URL(getCallerFile(2)).href, + [symbol]: "vixeny", + }); +}; + +type UnionReturnFixed = ReturnFixed; + +type FunctionMapType> = { + [K in keyof T]: T[K]["f"]; +}; + +export type GetFunctions = ReturnType; + +export const getFunctions = async ({ list, ids }: { + list: string[]; + isWorker: boolean; + ids: number[]; +}) => { + const results = await Promise.all( + list.map(async (imports) => { + const module = await import(imports); + return Object.entries(module) // Use `Object.entries` to include names + .filter( + ([_, value]): value is ReturnFixed => + typeof value === "object" && + value !== null && + !Array.isArray(value) && + Object.getOwnPropertySymbols(value).some( + (sym) => sym === Symbol.for("FIXEDPOINT"), + ), + ) + .map(([name, value]) => ({ + //@ts-ignore trust me + ...value, + name, + })); + }), + ); + + // Flatten the results, filter by IDs, and sort + const flattenedResults = results + .flat() + .filter((obj) => ids.includes(obj.id)) + .sort((a, b) => a.name.localeCompare(b.name)); + + return flattenedResults as unknown as (UnionReturnFixed & { name: string })[]; +}; + +const toListAndIds = (args: Record, filter?: string) => { + const result = Object.values(args) + .reduce( + (acc, v) => ( + acc[0].add(v.importedFrom), acc[1].add(v.id), acc + ), + [ + new Set(), + new Set(), + ] as [ + Set, + Set, + ], + ); + + if (filter) { + console.log(filter); + result[0].delete(filter); + console.log(result); + } + + return Object.fromEntries([ + ["list", [...result[0]]], + ["ids", [...result[1]]], + ]) as { + list: string[]; + ids: number[]; + }; +}; + +const loopingBetweenThreads = + ((n) => (functions: Function[]) => (max: number) => (args: any) => + n === max ? functions[n = 0](args) : functions[n++](args))(0); + +export const compose = ({ + threads, +}: { + threads?: number; +}) => +>(args: T) => { + const promisesMap: PromiseMap = new Map(); + + const { list, ids } = toListAndIds(args); + + const listOfFunctions = Object.entries(args).map(([k, v]) => ({ + ...v, + name: k, + })) + .sort((a, b) => a.name.localeCompare(b.name)); + + const workers = Array.from({ + length: threads ?? 1, + }) + .map((_) => + createContext({ + promisesMap, + list, + ids, + }) + ); + + const map = workers.map( + (worker) => { + return listOfFunctions + .map((list, index) => ({ ...list, index })) + .reduce((acc, v) => { + { + acc.set( + v.name, + worker.resolver({ + queue: worker.queue, + fnNumber: v.index, + statusSignal: v.statusSignal, + }), + ); + } + return acc; + }, new Map>()); + }, + ) + .reduce((acc, map) => { + map.forEach( + (v, k) => { + const fun = acc.get(k); + if (fun) { + acc.set(k, [...fun, v]); + } else { + acc.set(k, [v]); + } + }, + ); + + return acc; + }, new Map()); + + const resolve = new Map Promise>(); + map.forEach((v, k) => { + resolve.set(k, loopingBetweenThreads(v)(v.length)); + }); + + return { + termminate: () => workers.forEach((worker) => worker.kills()), + resolver: Object.fromEntries(resolve) as unknown as FunctionMapType, + }; +}; diff --git a/experimental/functions.ts b/experimental/functions.ts new file mode 100644 index 0000000..8948f73 --- /dev/null +++ b/experimental/functions.ts @@ -0,0 +1,27 @@ +import { fixedPoint } from "./fixpoint.ts"; + +export const aaa = fixedPoint({ + args: "uint8", + f: async (arr) => { + // Simulate an expensive operation + + let time = 100000; + + while (time !== 0) { + performance.now(); + time--; + } + + return Array.from(arr).map((num) => num * 2); + }, +}); + +export const bbb = fixedPoint({ + args: "uint8", + f: async (arr) => new Uint8Array([2]), +}); + +export const ccc = fixedPoint({ + args: "uint8", + f: async (arr) => new Uint8Array([3]), +}); diff --git a/experimental/helpers.ts b/experimental/helpers.ts index 3908877..33f5c89 100644 --- a/experimental/helpers.ts +++ b/experimental/helpers.ts @@ -7,76 +7,43 @@ type StatusSignalForMessage = 192; export type StatusSignal = StatusSignalForVoid | StatusSignalForMessage; // Generate unique task IDs. -export const genTaskID = - ((counter = new Int32Array([0])) => () => (counter[0] += 1))(); +export const genTaskID = ((counter: number) => () => counter++)(0); // Get the current file's path. export const currentPath = () => new URL(import.meta.url); -// Write a response to a buffer. -export const writeResponse = (buffer: Uint8Array) => (msg: Uint8Array) => { - buffer.fill(0); - buffer.set(msg, 0); - buffer[msg.length] = 10; // Terminator -}; - -// Manage shared buffers for communication. -export const setArrayBuffers = { - sab: (size = 1024) => new SharedArrayBuffer(size), - - status: (sab: SharedArrayBuffer) => new Uint8Array(sab, 0, 2), - - id: (sab: SharedArrayBuffer) => new Int32Array(sab, 4, 1), - - payload: (sab: SharedArrayBuffer) => new Uint8Array(sab, 8), -}; - // Read a message from a Uint8Array. -export const readMessageToUint = ({ payload }: SignalArguments) => () => { - const terminatorIndex = payload.lastIndexOf(10); - return payload.slice(0, terminatorIndex); -}; +export const readMessageToUint = + ({ payload, payloadLenght }: SignalArguments) => () => + payload.slice(0, payloadLenght[0].valueOf()); // Write a Uint8Array message with task metadata. export const writeUintMessage = - ({ id, payload }: SignalArguments) => (task: QueueList) => { - payload.fill(0); - // If it's not null - if (task[6] !== null) { - payload.set(task[6], 0); - payload[task[6].length] = 10; // Terminator - } else { - payload[0] = 10; - } - - id[0] = task[3]; // Task ID + ({ id, payload, payloadLenght }: SignalArguments) => (task: QueueList) => { + payload.set(task[6], 0); + payloadLenght[0] = task[6].length; + id[0] = task[3]; }; export const sendUintMessage = - ({ id, payload }: SignalArguments) => (task: MainList) => { - payload.fill(0); - // If it's not null - if (task[5] !== null) { - payload.set(task[5], 0); - payload[task[5].length] = 10; // Terminator - } else { - payload[0] = 10; - } - + ({ id, payload, payloadLenght }: SignalArguments) => (task: MainList) => { + payload.set(task[3], 0); + payloadLenght[0] = task[3].length; id[0] = task[2]; }; -export const optimalOrder = (n: number) => { - const a = Array.from( - { length: (n * 2) }, - (_, i) => i % 2 == 1, - ); - a[a.length - 1] = true; - if (n > 3) { - a[a.length - 2] = true; +export const getCallerFile = (n: number) => { + const originalStackTrace = Error.prepareStackTrace; + Error.prepareStackTrace = (_, stack) => stack; + const err = new Error(); + const stack = err.stack as unknown as NodeJS.CallSite[]; + Error.prepareStackTrace = originalStackTrace; + // Get the caller file + const caller = stack[n]?.getFileName(); + + if (!caller) { + throw new Error("Unable to determine caller file."); } - return ((n: number) => (m: number) => () => n === m ? a[n = 0] : a[n++])(0)( - (n * 2) - 1, - ); + return caller; }; diff --git a/experimental/main.ts b/experimental/main.ts index 3bbb677..2c8e256 100644 --- a/experimental/main.ts +++ b/experimental/main.ts @@ -1,19 +1,18 @@ // main.ts import { Worker } from "node:worker_threads"; -import { bench, boxplot, group, run, summary } from "mitata"; import { multi, type MultiQueue, type PromiseMap } from "./mainQueue.ts"; import { genTaskID, readMessageToUint, sendUintMessage } from "./helpers.ts"; - import { mainSignal, signalsForWorker } from "./signal.ts"; - import { checker } from "./checker.ts"; -const promisesMap: PromiseMap = new Map(); - -const createContext = ({ +export const createContext = ({ promisesMap, + list, + ids, }: { promisesMap: PromiseMap; + list: string[]; + ids: number[]; }) => { const currentPath = import.meta.url; const workerUrl = new URL(currentPath.replace("main.ts", "worker.ts")); @@ -23,6 +22,7 @@ const createContext = ({ const writer = sendUintMessage(signals); const reader = readMessageToUint(signals); + const queue = multi({ writer, signalBox, @@ -30,6 +30,7 @@ const createContext = ({ genTaskID, promisesMap, }); + const check = checker({ signalBox, queue, @@ -37,180 +38,44 @@ const createContext = ({ const worker = new Worker(workerUrl, { type: "module", - workerData: { sab: signals.sab }, + workerData: { + sab: signals.sab, + list, + ids, + }, }); - const isActive = (status: Uint8Array) => + const isActive = ((status: Uint8Array) => () => status[0] === 255 ? ( // Skips one cycle status[0] = 254, queueMicrotask(check) ) - : undefined; + : undefined)(signals.status); + + type Resolver = { + queue: MultiQueue; + fnNumber: number; + statusSignal: 224 | 192; + max?: number; + }; const resolver = (args: Resolver) => { - const { queue, status, fnNumber, statusSignal } = args; + const { queue, fnNumber, statusSignal } = args; const adds = queue.add(statusSignal)(fnNumber); - return async () => ( - isActive(status), + return async (args: Uint8Array) => ( + isActive(), queue.awaits( - adds(null), + adds(args), ) ); }; return { - awaits: (ar: number) => (queue.awaits(ar)), - adds: (args: Uint8Array | null) => { - isActive(signals.status); - return queue.add(224)(0)(args); - }, - addsResolve: resolver({ - //@ts-ignore - queue, - status: signals.status, - fnNumber: 0, - statusSignal: 224, - }), + queue, + resolver, awaitArray: queue.awaitArray, kills: () => worker.terminate(), }; }; - -const decoder = new TextEncoder(); - -const f = async () => { - let sum = 0; - - // Increase or decrease the loop count for more or less work - const iterations = 10000; - - for (let i = 0; i < iterations; i++) { - sum += performance.now(); - } - - return decoder.encode(sum.toString()); -}; - -type Resolver = { - queue: MultiQueue; - fnNumber: number; - status: Uint8Array; - statusSignal: 224; - max?: number; -}; - -const context1 = createContext({ promisesMap }); -const context2 = createContext({ promisesMap }); -const context3 = createContext({ promisesMap }); -const context4 = createContext({ promisesMap }); -const context5 = createContext({ promisesMap }); - -boxplot(async () => { - group("1", () => { - summary(() => { - bench(" 1 thread ", async () => { - await context1.awaitArray([ - context1.adds(null), - ]); - }); - - bench(" main * 1", async () => { - await f(); - }); - }); - }); - - group("2", () => { - summary(() => { - bench(" 2 thread ", async () => { - await context1.awaitArray([ - context1.adds(null), - context2.adds(null), - ]); - }); - - bench(" main * 2", async () => { - await Promise.all([ - f(), - f(), - ]); - }); - }); - }); - - group("3", () => { - summary(() => { - bench(" 3 thread ", async () => { - await context1.awaitArray([ - context1.adds(null), - context2.adds(null), - context3.adds(null), - ]); - }); - - bench(" main * 3", async () => { - await Promise.all([ - f(), - f(), - f(), - ]); - }); - }); - }); -}); - -group("4", () => { - summary(() => { - bench(" 4 thread ", async () => { - await context1.awaitArray([ - context1.adds(null), - context2.adds(null), - context3.adds(null), - context4.adds(null), - ]); - }); - - bench("main * 4", async () => { - await Promise.all([ - f(), - f(), - f(), - f(), - ]); - }); - }); -}); - -group("5", () => { - summary(() => { - bench(" 5 thread ", async () => { - await context1.awaitArray([ - context1.adds(null), - context2.adds(null), - context3.adds(null), - context4.adds(null), - context5.adds(null), - ]); - }); - - bench("main * 5", async () => { - await Promise.all([ - f(), - f(), - f(), - f(), - f(), - ]); - }); - }); -}); - -await run(); -console.log(genTaskID()); -context1.kills(); -context2.kills(); -context3.kills(); -context4.kills(); -context5.kills(); diff --git a/experimental/mainQueue.ts b/experimental/mainQueue.ts index 0be3a1c..24de666 100644 --- a/experimental/mainQueue.ts +++ b/experimental/mainQueue.ts @@ -6,9 +6,9 @@ import { type MainSignal } from "./signal.ts"; // Task ID is a unique number representing a task. type TaskID = number; // RawArguments are optional arguments in the form of a Uint8Array. -type RawArguments = Uint8Array | null; +type RawArguments = Uint8Array; // WorkerResponse is the result of a task, represented as a Uint8Array. -type WorkerResponse = Uint8Array | null; +type WorkerResponse = Uint8Array; // FunctionID represents a unique identifier for a function to execute. type FunctionID = number; // Boolean flags for task state. @@ -69,7 +69,17 @@ export const multi = ( ) => { const queue = Array.from( { length: max ?? 10 }, - () => [true, false, 0, null, 0, new Uint8Array(), true, 224] as MainList, + () => + [ + true, + false, + 0, + new Uint8Array(), + 0, + new Uint8Array(), + true, + 224, + ] as MainList, ); const freeSlotOp = Array.from( @@ -99,6 +109,7 @@ export const multi = ( (rawArguments: RawArguments) => { const freeIndex = freeSlotOp.indexOf(true); const taskID = genTaskID(); + if (freeIndex === -1) { throw "No free slots! isBusyFailed uwu"; } @@ -155,7 +166,9 @@ export const multi = ( console.log(queue); throw "xd somethin whent wrong in sendNextToWorker"; } + writer(queue[idx]); + signalBox.setFunctionSignal(queue[idx][4]); signalBox.setSignal(queue[idx][7]); }, diff --git a/experimental/signal.ts b/experimental/signal.ts index 83bab73..2729a31 100644 --- a/experimental/signal.ts +++ b/experimental/signal.ts @@ -1,7 +1,7 @@ export type SignalArguments = ReturnType; export type MainSignal = ReturnType; -type StatusSignalForVoid = 224; +type StatusSignalForVoid = 224 | 192; export type StatusSignal = StatusSignalForVoid; type Sab = { @@ -11,13 +11,14 @@ type Sab = { export const signalsForWorker = (args?: Sab) => { const sab = args?.sharedSab ? args.sharedSab - : new SharedArrayBuffer(args?.size ?? 1024); + : new SharedArrayBuffer(args?.size ?? 4096); return { sab, status: new Uint8Array(sab, 0, 2), id: new Int32Array(sab, 4, 1), - payload: new Uint8Array(sab, 8), + payloadLenght: new Int32Array(sab, 8, 1), + payload: new Uint8Array(sab, 12), }; }; @@ -29,7 +30,7 @@ export const mainSignal = ({ status, id }: SignalArguments) => { updateLastSignal: () => (lastSignal = status[0]), send: (): 192 => (status[0] = lastSignal = 192), setSignal: (signal: StatusSignal) => (status[0] = signal), - setFunctionSignal: (signal: number) => (status[0] = signal), + setFunctionSignal: (signal: number) => (status[1] = signal), readyToRead: (): 127 => (status[0] = lastSignal = 127), voidMessage: (): 224 => (status[0] = lastSignal = 224), hasNoMoreMessages: (): 255 => (status[0] = lastSignal = 255), diff --git a/experimental/test.ts b/experimental/test.ts new file mode 100644 index 0000000..843100f --- /dev/null +++ b/experimental/test.ts @@ -0,0 +1,25 @@ +import { compose } from "./fixpoint.ts"; + +import { aaa, bbb, ccc } from "./functions.ts"; + +const { termminate, resolver } = compose({ + threads: 2, +})({ + ccc, + aaa, + bbb, +}); + +await Promise.all([ + resolver.aaa(new Uint8Array([1])), + resolver.aaa(new Uint8Array([1])), + resolver.aaa(new Uint8Array([1])), + resolver.bbb(new Uint8Array([1])), + resolver.bbb(new Uint8Array([1])), + resolver.bbb(new Uint8Array([1])), + resolver.ccc(new Uint8Array([1])), + resolver.ccc(new Uint8Array([1])), + resolver.ccc(new Uint8Array([1])), +]).then(console.log); + +termminate(); diff --git a/experimental/worker.ts b/experimental/worker.ts index d335123..edd438c 100644 --- a/experimental/worker.ts +++ b/experimental/worker.ts @@ -1,78 +1,73 @@ import { workerData } from "node:worker_threads"; -import { - readMessageToUint, - setArrayBuffers, - writeUintMessage, -} from "./helpers.ts"; +import { readMessageToUint, writeUintMessage } from "./helpers.ts"; import { multi } from "./workerQueue.ts"; - import { signalsForWorker, workerSignal } from "./signal.ts"; - -const decoder = new TextEncoder(); - -const listOfFunctions = [ - async () => { - let sum = 0; - - // Increase or decrease the loop count for more or less work - const iterations = 10000; - - for (let i = 0; i < iterations; i++) { - sum += performance.now(); - } - - return decoder.encode(sum.toString()); - }, -]; - -const sharedSab = workerData.sab as SharedArrayBuffer; - -const signals = signalsForWorker({ - sharedSab, -}); - -const status = setArrayBuffers.status(signals.sab); -const id = setArrayBuffers.id(signals.sab); - -const workerSig = workerSignal(signals); - -const readMsg = readMessageToUint(signals); -const writeMsg = writeUintMessage(signals); - -const queue = multi({ - jobs: listOfFunctions, - writer: writeMsg, - status, -}); - -while (true) { - switch (workerSig.curretSignal()) { - case 127: { - if (queue.someHasFinished()) { - queue.write(); - continue; +import { getFunctions } from "./fixpoint.ts"; + +const mainLoop = async () => { + const sharedSab = workerData.sab as SharedArrayBuffer; + + const signals = signalsForWorker({ + sharedSab, + }); + + const listOfFunctions = await getFunctions({ + list: workerData.list, + isWorker: true, + ids: workerData.ids, + }) + .then( + (objs) => + objs.map( + (obj) => obj.f, + ), + ); + + const status = signals.status; + const id = signals.id; + + const workerSig = workerSignal(signals); + + const readMsg = readMessageToUint(signals); + const writeMsg = writeUintMessage(signals); + + const queue = multi({ + jobs: listOfFunctions, + writer: writeMsg, + status, + }); + + while (true) { + switch (workerSig.curretSignal()) { + case 127: { + if (queue.someHasFinished()) { + queue.write(); + break; + } + + if (queue.allDone()) { + workerSig.finishedAllTasks(); + break; + } + workerSig.messageWasRead(); + break; } - - if (queue.allDone()) { - workerSig.finishedAllTasks(); - continue; - } - workerSig.messageWasRead(); - break; + case 224: + { + queue.add([id[0], null, status[1], 224]); + } + break; + case 192: + { + queue.add([id[0], readMsg(), status[1], 192]); + } + + break; } - case 224: - { - queue.add([id[0], null, status[1], 224]); - } - break; - case 192: - { - queue.add([id[0], readMsg(), status[1], 192]); - } - break; + // Process the next job + await queue.nextJob(); } +}; - // Process the next job - await queue.nextJob(); -} +mainLoop(); diff --git a/experimental/workerQueue.ts b/experimental/workerQueue.ts index 5283725..e55371d 100644 --- a/experimental/workerQueue.ts +++ b/experimental/workerQueue.ts @@ -10,7 +10,17 @@ type ArgumetnsForMulti = { export const multi = ({ jobs, max, writer, status }: ArgumetnsForMulti) => { const queue = Array.from( { length: max ?? 10 }, - () => [false, false, false, 0, null, 0, new Uint8Array(), 224] as QueueList, + () => + [ + false, + false, + false, + 0, + new Uint8Array(), + 0, + new Uint8Array(), + 224, + ] as QueueList, ); return { @@ -20,7 +30,6 @@ export const multi = ({ jobs, max, writer, status }: ArgumetnsForMulti) => { // Add a task to the queue. add: (element: PartialQueueList) => { const freeSlot = queue.findIndex((task) => !task[0]); - if (freeSlot !== -1) { queue[freeSlot][0] = true; queue[freeSlot][3] = element[0]; @@ -71,9 +80,11 @@ export const multi = ({ jobs, max, writer, status }: ArgumetnsForMulti) => { if (taskIndex !== -1) { queue[taskIndex][1] = true; // Lock the task try { - queue[taskIndex][6] = await jobs[queue[taskIndex][5]]( - queue[taskIndex][4], - ); // Execute the job + queue[taskIndex][6] = queue[taskIndex][7] === 224 + ? await jobs[queue[taskIndex][5]]() + : await jobs[queue[taskIndex][5]]( + queue[taskIndex][4], + ); queue[taskIndex][2] = true; // Mark as solved } finally { queue[taskIndex][1] = false; // Unlock the task