Skip to content

Commit

Permalink
removing promise from add
Browse files Browse the repository at this point in the history
  • Loading branch information
mimiMonads committed Jan 1, 2025
1 parent 610a647 commit e899016
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 393 deletions.
65 changes: 65 additions & 0 deletions experimental/checker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import type { MultiQueue } from "./mainQueue.ts";
import type { MainSignal } from "./signal.ts";

export const checker = ({
signalBox,
queue,
}: {
queue: MultiQueue;
signalBox: MainSignal;
}) => {
return function check() {
const currentStatus = signalBox.updateLastSignal();

// DEBBUGING STEPS
//changeOfSignal(currentStatus);

// If has posted something
if (currentStatus < 126) {
// If worker posted a "response" (status=0), solve it
if (currentStatus === 0) {
queue.solve();

if (queue.canWrite()) {
queue.sendNextToWorker();
} else {
signalBox.readyToRead();
}

queueMicrotask(check);
return;
}

if (currentStatus === 2) {
if (queue.canWrite()) {
queue.sendNextToWorker();
queueMicrotask(check);
return;
}

signalBox.hasNoMoreMessages();

// DEBBUGING STEPS
// console.log("Finish by 2");
return;
}

signalBox.readyToRead();
queueMicrotask(check);
return;
}

// If worker is "done" or requests more (status=255)
if (currentStatus === 255) {
if (queue.canWrite()) {
queue.sendNextToWorker();
} else {
// DEBBUGING STEPS
// console.log("Finish by 255");
return;
}
}

queueMicrotask(check);
};
};
43 changes: 9 additions & 34 deletions experimental/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
import type { MainList, QueueList } from "./mainQueue.ts";
import type { SignalArguments } from "./signal.ts";

// Signals
type StatusSignalForVoid = 224;
export type StatusSignal = StatusSignalForVoid;

/**
* QueueList:
* - Represents the structure of a queue item in the worker thread.
* 0: Free - Whether the task is assigned.
* 1: Locked - Whether the task is locked/in-progress.
* 2: Solved - Whether the task has been completed.
* 3: TaskID - ID of the task.
* 4: RawArguments - Input arguments for the task.
* 5: FunctionID - ID of the function to execute.
* 6: WorkerResponse - Result of the task.
*/

// Generate unique task IDs.
export const genTaskID =
((counter = new Int32Array([0])) => () => (counter[0] += 1))();
Expand All @@ -41,30 +30,15 @@ export const setArrayBuffers = {
payload: (sab: SharedArrayBuffer) => new Uint8Array(sab, 8),
};

// Main thread signal management.
export const mainSignal = (status: Uint8Array) => ({
send: (): 192 => (status[0] = 192),
readyToRead: (): 127 => (status[0] = 127),
voidMessage: (): 224 => (status[0] = 224),
hasNoMoreMessages: (): 255 => (status[0] = 255),
});

// Worker thread signal management.
export const workerSignal = (status: Uint8Array) => ({
messageReady: (): 0 => (status[0] = 0),
messageWasRead: (): 1 => (status[0] = 1),
finishedAllTasks: (): 2 => (status[0] = 2),
});

