Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(compiler): Fix topology for adjacent ons [LNG-257] #929

Merged
merged 10 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion integration-tests/aqua/examples/topology.aqua
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
aqua Toplogy
InversionSpaces marked this conversation as resolved.
Show resolved Hide resolved

export Testo, LocalPrint, topologyTest, topologyBug205, topologyBug394, topologyBug427, topologyBug257

import "@fluencelabs/aqua-lib/builtin.aqua"

service Testo("testo"):
Expand Down Expand Up @@ -50,4 +54,23 @@ func topologyBug427(peers: []string) -> []string:
results <- Opop.identity("some string")

join results[1]
<- results
<- results

service StrOp("op"):
identity(str: string) -> string

func idOnPeer(friend: string, friendRelay: string, str: string) -> string:
on friend via friendRelay:
result <- StrOp.identity(str)

<- result

func topologyBug257(friend: string, friendRelay: string) -> []string:
result: *string

on HOST_PEER_ID:
result <- StrOp.identity("host")
result <- idOnPeer(friend, friendRelay, "friend")
result <- idOnPeer(INIT_PEER_ID, HOST_PEER_ID, "init")

<- result
6 changes: 6 additions & 0 deletions integration-tests/src/__test__/examples.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
topologyBug205Call,
topologyBug394Call,
topologyBug427Call,
topologyBug257Call,
topologyCall,
} from "../examples/topologyCall.js";
import { foldJoinCall } from "../examples/foldJoinCall.js";
Expand Down Expand Up @@ -896,6 +897,11 @@ describe("Testing examples", () => {
expect(topologyResult).toEqual(selfPeerId);
});

it("topology.aqua bug 257", async () => {
let result = await topologyBug257Call(peer2);
expect(result).toEqual(["host", "friend", "init"]);
});

it("foldJoin.aqua", async () => {
let foldJoinResult = await foldJoinCall(relayPeerId1);
expect(foldJoinResult.length).toBeGreaterThanOrEqual(3);
Expand Down
7 changes: 7 additions & 0 deletions integration-tests/src/examples/topologyCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
topologyBug205,
topologyBug394,
topologyBug427,
topologyBug257,
} from "../compiled/examples/topology.js";

export async function topologyBug394Call(
Expand Down Expand Up @@ -66,3 +67,9 @@ export async function topologyCall(
},
);
}

