Skip to content

Commit

Permalink
semi stable
Browse files Browse the repository at this point in the history
  • Loading branch information
mimiMonads committed Jan 9, 2025
1 parent 11897c3 commit 184d3a5
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 300 deletions.
32 changes: 32 additions & 0 deletions experimental/bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { bench, boxplot, group, run, summary } from "mitata";
import { compose } from "./fixpoint.ts";
import { aaa } from "./functions.ts";

const EMPTYUI8 = new Uint8Array([1, 2, 3]);

const f = aaa.f;

const { termminate, resolver } = compose({
threads: 1,
})({
aaa,
});

boxplot(async () => {
group("5", () => {
summary(() => {
bench("main * 5", async () => {
await f(EMPTYUI8), await f(EMPTYUI8);
});

bench(" 5 thread ", async () => {
await resolver.aaa(EMPTYUI8), await resolver.aaa(EMPTYUI8);
});
});
});
});
await run();

await resolver.aaa(new Uint8Array([5, 3, 2, 1, 0])).then(console.log);

termminate();
189 changes: 189 additions & 0 deletions experimental/fixpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import { getCallerFile } from "./helpers.ts";
import { genTaskID } from "./helpers.ts";
import { createContext } from "./main.ts";
import type { PromiseMap } from "./mainQueue.ts";

type Args = "void" | "uint8";
const symbol = Symbol.for("FIXEDPOINT");

type FixPoint<A extends Args> = {
args: A;
f: (
args: A extends "void" ? void : Uint8Array,
) => Promise<Uint8Array>;
};

type SecondPart = {
statusSignal: 224 | 192;
[symbol]: string;
id: number;
importedFrom: string;
};

type Composed = {
args: Args;
f: Function;
} & SecondPart;

type ReturnFixed<A extends Args> = FixPoint<A> & SecondPart;

export const fixedPoint = <A extends Args>(
I: FixPoint<A>,
): ReturnFixed<A> => {
return ({
...I,
statusSignal: I.args === "void" ? 224 : 192,
id: genTaskID(),
importedFrom: new URL(getCallerFile(2)).href,
[symbol]: "vixeny",
});
};

type UnionReturnFixed = ReturnFixed<Args>;

type FunctionMapType<T extends Record<string, Composed>> = {
[K in keyof T]: T[K]["f"];
};

export type GetFunctions = ReturnType<typeof getFunctions>;

export const getFunctions = async ({ list, ids }: {
list: string[];
isWorker: boolean;
ids: number[];
}) => {
const results = await Promise.all(
list.map(async (imports) => {
const module = await import(imports);
return Object.entries(module) // Use `Object.entries` to include names
.filter(
([_, value]): value is ReturnFixed<any> =>
typeof value === "object" &&
value !== null &&
!Array.isArray(value) &&
Object.getOwnPropertySymbols(value).some(
(sym) => sym === Symbol.for("FIXEDPOINT"),
),
)
.map(([name, value]) => ({
//@ts-ignore trust me
...value,
name,
}));
}),
);

// Flatten the results, filter by IDs, and sort
const flattenedResults = results
.flat()
.filter((obj) => ids.includes(obj.id))
.sort((a, b) => a.name.localeCompare(b.name));

return flattenedResults as unknown as (UnionReturnFixed & { name: string })[];
};

const toListAndIds = (args: Record<string, Composed>, filter?: string) => {
const result = Object.values(args)
.reduce(
(acc, v) => (
acc[0].add(v.importedFrom), acc[1].add(v.id), acc
),
[
new Set<string>(),
new Set<number>(),
] as [
Set<string>,
Set<number>,
],
);

if (filter) {
console.log(filter);
result[0].delete(filter);
console.log(result);
}

return Object.fromEntries([
["list", [...result[0]]],
["ids", [...result[1]]],
]) as {
list: string[];
ids: number[];
};
};

const loopingBetweenThreads =
((n) => (functions: Function[]) => (max: number) => (args: any) =>
n === max ? functions[n = 0](args) : functions[n++](args))(0);