// Read a message from a Uint8Array.
export const readMessageToUint = (buffer: Uint8Array) => () => {
const terminatorIndex = buffer.lastIndexOf(10);
return terminatorIndex >= 0 ? buffer.slice(0, terminatorIndex) : null;
export const readMessageToUint = ({ payload }: SignalArguments) => () => {
const terminatorIndex = payload.lastIndexOf(10);
return payload.slice(0, terminatorIndex);
};

// Write a Uint8Array message with task metadata.
export const writeUintMessage =
(idBuffer: Int32Array) => (payload: Uint8Array) => (task: QueueList) => {
({ id, payload }: SignalArguments) => (task: QueueList) => {
payload.fill(0);
// If it's not null
if (task[6] !== null) {
Expand All @@ -74,12 +48,11 @@ export const writeUintMessage =
payload[0] = 10;
}
// console.log("to send id: " + task[3]);
idBuffer[0] = task[3]; // Task ID
id[0] = task[3]; // Task ID
};

export const sendUintMessage =
(idBuffer: Int32Array) => (payload: Uint8Array) => (task: MainList) => {
idBuffer[0] = task[2];
({ id, payload }: SignalArguments) => (task: MainList) => {
payload.fill(0);
// If it's not null
if (task[5] !== null) {
Expand All @@ -88,6 +61,8 @@ export const sendUintMessage =
} else {
payload[0] = 10;
}

id[0] = task[2];
};

export const optimalOrder = (n: number) => {
Expand Down
172 changes: 64 additions & 108 deletions experimental/main.ts
Original file line number Diff line number Diff line change
@@ -1,117 +1,49 @@
// main.ts
import { Worker } from "node:worker_threads";
import { bench, boxplot, run } from "mitata";
import { multi, single } from "./mainQueue.ts";
import { multi, type MultiQueue } from "./mainQueue.ts";
import {
genTaskID,
mainSignal,
optimalOrder,
readMessageToUint,
sendUintMessage,
setArrayBuffers,
} from "./helpers.ts";

import { mainSignal, signalsForWorker } from "./signal.ts";

import { checker } from "./checker.ts";

const currentPath = import.meta.url;
const workerUrl = new URL(currentPath.replace("main.ts", "worker.ts"));

// ─────────────────────────────────────────────────────────────────────────────
// SHARED BUFFERS
// ─────────────────────────────────────────────────────────────────────────────
const sab = setArrayBuffers.sab();
const status = setArrayBuffers.status(sab);
const id = setArrayBuffers.id(sab);
const payload = setArrayBuffers.payload(sab);
const writer = sendUintMessage(id)(payload);
const signals = signalsForWorker();
const signalBox = mainSignal(signals);

const writer = sendUintMessage(signals);
const reader = readMessageToUint(signals);
const queue = multi({
writer,
status,
max: 10,
})();
//const queue = single({ writer, status });
// ─────────────────────────────────────────────────────────────────────────────
// MAIN THREAD
// ─────────────────────────────────────────────────────────────────────────────
const worker = new Worker(workerUrl, { type: "module", workerData: { sab } });
const mainSig = mainSignal(status);
const readMessage = readMessageToUint(payload);
mainSig.hasNoMoreMessages();

// ─────────────────────────────────────────────────────────────────────────────
// The "check" loop that never ends (so your tasks always get resolved)
// ─────────────────────────────────────────────────────────────────────────────

const changeOfSignal = ((store: number) => (status: number) => {
if (store === status) {
return;
}
store = status;
console.log("status change to:");
console.log(store);
})(0);
function check() {
const currentStatus = status[0];

// DEBBUGING STEPS
//changeOfSignal(currentStatus);

// If has posted something
if (currentStatus < 126) {
// If worker posted a "response" (status=0), solve it
if (currentStatus === 0) {
queue.solve(id[0], readMessage());

if (queue.canWrite()) {
queue.sendNextToWorker();
} else {
mainSig.readyToRead();
}

queueMicrotask(check);
return;
}

if (currentStatus === 2) {
if (queue.canWrite()) {
queue.sendNextToWorker();
queueMicrotask(check);
return;
}

mainSig.hasNoMoreMessages();

// DEBBUGING STEPS
// console.log("Finish by 2");
return;
}

mainSig.readyToRead();
queueMicrotask(check);
return;
}

// If worker is "done" or requests more (status=255)
if (currentStatus === 255) {
if (queue.canWrite()) {
queue.sendNextToWorker();
} else {
// DEBBUGING STEPS
// console.log("Finish by 255");
return;
}
}

queueMicrotask(check);
}
signalBox,
reader,
});
const check = checker({
signalBox,
queue,
});

//console.log("MAIN => Starting tasks...");
const worker = new Worker(workerUrl, {
type: "module",
workerData: { sab: signals.sab },
});

const decoder = new TextEncoder();

const f = async () => {
let sum = 0;

// Increase or decrease the loop count for more or less work
const iterations = 1_000;
const iterations = 10;

for (let i = 0; i < iterations; i++) {
sum += performance.now();
Expand All @@ -121,7 +53,7 @@ const f = async () => {
};

type Resolver = {
queue: ReturnType<ReturnType<typeof multi>>;
queue: MultiQueue;
fn: Function;
fnNumber: number;
status: Uint8Array;
Expand All @@ -140,38 +72,62 @@ const resolver = (args: Resolver) => {
return async () =>
seq() ? fn() : queue.isBusy() ? fn() : (
isActive(status),
await queue.add([
genTaskID(),
null,
fnNumber,
statusSignal,
])
queue.awaits(
queue.add([
genTaskID(),
null,
fnNumber,
statusSignal,
]),
)
);
};

const forTest = resolver({
//@ts-ignore
queue,
fn: f,
status,
status: signals.status,
fnNumber: 0,
statusSignal: 224,
});

await forTest().then((x) => new TextDecoder().decode(x)).then(console.log);
await forTest().then((x) => new TextDecoder().decode(x)).then(console.log);
await forTest().then((x) => new TextDecoder().decode(x)).then(console.log);
await forTest().then((x) => new TextDecoder().decode(x)).then(console.log);
await forTest().then((x) => new TextDecoder().decode(x)).then(console.log);

boxplot(async () => {
bench("main + 1 thread ", async () => {
await forTest();
await forTest();
await forTest();
await forTest();
queueMicrotask(check);
const a = queue.add([
genTaskID(),
null,
0,
224,
]),
c = queue.add([
genTaskID(),
null,
0,
224,
]),
b = queue.add([
genTaskID(),
null,
0,
224,
]);

return queue.awaitArray([a, b, c]);
});
bench("main ", async () => {
await Promise.all([
f(),
f(),
f(),
f(),
]);
const a = await f();
const b = await f();
const c = await f();

return [a, b, c];
});
});

Expand Down
Loading

0 comments on commit e899016

Please sign in to comment.