export async function topologyBug257Call(
peer2: IFluenceClient,
): Promise<string[]> {
return await topologyBug257(peer2.getPeerId(), peer2.getRelayPeerId());
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ abstract class ChainCursor[C <: ChainCursor[C, T], T](make: NonEmptyList[ChainZi

def toPrevSibling: Option[C] = tree.head.moveLeft.map(p => make(tree.copy(p)))

def nextSiblings: LazyList[C] =
LazyList.unfold(this)(_.toNextSibling.map(c => c -> c))

def allToLeft: LazyList[C] =
LazyList.unfold(this)(_.moveLeft.map(c => c -> c))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ case class OpModelTreeCursor(
override lazy val toNextSibling: Option[OpModelTreeCursor] =
super.toNextSibling.map(_.copy(cachedParent = cachedParent))

override lazy val nextSiblings: LazyList[OpModelTreeCursor] =
super.nextSiblings.map(_.copy(cachedParent = cachedParent))

override def moveDown(focusOn: ChainZipper[OpModel.Tree]): OpModelTreeCursor =
super.moveDown(focusOn).copy(cachedParent = Some(this))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ object PathFinder extends Logging {
)

// TODO: Is it always correct to do so?
toOn.peerId.fold(path)(p => path :+ p)
toOn.peerId
.filterNot(path.lastOption.contains)
.fold(path)(path :+ _)
}

private def findPath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import aqua.types.{ArrayType, BoxType, CanonStreamType, ScalarType, StreamType}

import cats.Eval
import cats.data.Chain.{==:, nil}
import cats.data.OptionT
import cats.data.{Chain, NonEmptyChain, NonEmptyList, OptionT}
import cats.free.Cofree
import cats.syntax.traverse.*
Expand Down Expand Up @@ -93,20 +94,36 @@ case class Topology private (
)
.memoize

// Find path of first `ForceExecModel` (call, canon, join) in this subtree
// Find path of first `ForceExecModel` (call, canon) in this subtree
lazy val firstExecutesOn: Eval[Option[TopologyPath]] =
(cursor.op match {
case _: ForceExecModel => pathOn.map(_.some)
case _ => children.collectFirstSomeM(_.firstExecutesOn)
}).memoize

// Find path of last `ForceExecModel` (call, canon, join) in this subtree
// Find path of last `ForceExecModel` (call, canon) in this subtree
lazy val lastExecutesOn: Eval[Option[TopologyPath]] =
(cursor.op match {
case _: ForceExecModel => pathOn.map(_.some)
case _ => children.reverse.collectFirstSomeM(_.lastExecutesOn)
}).memoize

// Find path of first `ForceExecModel` (call, canon) to right of this subtree
lazy val nextExecutesOn: Eval[Option[TopologyPath]] =
parent
.flatTraverse(p =>
p.cursor.op match {
case _: SeqGroupModel =>
OptionT(
cursor.nextSiblings.collectFirstSomeM(
_.topology.firstExecutesOn
)
).orElseF(p.nextExecutesOn).value
case _ =>
p.nextExecutesOn
}
)

lazy val currentPeerId: Option[ValueModel] = pathOn.value.peerId

// Path of current relay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,19 @@ trait After {
case ExitStrategy.ToRelay =>
(current.endsOn, current.relayOn).mapN(PathFinder.findPathEnforce)
case ExitStrategy.Full =>
(current.endsOn, current.afterOn).mapN(PathFinder.findPath)
(current.endsOn, current.afterOn, current.nextExecutesOn).mapN {
// This is an important optimization:
// If next execution forcing node is at the same path as
// `afterOn` of this node, then leave the last
// hop for it to handle
case (ends, after, next) if next.forall(_ == after) =>
PathFinder.findPath(ends, after)
// Otherwise, force return to relay.
// Returning to `after` would generate unnecessary hops,
// but is it always correct to return to relay here?
case (ends, after, _) =>
PathFinder.findPathEnforce(ends, after.toRelay)
}
}

// If exit is forced, make a path outside this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,57 +1196,103 @@ class TopologySpec extends AnyFlatSpec with Matchers {
proc.equalsOrShowDiff(expected) shouldEqual true
}

it should "handle error rethrow for sequential `on`" in {
val model = OnModel(initPeer, Chain.one(relay)).wrap(
SeqModel.wrap(
callModel(1),
onRethrowModel(otherPeerL, otherRelay)(
callModel(2)
),
onRethrowModel(otherPeer2, otherRelay2)(
callModel(3)
it should "handle sequential `on`" in {
def test(
peer1: ValueRaw,
relay1: ValueRaw,
peer2: ValueRaw,
relay2: ValueRaw
) = {

val model = OnModel(initPeer, Chain.one(relay)).wrap(
SeqModel.wrap(
callModel(1),
onRethrowModel(peer1, relay1)(
callModel(2)
),
onRethrowModel(peer2, relay2)(
callModel(3)
)
)
)
)

val proc = Topology.resolve(model).value
val proc = Topology.resolve(model).value

val expected = SeqRes.wrap(
callRes(1, initPeer),
XorRes.wrap(
SeqRes.wrap(
through(relay),
through(otherRelay),
callRes(2, otherPeerL),
through(otherRelay),
through(relay)
),
SeqRes.wrap(
through(otherRelay),
through(relay),
through(initPeer),
failErrorRes
)
),
XorRes.wrap(
SeqRes.wrap(
through(relay),
through(otherRelay2),
callRes(3, otherPeer2)
),
SeqRes.wrap(
through(otherRelay2),
through(relay),
through(initPeer),
failErrorRes
)
val firstOnRes =
// If the first `on` is `on INIT_PEER_ID via HOST_PEER_ID`
if (peer1 == initPeer && relay1 == relay)
XorRes.wrap(
SeqRes.wrap(
callRes(2, peer1),
through(relay1)
),
failErrorRes
)
else
XorRes.wrap(
SeqRes.wrap(
through(relay),
through(relay1),
callRes(2, peer1),
through(relay1),
through(relay) // TODO: LNG-259
),
SeqRes.wrap(
through(relay1),
through(relay),
through(initPeer),
failErrorRes
)
)

val secondOnRes =
// If the second `on` is `on INIT_PEER_ID via HOST_PEER_ID`
if (peer2 == initPeer && relay2 == relay)
XorRes.wrap(
callRes(3, peer2),
failErrorRes
)
else
XorRes.wrap(
SeqRes.wrap(
through(relay), // TODO: LNG-259
through(relay2),
callRes(3, peer2)
),
SeqRes.wrap(
through(relay2),
through(relay),
through(initPeer),
failErrorRes
)
)

val expected = SeqRes.wrap(
callRes(1, initPeer),
firstOnRes,
secondOnRes
)

proc.equalsOrShowDiff(expected) shouldEqual true
}

val peerRelays = List(
(initPeer, relay),
(otherPeerN(0), otherRelayN(0)),
(otherPeerN(1), otherRelayN(1))
)

proc.equalsOrShowDiff(expected) shouldEqual true
for {
first <- peerRelays
second <- peerRelays
// Skip identical `on`s
if first != second
(p1, r1) = first
(p2, r2) = second
} test(p1, r1, p2, r2)
}

it should "handle error rethrow for sequential `on` without `via`" in {
it should "handle sequential `on` without `via`" in {
val model = OnModel(initPeer, Chain.one(relay)).wrap(
SeqModel.wrap(
callModel(1),
Expand Down
Loading