export const compose = ({
threads,
}: {
threads?: number;
}) =>
<T extends Record<string, Composed>>(args: T) => {
const promisesMap: PromiseMap = new Map();

const { list, ids } = toListAndIds(args);

const listOfFunctions = Object.entries(args).map(([k, v]) => ({
...v,
name: k,
}))
.sort((a, b) => a.name.localeCompare(b.name));

const workers = Array.from({
length: threads ?? 1,
})
.map((_) =>
createContext({
promisesMap,
list,
ids,
})
);

const map = workers.map(
(worker) => {
return listOfFunctions
.map((list, index) => ({ ...list, index }))
.reduce((acc, v) => {
{
acc.set(
v.name,
worker.resolver({
queue: worker.queue,
fnNumber: v.index,
statusSignal: v.statusSignal,
}),
);
}
return acc;
}, new Map<string, ReturnType<typeof worker.resolver>>());
},
)
.reduce((acc, map) => {
map.forEach(
(v, k) => {
const fun = acc.get(k);
if (fun) {
acc.set(k, [...fun, v]);
} else {
acc.set(k, [v]);
}
},
);

return acc;
}, new Map<string, Function[]>());

const resolve = new Map<string, (args: any) => Promise<any>>();
map.forEach((v, k) => {
resolve.set(k, loopingBetweenThreads(v)(v.length));
});

return {
termminate: () => workers.forEach((worker) => worker.kills()),
resolver: Object.fromEntries(resolve) as unknown as FunctionMapType<T>,
};
};
27 changes: 27 additions & 0 deletions experimental/functions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { fixedPoint } from "./fixpoint.ts";

export const aaa = fixedPoint({
args: "uint8",
f: async (arr) => {
// Simulate an expensive operation

let time = 100000;

while (time !== 0) {
performance.now();
time--;
}

return Array.from(arr).map((num) => num * 2);
},
});

export const bbb = fixedPoint({
args: "uint8",
f: async (arr) => new Uint8Array([2]),
});

export const ccc = fixedPoint({
args: "uint8",
f: async (arr) => new Uint8Array([3]),
});
79 changes: 23 additions & 56 deletions experimental/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,43 @@ type StatusSignalForMessage = 192;
export type StatusSignal = StatusSignalForVoid | StatusSignalForMessage;

// Generate unique task IDs.
export const genTaskID =
((counter = new Int32Array([0])) => () => (counter[0] += 1))();
export const genTaskID = ((counter: number) => () => counter++)(0);

// Get the current file's path.
export const currentPath = () => new URL(import.meta.url);

// Write a response to a buffer.
export const writeResponse = (buffer: Uint8Array) => (msg: Uint8Array) => {
buffer.fill(0);
buffer.set(msg, 0);
buffer[msg.length] = 10; // Terminator
};

// Manage shared buffers for communication.
export const setArrayBuffers = {
sab: (size = 1024) => new SharedArrayBuffer(size),

status: (sab: SharedArrayBuffer) => new Uint8Array(sab, 0, 2),

id: (sab: SharedArrayBuffer) => new Int32Array(sab, 4, 1),

payload: (sab: SharedArrayBuffer) => new Uint8Array(sab, 8),
};

// Read a message from a Uint8Array.
export const readMessageToUint = ({ payload }: SignalArguments) => () => {
const terminatorIndex = payload.lastIndexOf(10);
return payload.slice(0, terminatorIndex);
};
export const readMessageToUint =
({ payload, payloadLenght }: SignalArguments) => () =>
payload.slice(0, payloadLenght[0].valueOf());

// Write a Uint8Array message with task metadata.
export const writeUintMessage =
({ id, payload }: SignalArguments) => (task: QueueList) => {
payload.fill(0);
// If it's not null
if (task[6] !== null) {
payload.set(task[6], 0);
payload[task[6].length] = 10; // Terminator
} else {
payload[0] = 10;
}

id[0] = task[3]; // Task ID
({ id, payload, payloadLenght }: SignalArguments) => (task: QueueList) => {
payload.set(task[6], 0);
payloadLenght[0] = task[6].length;
id[0] = task[3];
};

export const sendUintMessage =
({ id, payload }: SignalArguments) => (task: MainList) => {
payload.fill(0);
// If it's not null
if (task[5] !== null) {
payload.set(task[5], 0);
payload[task[5].length] = 10; // Terminator
} else {
payload[0] = 10;
}

({ id, payload, payloadLenght }: SignalArguments) => (task: MainList) => {
payload.set(task[3], 0);
payloadLenght[0] = task[3].length;
id[0] = task[2];
};

export const optimalOrder = (n: number) => {
const a = Array.from(
{ length: (n * 2) },
(_, i) => i % 2 == 1,
);
a[a.length - 1] = true;
if (n > 3) {
a[a.length - 2] = true;
export const getCallerFile = (n: number) => {
const originalStackTrace = Error.prepareStackTrace;
Error.prepareStackTrace = (_, stack) => stack;
const err = new Error();
const stack = err.stack as unknown as NodeJS.CallSite[];
Error.prepareStackTrace = originalStackTrace;
// Get the caller file
const caller = stack[n]?.getFileName();

if (!caller) {
throw new Error("Unable to determine caller file.");
}

return ((n: number) => (m: number) => () => n === m ? a[n = 0] : a[n++])(0)(
(n * 2) - 1,
);
return caller;
};
Loading

0 comments on commit 184d3a5

Please sign in to comment.