From e6df19b659d8c4d0fd60cfa6fd3d00a0f29a8f0d Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Wed, 6 Sep 2023 20:11:54 -0700 Subject: [PATCH 01/10] Refactor Reactor::{connect, connectMulti, canConnect} 0. `canConnect` doesn't experience situations where throwing Error is required. 1. `canConnect` is solely for testing and should not throw. 2. `connect` and `connectMulti` should throw. Simply return result from `canConnect` and throw in functions that calls them. --- src/core/reactor.ts | 66 +++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 075e863ac..603a568e9 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -61,6 +61,18 @@ export interface Call extends Write, Read { invoke: (args: A) => R | undefined; } +export enum CanConnectResult { + SUCCESS = 0, + SELF_LOOP = "Source port and destination port are the same.", + DESTINATION_OCCUPIED = "Destination port is already occupied.", + DOWNSTREAM_WRITE_CONFLICT = "Write conflict: port is already occupied.", + NOT_IN_SCOPE = "Source and destination ports are not in scope.", + RT_CONNECTION_OUTSIDE_CONTAINER = "New connection is outside of container.", + RT_DIRECT_FEED_THROUGH = "New connection introduces direct feed through.", + RT_CYCLE = "New connection introduces cycle.", + MUTATION_CAUSALITY_LOOP = "New connection will change the causal effect of the mutation that triggered this connection." +} + /** * Abstract class for a schedulable action. It is intended as a wrapper for a * regular action. In addition to a get method, it also has a schedule method @@ -1091,10 +1103,13 @@ export abstract class Reactor extends Component { * @param src The start point of a new connection. * @param dst The end point of a new connection. */ - public canConnect(src: IOPort, dst: IOPort): boolean { + public canConnect( + src: IOPort, + dst: IOPort + ): CanConnectResult { // Immediate rule out trivial self loops. if (src === dst) { - throw Error("Source port and destination port are the same."); + return CanConnectResult.SELF_LOOP; } // Check the race condition @@ -1102,7 +1117,7 @@ export abstract class Reactor extends Component { // in addReaction) const deps = this._dependencyGraph.getUpstreamNeighbors(dst); // FIXME this will change with multiplex ports if (deps !== undefined && deps.size > 0) { - throw Error("Destination port is already occupied."); + return CanConnectResult.DESTINATION_OCCUPIED; } if (!this._runtime.isRunning()) { @@ -1114,10 +1129,13 @@ export abstract class Reactor extends Component { // Rule out write conflicts. // - (between reactors) if (this._dependencyGraph.getDownstreamNeighbors(dst).size > 0) { - return false; + return CanConnectResult.DOWNSTREAM_WRITE_CONFLICT; } - return this._isInScope(src, dst); + if (!this._isInScope(src, dst)) { + return CanConnectResult.NOT_IN_SCOPE; + } + return CanConnectResult.SUCCESS; } else { // Attempt to make a connection while executing. // Check the local dependency graph to figure out whether this change @@ -1131,7 +1149,7 @@ export abstract class Reactor extends Component { src._isContainedBy(this) && dst._isContainedBy(this) ) { - throw Error("New connection is outside of container."); + return CanConnectResult.RT_CONNECTION_OUTSIDE_CONTAINER; } // Take the local graph and merge in all the causality interfaces @@ -1148,23 +1166,21 @@ export abstract class Reactor extends Component { // 1) check for loops const hasCycle = graph.hasCycle(); + if (hasCycle) { + return CanConnectResult.RT_CYCLE; + } // 2) check for direct feed through. // FIXME: This doesn't handle while direct feed thorugh cases. - let hasDirectFeedThrough = false; - if (src instanceof InPort && dst instanceof OutPort) { - hasDirectFeedThrough = dst.getContainer() === src.getContainer(); - } - // Throw error cases - if (hasDirectFeedThrough && hasCycle) { - throw Error("New connection introduces direct feed through and cycle."); - } else if (hasCycle) { - throw Error("New connection introduces cycle."); - } else if (hasDirectFeedThrough) { - throw Error("New connection introduces direct feed through."); + if ( + src instanceof InPort && + dst instanceof OutPort && + dst.getContainer() === src.getContainer() + ) { + return CanConnectResult.RT_DIRECT_FEED_THROUGH; } - return true; + return CanConnectResult.SUCCESS; } } @@ -1258,11 +1274,14 @@ export abstract class Reactor extends Component { if (dst === undefined || dst === null) { throw new Error("Cannot connect unspecified destination"); } - if (this.canConnect(src, dst)) { - this._uncheckedConnect(src, dst); - } else { - throw new Error(`ERROR connecting ${src} to ${dst}`); + const canConnectResult = this.canConnect(src, dst); + // I know, this looks a bit weird. But + if (canConnectResult !== CanConnectResult.SUCCESS) { + throw new Error( + `ERROR connecting ${src} to ${dst}. Reason is ${canConnectResult.valueOf()}` + ); } + this._uncheckedConnect(src, dst); } protected _connectMulti( @@ -1316,7 +1335,8 @@ export abstract class Reactor extends Component { } for (let i = 0; i < leftPorts.length && i < rightPorts.length; i++) { - if (!this.canConnect(leftPorts[i], rightPorts[i])) { + const canConnectResult = this.canConnect(leftPorts[i], rightPorts[i]); + if (canConnectResult !== CanConnectResult.SUCCESS) { throw new Error( `ERROR connecting ${leftPorts[i]} to ${rightPorts[i]} From 95c0338d9f70c153e69358443d97b3165118033c Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Mon, 11 Sep 2023 21:44:14 -0700 Subject: [PATCH 02/10] Change tests accordingly so they will pass Most of them were achieved by a dirty hack: by changing `toBe(true)` to be `toBeFalsy()` and vice versa. The negation of truthiness is because if connection can be established, we return 0, which is falsy. Otherwise we return an error enum which is truthy. This is a bit C-esque and not JS-y, but let's keep it this way as of now. Throw-catch would technically work, but it's not technically an error because we are **testing** if a connection can be established. Returning an error would also technically work, and we can add some type matching to make it work more smoothly, but as of now I intend to keep it this way. One WIP would be to change `toBeTruthy` to the actual error code, but then some tests might break. We will need to investigate instead of coding tests based on the results. --- __tests__/HierarchicalSingleEvent.test.ts | 5 +- __tests__/SingleEvent.test.ts | 8 +-- __tests__/connection.test.ts | 9 ++-- __tests__/hierarchy.ts | 64 ++++++++++------------- 4 files changed, 38 insertions(+), 48 deletions(-) diff --git a/__tests__/HierarchicalSingleEvent.test.ts b/__tests__/HierarchicalSingleEvent.test.ts index 45e7b9989..77eb80349 100644 --- a/__tests__/HierarchicalSingleEvent.test.ts +++ b/__tests__/HierarchicalSingleEvent.test.ts @@ -4,7 +4,8 @@ import { Parameter, OutPort, InPort, - TimeValue + TimeValue, + CanConnectResult } from "../src/core/internal"; import {SingleEvent} from "../src/share/SingleEvent"; @@ -93,7 +94,7 @@ describe("HierarchicalSingleEvent", function () { seTest.seContainer.child.o, seTest.logContainer.child.i ) - ).toBe(false); + ).toBe(CanConnectResult.NOT_IN_SCOPE); seTest._start(); }); diff --git a/__tests__/SingleEvent.test.ts b/__tests__/SingleEvent.test.ts index ac89fb1f3..ab49c55fd 100644 --- a/__tests__/SingleEvent.test.ts +++ b/__tests__/SingleEvent.test.ts @@ -32,12 +32,8 @@ describe("SingleEvent", function () { expect(expect(seTest.singleEvent).toBeInstanceOf(SingleEvent)); expect(expect(seTest.logger).toBeInstanceOf(Logger)); - expect(function () { - seTest.canConnect(seTest.singleEvent.o, seTest.logger.i); - }).toThrow(new Error("Destination port is already occupied.")); - expect(seTest.canConnect(seTest.logger.i, seTest.singleEvent.o)).toBe( - false - ); + expect(seTest.canConnect(seTest.singleEvent.o, seTest.logger.i)).toBeTruthy(); + expect(seTest.canConnect(seTest.logger.i, seTest.singleEvent.o)).toBeTruthy(); seTest._start(); }); diff --git a/__tests__/connection.test.ts b/__tests__/connection.test.ts index 3d8725e30..4798b7bc3 100644 --- a/__tests__/connection.test.ts +++ b/__tests__/connection.test.ts @@ -5,7 +5,8 @@ import { OutPort, InPort, TimeUnit, - TimeValue + TimeValue, + CanConnectResult } from "../src/core/internal"; describe("Check canConnect", () => { @@ -30,19 +31,19 @@ describe("Check canConnect", () => { it("canConnect success out->in", () => { expect(this.canConnect(this.source.out, this.destination.in)).toBe( - true + CanConnectResult.SUCCESS ); }); it("canConnect success out->out", () => { expect(this.canConnect(this.source.out, this.destination.out)).toBe( - true + CanConnectResult.SUCCESS ); }); it("canConnect failure", () => { expect(this.canConnect(this.destination.in, this.source.out)).toBe( - false + CanConnectResult.NOT_IN_SCOPE ); }); } diff --git a/__tests__/hierarchy.ts b/__tests__/hierarchy.ts index ec7fda22f..0515e861e 100644 --- a/__tests__/hierarchy.ts +++ b/__tests__/hierarchy.ts @@ -1,4 +1,4 @@ -import {Reactor, App, InPort, OutPort} from "../src/core/internal"; +import {Reactor, App, InPort, OutPort, CanConnectResult} from "../src/core/internal"; class InOut extends Reactor { a = new InPort(this); @@ -36,65 +36,57 @@ describe("Container to Contained", () => { it("testing canConnect", () => { expect( app.container.canConnect(app.container.a, app.container.contained.a) - ).toBe(true); + ).toBe(CanConnectResult.SUCCESS); expect( app.container.canConnect(app.container.contained.a, app.container.a) - ).toBe(false); + ).toBe(CanConnectResult.NOT_IN_SCOPE); expect( app.container.canConnect( app.container.a, app.container.b ) - ).toBe(true); + ).toBe(CanConnectResult.SUCCESS); expect( app.container.canConnect( app.container.contained.a, app.container.contained.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect( app.container.contained.b, app.container.contained.a ) - ).toBe(true); + ).toBeFalsy(); expect( app.container.canConnect(app.container.a, app.container.contained.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.b, app.container.a) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.b, app.container.contained.a) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.a, app.container.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.b, app.container.contained.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.b, app.container.b) - ).toBe(true); - - expect(app.container.canConnect(app.container.contained.a, app.foo.a)).toBe( - false - ); - expect(app.container.canConnect(app.container.contained.a, app.foo.b)).toBe( - false - ); - expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBe( - false - ); - expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBe( - false - ); - - expect(app.container.canConnect(app.foo.a, app.container.b)).toBe(false); - expect(app.container.canConnect(app.foo.a, app.container.a)).toBe(false); + ).toBeFalsy(); + + expect(app.container.canConnect(app.container.contained.a, app.foo.a)).toBeTruthy(); + expect(app.container.canConnect(app.container.contained.a, app.foo.b)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBeTruthy(); + + expect(app.container.canConnect(app.foo.a, app.container.b)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.a)).toBeTruthy(); // expect(app.container.contained).toBeDefined(); @@ -104,49 +96,49 @@ describe("Container to Contained", () => { app.container.contained.containedAgain.a, app.container.contained.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.container.contained.b ) - ).toBe(true); + ).toBeFalsy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.container.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.container.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.foo.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.foo.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.foo.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.foo.b ) - ).toBe(false); + ).toBeTruthy(); // } }); }); From 651a73b31583c3307d5c244153845867edac0916 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Mon, 18 Sep 2023 14:06:58 -0700 Subject: [PATCH 03/10] Implemented connectable port 1. Implemented ConnectablePort that can be used as `Variable` 2. Refactored ConnectablePort with function overloading at the same time to facilitate better type check (but due to TS limitation there's no type narrowing, see https://github.com/microsoft/TypeScript/issues/22609) 3. Made tests conform to the new connection API --- __tests__/InvalidMutations.ts | 8 +++---- __tests__/SimpleMutation.test.ts | 2 +- __tests__/disconnect.test.ts | 6 ++--- __tests__/mutations.test.ts | 2 +- src/core/port.ts | 14 +++++++++++- src/core/reactor.ts | 39 ++++++++++++++++++++++++-------- 6 files changed, 51 insertions(+), 20 deletions(-) diff --git a/__tests__/InvalidMutations.ts b/__tests__/InvalidMutations.ts index 17b62f6bf..2345eb393 100644 --- a/__tests__/InvalidMutations.ts +++ b/__tests__/InvalidMutations.ts @@ -40,23 +40,23 @@ class R1 extends Reactor { function (this, __in1, __in2, __out1, __out2) { test("expect error on creating creating direct feed through", () => { expect(() => { - this.connect(__in2, __out2); + this.connect(__in2.asConnectable(), __out2.asConnectable()); }).toThrowError("New connection introduces direct feed through."); }); test("expect error when creating connection outside container", () => { expect(() => { - this.connect(__out2, __in2); + this.connect(__out2.asConnectable(), __in2.asConnectable()); }).toThrowError("New connection is outside of container."); }); const R2 = new R1(this.getReactor()); test("expect error on mutation creating race condition on an output port", () => { expect(() => { - this.connect(R2.out1, __out1); + this.connect(R2.out1.asConnectable(), __out1.asConnectable()); }).toThrowError("Destination port is already occupied."); }); test("expect error on spawning and creating loop within a reactor", () => { expect(() => { - this.connect(R2.out1, R2.in1); + this.connect(R2.out1.asConnectable(), R2.in1.asConnectable()); }).toThrowError("New connection introduces cycle."); }); } diff --git a/__tests__/SimpleMutation.test.ts b/__tests__/SimpleMutation.test.ts index e85d06766..bd1345584 100644 --- a/__tests__/SimpleMutation.test.ts +++ b/__tests__/SimpleMutation.test.ts @@ -46,7 +46,7 @@ class R2 extends Reactor { function (this, __in, __out) { test("expect error to be thrown", () => { expect(() => { - this.connect(__out, __in); + this.connect(__out.asConnectable(), __in.asConnectable()); }).toThrowError("New connection is outside of container."); }); } diff --git a/__tests__/disconnect.test.ts b/__tests__/disconnect.test.ts index b25b31cb2..c876c3365 100644 --- a/__tests__/disconnect.test.ts +++ b/__tests__/disconnect.test.ts @@ -54,11 +54,11 @@ class R1 extends Reactor { const R2 = new R1(this.getReactor()); test("expect that disconnecting an existing connection will not result in an error being thrown", () => { expect(() => { - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); this.disconnect(R2.out2, R2.in2); - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); this.disconnect(R2.out2); - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); }).not.toThrow(); }); } diff --git a/__tests__/mutations.test.ts b/__tests__/mutations.test.ts index 090e90053..55c949642 100644 --- a/__tests__/mutations.test.ts +++ b/__tests__/mutations.test.ts @@ -92,7 +92,7 @@ class Computer extends Reactor { continue; } const x = new AddOne(this.getReactor(), id); - this.connect(src, x.input); + this.connect(src.asConnectable(), x.input.asConnectable()); } } }); diff --git a/src/core/port.ts b/src/core/port.ts index e6206e821..8870d1cd0 100644 --- a/src/core/port.ts +++ b/src/core/port.ts @@ -7,7 +7,8 @@ import type { Absent, MultiReadWrite, ReadWrite, - Variable + Variable, + Read } from "./internal"; import {Trigger, Log} from "./internal"; @@ -59,6 +60,13 @@ export abstract class Port extends Trigger { } } +export class ConnectablePort implements Read { + public get = (): Absent => undefined; + public getPort = (): IOPort => this.port; + + constructor(public port: IOPort) {} +} + /** * Abstract class for a writable port. It is intended as a wrapper for a * regular port. In addition to a get method, it also has a set method and @@ -103,6 +111,10 @@ export abstract class IOPort extends Port { } } + public asConnectable(): ConnectablePort { + return new ConnectablePort(this); + } + /** * Only the holder of the key may obtain a writable port. * @param key diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 603a568e9..7fa4263cc 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -42,7 +42,8 @@ import { Startup, Shutdown, WritableMultiPort, - Dummy + Dummy, + ConnectablePort } from "./internal"; import {v4 as uuidv4} from "uuid"; import {Bank} from "./bank"; @@ -440,16 +441,31 @@ export abstract class Reactor extends Component { * @param src * @param dst */ + + public connect( + src: ConnectablePort, + dst: ConnectablePort + ): void; + public connect( + src: CallerPort, + dst: CalleePort + ): void; public connect( - src: CallerPort | IOPort, - dst: CalleePort | IOPort + ...[src, dst]: + | [ConnectablePort, ConnectablePort] + | [CallerPort, CalleePort] ): void { if (src instanceof CallerPort && dst instanceof CalleePort) { this.reactor._connectCall(src, dst); - } else if (src instanceof IOPort && dst instanceof IOPort) { - this.reactor._connect(src, dst); + } else if ( + src instanceof ConnectablePort && + dst instanceof ConnectablePort + ) { + this.reactor._connect(src.getPort(), dst.getPort()); } else { - // ERROR + throw Error( + "Logically unreachable code: src and dst type mismatch, Caller(ee) port cannot be connected to IOPort." + ); } } @@ -1804,10 +1820,13 @@ interface UtilityFunctions { } export interface MutationSandbox extends ReactionSandbox { - connect: ( - src: CallerPort | IOPort, - dst: CalleePort | IOPort - ) => void; + connect: { + (src: ConnectablePort, dst: ConnectablePort): void; + ( + src: CallerPort, + dst: CalleePort + ): void; + }; disconnect: (src: IOPort, dst?: IOPort) => void; From 641ce7a1ea82c4e5a8275bdcee4b1ec885eabb81 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Fri, 22 Sep 2023 17:40:13 -0700 Subject: [PATCH 04/10] Adapt Benchmarks to the new ConnectablePort 1. Adapt Sieve by simply using ConnectablePort 2. Adapt Online Facility Location by using ConnectablePort *and* making necessary changes to reflect the actual logic, as discussed with @lhstrh. In this case, two sources: an OutPort (port A), and a mutation, wants to write to an OutPort (port B), but at different times. The correct thing to do, then, will be to make the mutation have its own OutPort (port C) to write to, and when it wants to write to port B, it disconnects A->B, makes connection C->B, and write to C. --- src/benchmark/FacilityLocation.ts | 106 +++++++++++++++++------------- src/benchmark/Sieve.ts | 9 ++- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/src/benchmark/FacilityLocation.ts b/src/benchmark/FacilityLocation.ts index bf72e0bd0..54eb637d8 100644 --- a/src/benchmark/FacilityLocation.ts +++ b/src/benchmark/FacilityLocation.ts @@ -4,7 +4,7 @@ * * @author Hokeun Kim (hokeunkim@berkeley.edu) */ -import type {WritablePort} from "../core/internal"; +import type {IOPort} from "../core/internal"; import { Log, TimeValue, @@ -428,6 +428,7 @@ export class Quadrant extends Reactor { childrenBoundaries = new State(new Array()); totalCost = new State(0); + trollPort = new OutPort(this); constructor( parent: Reactor, @@ -525,14 +526,19 @@ export class Quadrant extends Reactor { this.writable(this.toSecondChild), this.writable(this.toThirdChild), this.writable(this.toFourthChild), - this.writable(this.toAccumulator), + this.toFirstChild.asConnectable(), + this.toSecondChild.asConnectable(), + this.toThirdChild.asConnectable(), + this.toFourthChild.asConnectable(), + this.toAccumulator.asConnectable(), this.localFacilities, this.knownFacilities, this.maxDepthOfKnownOpenFacility, this.supportCustomers, this.hasChildren, this.childrenBoundaries, - this.totalCost + this.totalCost, + this.writable(this.trollPort) ], function ( this, @@ -544,20 +550,26 @@ export class Quadrant extends Reactor { depth, fromProducer, toProducer, - toFirstChild, - toSecondChild, - toThirdChild, - toFourthChild, - toAccumulator, + toFirstChildW, + toSecondChildW, + toThirdChildW, + toFourthChildW, + toFirstChildC, + toSecondChildC, + toThirdChildC, + toFourthChildC, + toAccumulatorC, localFacilities, knownFacilities, maxDepthOfKnownOpenFacility, supportCustomers, hasChildren, childrenBoundaries, - totalCost + totalCost, + trollPort ) { const thisReactor = this.getReactor(); + const toAccumulatorUpstreams: Array> = []; // Helper functions for mutation reaction. const notifyParentOfFacility = function (p: Point): void { @@ -622,11 +634,12 @@ export class Quadrant extends Reactor { // console.log(`Children boundaries: ${childrenBoundaries.get()[0]}, ${childrenBoundaries.get()[1]}, ${childrenBoundaries.get()[2]}, ${childrenBoundaries.get()[3]}`) const accumulator = new Accumulator(thisReactor); - const toAccumulatorOfQuadrant = ( - toAccumulator as unknown as WritablePort - ).getPort(); // Connect Accumulator's output to Quadrant's output. - this.connect(accumulator.toNextAccumulator, toAccumulatorOfQuadrant); + toAccumulatorUpstreams.push(accumulator.toNextAccumulator); + this.connect( + accumulator.toNextAccumulator.asConnectable(), + toAccumulatorC + ); const firstChild = new Quadrant( thisReactor, @@ -640,11 +653,11 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toFirstChildPort = ( - toFirstChild as unknown as WritablePort - ).getPort(); - this.connect(toFirstChildPort, firstChild.fromProducer); - this.connect(firstChild.toAccumulator, accumulator.fromFirstQuadrant); + this.connect(toFirstChildC, firstChild.fromProducer.asConnectable()); + this.connect( + firstChild.toAccumulator.asConnectable(), + accumulator.fromFirstQuadrant.asConnectable() + ); const secondChild = new Quadrant( thisReactor, @@ -658,13 +671,13 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toSecondChildPort = ( - toSecondChild as unknown as WritablePort - ).getPort(); - this.connect(toSecondChildPort, secondChild.fromProducer); this.connect( - secondChild.toAccumulator, - accumulator.fromSecondQuadrant + toSecondChildC, + secondChild.fromProducer.asConnectable() + ); + this.connect( + secondChild.toAccumulator.asConnectable(), + accumulator.fromSecondQuadrant.asConnectable() ); const thirdChild = new Quadrant( @@ -679,11 +692,11 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toThirdChildPort = ( - toThirdChild as unknown as WritablePort - ).getPort(); - this.connect(toThirdChildPort, thirdChild.fromProducer); - this.connect(thirdChild.toAccumulator, accumulator.fromThirdQuadrant); + this.connect(toThirdChildC, thirdChild.fromProducer.asConnectable()); + this.connect( + thirdChild.toAccumulator.asConnectable(), + accumulator.fromThirdQuadrant.asConnectable() + ); const fourthChild = new Quadrant( thisReactor, @@ -697,13 +710,13 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toFourthChildPort = ( - toFourthChild as unknown as WritablePort - ).getPort(); - this.connect(toFourthChildPort, fourthChild.fromProducer); this.connect( - fourthChild.toAccumulator, - accumulator.fromFourthQuadrant + toFourthChildC, + fourthChild.fromProducer.asConnectable() + ); + this.connect( + fourthChild.toAccumulator.asConnectable(), + accumulator.fromFourthQuadrant.asConnectable() ); supportCustomers.set(new Array()); @@ -733,16 +746,16 @@ export class Quadrant extends Reactor { if (childrenBoundaries.get()[i].contains(point)) { switch (i) { case 0: - toFirstChild.set(msg); + toFirstChildW.set(msg); break; case 1: - toSecondChild.set(msg); + toSecondChildW.set(msg); break; case 2: - toThirdChild.set(msg); + toThirdChildW.set(msg); break; case 3: - toFourthChild.set(msg); + toFourthChildW.set(msg); break; } break; @@ -754,7 +767,12 @@ export class Quadrant extends Reactor { case RequestExitMsg: if (!hasChildren.get()) { // No children, number of facilities will be counted on parent's side. - toAccumulator.set( + toAccumulatorUpstreams.forEach((val) => { + this.disconnect(val, toAccumulatorC.getPort()); + }); + this.connect(trollPort.getPort().asConnectable(), toAccumulatorC); + + trollPort.set( new ConfirmExitMsg( 0, // facilities supportCustomers.get().length, // supportCustomers @@ -762,10 +780,10 @@ export class Quadrant extends Reactor { ) ); } else { - toFirstChild.set(msg); - toSecondChild.set(msg); - toThirdChild.set(msg); - toFourthChild.set(msg); + toFirstChildW.set(msg); + toSecondChildW.set(msg); + toThirdChildW.set(msg); + toFourthChildW.set(msg); } break; default: diff --git a/src/benchmark/Sieve.ts b/src/benchmark/Sieve.ts index 4b88776ed..e64327b2d 100644 --- a/src/benchmark/Sieve.ts +++ b/src/benchmark/Sieve.ts @@ -1,5 +1,4 @@ import { - type WritablePort, Parameter, InPort, OutPort, @@ -66,11 +65,12 @@ class Filter extends Reactor { [ this.inp, this.writable(this.out), + this.out.asConnectable(), this.startPrime, this.hasChild, this.localPrimes ], - function (this, inp, out, prime, hasChild, localPrimes) { + function (this, inp, outW, outC, prime, hasChild, localPrimes) { const p = inp.get(); if (p !== undefined) { const seen = localPrimes.get(); @@ -96,14 +96,13 @@ class Filter extends Reactor { // let x = this.create(Filter, [this.getReactor(), p]) // console.log("CREATED: " + x._getFullyQualifiedName()) // FIXME: weird hack. Maybe just accept writable ports as well? - const port = (out as unknown as WritablePort).getPort(); - this.connect(port, n.inp); + this.connect(outC, n.inp.asConnectable()); // FIXME: this updates the dependency graph, but it doesn't redo the topological sort // For a pipeline like this one, it is not necessary, but in general it is. // Can we avoid redoing the entire sort? hasChild.set(true); } else { - out.set(p); + outW.set(p); } } } From 96955428664cdcd9358581db53338d4c88749f85 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Tue, 26 Sep 2023 03:23:53 -0700 Subject: [PATCH 05/10] Added some tests for `ConnectablePort` Specifically ones that are related to causality graph and `ConnectablePort` - that is, declaring `ConnectablePort` in a mutation should not create dependency between them, but connection between ports should. --- __tests__/mutations.test.ts | 60 ++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/__tests__/mutations.test.ts b/__tests__/mutations.test.ts index 55c949642..59645c245 100644 --- a/__tests__/mutations.test.ts +++ b/__tests__/mutations.test.ts @@ -4,7 +4,8 @@ import { Timer, OutPort, InPort, - TimeValue + TimeValue, + IOPort } from "../src/core/internal"; class Source extends Reactor { @@ -189,3 +190,60 @@ describe("Creating reactors at runtime", function () { // }); // }); +describe("Test the result from refactor-canconnect: referencing ConnectablePort should not introduce any change in the causality graph", () => { + class InnocentReactor extends Reactor { + public inp = new InPort(this); + public outp = new OutPort(this); + public cip = new InPort(this); + public oip = new OutPort(this); + public child = new (class InnocentChild extends Reactor { + public oip = new OutPort(this); + constructor(parent: InnocentReactor) { + super(parent); + } + })(this); + + constructor(parent: TestApp) { + super(parent); + + this.addMutation( + [this.startup], + [ + this.inp, + this.writable(this.outp), + this.cip.asConnectable(), + this.oip.asConnectable(), + this.child.oip.asConnectable() + ], + function (this, a0, a1, a2, a3, a4) { + // This is current failing as we disallow direct feedthrough. Nevertheless I'm unsure why that is the case as of now? + // this.connect(a2, a3); + this.connect(a2, a4); + } + ); + } + } + + class TestApp extends App { + public child = new InnocentReactor(this); + constructor(done: () => void) { + super(undefined, undefined, undefined, () => { + const mut = this.child["_mutations"][1]; // M0 is the shutdown mutation; M1 is our mutation + const pg = this._getPrecedenceGraph(); + // child.oip should be an island in the causality graph + expect(pg.getUpstreamNeighbors(this.child.oip).size).toBe(0); + expect(pg.getDownstreamNeighbors(this.child.oip).size).toBe(0); + // The only dependency child.child.oip should have is child.cip + expect(pg.getUpstreamNeighbors(this.child.child.oip).size).toBe(1); + expect( + pg.getUpstreamNeighbors(this.child.child.oip).has(this.child.cip) + ).toBeTruthy(); + done(); + }); + } + } + test("test dependencies", (done) => { + const tapp = new TestApp(done); + tapp._start(); + }); +}); From bfa01562fa7d667c0afb4027b8f208458899e74e Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Wed, 13 Sep 2023 17:29:56 -0700 Subject: [PATCH 06/10] Added a bonus test for connection that reflects bugs in the current logic --- __tests__/connection.bonus.test.ts | 99 ++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 __tests__/connection.bonus.test.ts diff --git a/__tests__/connection.bonus.test.ts b/__tests__/connection.bonus.test.ts new file mode 100644 index 000000000..b63a71070 --- /dev/null +++ b/__tests__/connection.bonus.test.ts @@ -0,0 +1,99 @@ +import {App, InPort, OutPort, Reactor} from "../src/core/internal"; + +// Readers might wonder why this test case exist; +// This is mainly because in the past, we assume direct feedthrough is forbidden, and will not establish the connection if we try to do so. However, it is possible that such a thing happen without introducing any causality issue. +describe("Direct feedthrough without causality issue", () => { + class BigReactor extends App { + public children: SmallReactor; + + constructor() { + super(undefined, false, false); + this.children = new SmallReactor(this); + } + } + + class SmallReactor extends Reactor { + public inp: InPort; + public outp: OutPort; + + constructor(parent: Reactor) { + super(parent); + this.inp = new InPort(this); + this.outp = new OutPort(this); + this.addMutation( + [this.startup], + [this.inp, this.outp], + function (this, inp, outp) { + it("test", () => { + expect(this.getReactor().canConnect(inp, outp)).toBeFalsy(); + }); + } + ); + } + } + + const root = new BigReactor(); + root._start(); +}); + +describe("Causality loop that can't be detected by only checking local graph", () => { + class FeedThrougher extends Reactor { + public inp = new InPort(this); + public outp = new OutPort(this); + + constructor(parent: Reactor) { + super(parent); + this.addReaction( + [this.inp], + [this.inp, this.writable(this.outp)], + function (this, inp, outp) { + // nop troll + } + ); + } + } + + class Looper extends Reactor { + public leftChild = new FeedThrougher(this); + public rightChild = new FeedThrougher(this); + + public inp = new InPort(this); + public outp = new OutPort(this); + + constructor(parent: Reactor) { + super(parent); + this.addMutation( + [this.startup], + [ + this.inp.asConnectable(), + this.outp.asConnectable(), + this.leftChild.inp.asConnectable(), + this.leftChild.outp.asConnectable(), + this.rightChild.inp.asConnectable(), + this.rightChild.outp.asConnectable() + ], + function (this, inp, outp, inp_lc, outp_lc, inp_rc, outp_rc) { + this.connect(inp, inp_lc); + this.connect(outp_rc, outp); + } + ); + } + } + + class TestApp extends App { + public child = new Looper(this); + + constructor() { + super(undefined, undefined, undefined, () => {}); + this._connect(this.child.outp, this.child.inp); + it("Test a connection that would create zero delay loop cannot be made", () => { + expect( + this.canConnect(this.child.leftChild.outp, this.child.rightChild.inp) + ).toBeTruthy(); + }); + } + } + + const app = new TestApp(); + app._start(); +}); From 3248225815346865e0dc521c7cd7e17e0346357b Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Mon, 2 Oct 2023 14:38:36 -0700 Subject: [PATCH 07/10] Change CanConnect logic: remove blanket direct feedthrough ban, and always check against the global precedence graph. --- __tests__/InvalidMutations.ts | 5 ----- src/core/reactor.ts | 21 +++++++++++++++++++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/__tests__/InvalidMutations.ts b/__tests__/InvalidMutations.ts index 2345eb393..977cad095 100644 --- a/__tests__/InvalidMutations.ts +++ b/__tests__/InvalidMutations.ts @@ -38,11 +38,6 @@ class R1 extends Reactor { [this.in1], [this.in1, this.in2, this.out1, this.out2], function (this, __in1, __in2, __out1, __out2) { - test("expect error on creating creating direct feed through", () => { - expect(() => { - this.connect(__in2.asConnectable(), __out2.asConnectable()); - }).toThrowError("New connection introduces direct feed through."); - }); test("expect error when creating connection outside container", () => { expect(() => { this.connect(__out2.asConnectable(), __in2.asConnectable()); diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 7fa4263cc..f96e8e022 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -1168,7 +1168,24 @@ export abstract class Reactor extends Component { return CanConnectResult.RT_CONNECTION_OUTSIDE_CONTAINER; } - // Take the local graph and merge in all the causality interfaces + /** + * TODO (axmmisaka): The following code is commented for multiple reasons: + * The causality interface check is not fully implemented so new checks are failing + * Second, direct feedthrough itself would not cause any problem *per se*. + * To ensure there is no cycle, the safest way is to check against the global dependency graph. + */ + + let app = this as Reactor; + while (app._getContainer() !== app) { + app = app._getContainer(); + } + const graph = app._getPrecedenceGraph(); + graph.addEdge(src, dst); + if (graph.hasCycle()) { + return CanConnectResult.RT_CYCLE; + } + + /* // Take the local graph and merge in all the causality interfaces // of contained reactors. Then: const graph = new PrecedenceGraph | Reaction>(); graph.addAll(this._dependencyGraph); @@ -1194,7 +1211,7 @@ export abstract class Reactor extends Component { dst.getContainer() === src.getContainer() ) { return CanConnectResult.RT_DIRECT_FEED_THROUGH; - } + } */ return CanConnectResult.SUCCESS; } From 9d367114d87618e05e327aa2002f657e57c4b38d Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Tue, 5 Sep 2023 21:07:50 -0700 Subject: [PATCH 08/10] Add necessary capabilities for reactors to create other reactors and make connections 0. Added _addChild and _addSibling necessary for mutation APIs 1. Solved issues related to hierarchy and ownership. --- src/core/reactor.ts | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index f96e8e022..b20f30e55 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -166,6 +166,11 @@ export abstract class Reactor extends Component { */ private readonly _keyChain = new Map(); + // This is the keychain for creation, i.e. if Reactor R's mutation created reactor B, + // then R is B's creator, even if they are siblings. R should have access to B, + // at least semantically......? + private readonly _creatorKeyChain = new Map(); + /** * This graph has in it all the dependencies implied by this container's * ports, reactions, and connections. @@ -400,6 +405,9 @@ export abstract class Reactor extends Component { return owner._getKey(component, this._keyChain.get(owner)); } } + return component + .getContainer() + ._getKey(component, this._creatorKeyChain.get(component.getContainer())); } /** @@ -1608,6 +1616,36 @@ export abstract class Reactor extends Component { toString(): string { return this._getFullyQualifiedName(); } + + protected _addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + const newReactor = new constructor(this, ...args); + return newReactor; + } + + protected _addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + if (this._getContainer() == null) { + throw new Error( + `Reactor ${this} does not have a parent. Sibling is not well-defined.` + ); + } + if (this._getContainer() === this) { + throw new Error( + `Reactor ${this} is self-contained. Adding sibling creates logical issue.` + ); + } + const newReactor = this._getContainer()._addChild( + constructor, + ...args + ); + this._creatorKeyChain.set(newReactor, newReactor._key); + return newReactor; + } } /* From bd2290b76256043f896eaf16ef01bc5ae1dffc25 Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Tue, 5 Sep 2023 21:26:24 -0700 Subject: [PATCH 09/10] Mutation Sandbox API --- src/core/reactor.ts | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index b20f30e55..765b9117b 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -503,6 +503,20 @@ export abstract class Reactor extends Component { public delete(reactor: Reactor): void { reactor._delete(); } + + public addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addChild(constructor, ...args); + } + + public addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addSibling(constructor, ...args); + } }; /** @@ -1639,10 +1653,7 @@ export abstract class Reactor extends Component { `Reactor ${this} is self-contained. Adding sibling creates logical issue.` ); } - const newReactor = this._getContainer()._addChild( - constructor, - ...args - ); + const newReactor = this._getContainer()._addChild(constructor, ...args); this._creatorKeyChain.set(newReactor, newReactor._key); return newReactor; } @@ -1889,6 +1900,16 @@ export interface MutationSandbox extends ReactionSandbox { getReactor: () => Reactor; // Container + addChild: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + + addSibling: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + // FIXME: // forkJoin(constructor: new () => Reactor, ): void; } From bdfd1a32a9d0d88b22c74489333f90d3980c0d41 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Wed, 18 Oct 2023 23:18:15 -0700 Subject: [PATCH 10/10] Added mutation tests, unsure if they are correct --- __tests__/mutations.test.ts | 98 +++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/__tests__/mutations.test.ts b/__tests__/mutations.test.ts index 59645c245..93f64af7a 100644 --- a/__tests__/mutations.test.ts +++ b/__tests__/mutations.test.ts @@ -247,3 +247,101 @@ describe("Test the result from refactor-canconnect: referencing ConnectablePort tapp._start(); }); }); + +/* ++-----------+ +------------------------+ +| Useless | | +--------------+ | +| Reactor |------>--->| Mutation 1 | | +| | | +--------------+ | ++-----------+ | Mutation | | + | Holder \ | + | +--------v-----+ | + | | Mutation 2 | | + | +--------------+ | + +------------------------+ +*/ + +describe("Mutation phase 1: a mutation should not be able to make connection that directly lays on their upstream", () => { + class MutationHolder extends Reactor { + public inport: InPort; + public mut1inport: InPort; + public mut2inport: InPort; + public trigger: InPort; + + constructor(parent: Reactor) { + super(parent); + + this.inport = new InPort(this); + this.mut1inport = new InPort(this); + this.mut2inport = new InPort(this); + this.trigger = new InPort(this); + + // M1 + this.addMutation( + [this.trigger, this.mut1inport], + [ + this.inport.asConnectable(), + this.mut1inport.asConnectable(), + this.mut2inport.asConnectable() + ], + function (this, inp, m1, m2) { + expect(() => { + this.connect(inp, m1); + }).toThrow(); + expect(() => { + this.connect(inp, m2); + }).toReturn(); + } + ); + + // M2 + this.addMutation( + [this.trigger, this.mut2inport], + [ + this.inport.asConnectable(), + this.mut1inport.asConnectable(), + this.mut2inport.asConnectable() + ], + function (this, inp, m1, m2) { + expect(() => { + this.connect(inp, m1); + }).toThrow(); + expect(() => { + this.connect(inp, m2); + }).toThrow(); + } + ); + } + } + + class TestApp extends App { + public child = new MutationHolder(this); + public trigger = new OutPort(this); + + constructor(done: () => void) { + super(undefined, undefined, undefined, () => { + done(); + }); + + this.addMutation( + [this.startup], + [this.trigger.asConnectable(), this.child.trigger.asConnectable()], + function (this, myTrigger, childTrigger) { + this.connect(myTrigger, childTrigger); + } + ); + this.addReaction( + [this.startup], + [this.writable(this.trigger)], + function (this, trigger) { + trigger.set(true); + } + ); + } + } + + test("test mutation", (done) => { + const tapp = new TestApp(done); + tapp._start(); + }); +});