Skip to content

Commit

Permalink
Merge branch 'main' into feat/tracing-error-exit-LNG-250
Browse files Browse the repository at this point in the history
  • Loading branch information
InversionSpaces authored Oct 17, 2023
2 parents 5abd15d + ba15d9e commit fe60ce4
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 47 deletions.
25 changes: 24 additions & 1 deletion integration-tests/aqua/examples/topology.aqua
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
aqua Topology

export Testo, LocalPrint, topologyTest, topologyBug205, topologyBug394, topologyBug427, topologyBug257

import "@fluencelabs/aqua-lib/builtin.aqua"

service Testo("testo"):
Expand Down Expand Up @@ -50,4 +54,23 @@ func topologyBug427(peers: []string) -> []string:
results <- Opop.identity("some string")

join results[1]
<- results
<- results

service StrOp("op"):
identity(str: string) -> string

func idOnPeer(friend: string, friendRelay: string, str: string) -> string:
on friend via friendRelay:
result <- StrOp.identity(str)

<- result

func topologyBug257(friend: string, friendRelay: string) -> []string:
result: *string

on HOST_PEER_ID:
result <- StrOp.identity("host")
result <- idOnPeer(friend, friendRelay, "friend")
result <- idOnPeer(INIT_PEER_ID, HOST_PEER_ID, "init")

<- result
6 changes: 6 additions & 0 deletions integration-tests/src/__test__/examples.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
topologyBug205Call,
topologyBug394Call,
topologyBug427Call,
topologyBug257Call,
topologyCall,
} from "../examples/topologyCall.js";
import { foldJoinCall } from "../examples/foldJoinCall.js";
Expand Down Expand Up @@ -896,6 +897,11 @@ describe("Testing examples", () => {
expect(topologyResult).toEqual(selfPeerId);
});

it("topology.aqua bug 257", async () => {
let result = await topologyBug257Call(peer2);
expect(result).toEqual(["host", "friend", "init"]);
});

it("foldJoin.aqua", async () => {
let foldJoinResult = await foldJoinCall(relayPeerId1);
expect(foldJoinResult.length).toBeGreaterThanOrEqual(3);
Expand Down
7 changes: 7 additions & 0 deletions integration-tests/src/examples/topologyCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
topologyBug205,
topologyBug394,
topologyBug427,
topologyBug257,
} from "../compiled/examples/topology.js";

export async function topologyBug394Call(
Expand Down Expand Up @@ -66,3 +67,9 @@ export async function topologyCall(
},
);
}

export async function topologyBug257Call(
peer2: IFluenceClient,
): Promise<string[]> {
return await topologyBug257(peer2.getPeerId(), peer2.getRelayPeerId());
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ abstract class ChainCursor[C <: ChainCursor[C, T], T](make: NonEmptyList[ChainZi

def toPrevSibling: Option[C] = tree.head.moveLeft.map(p => make(tree.copy(p)))

def nextSiblings: LazyList[C] =
LazyList.unfold(this)(_.toNextSibling.map(c => c -> c))

def allToLeft: LazyList[C] =
LazyList.unfold(this)(_.moveLeft.map(c => c -> c))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ case class OpModelTreeCursor(
override lazy val toNextSibling: Option[OpModelTreeCursor] =
super.toNextSibling.map(_.copy(cachedParent = cachedParent))

override lazy val nextSiblings: LazyList[OpModelTreeCursor] =
super.nextSiblings.map(_.copy(cachedParent = cachedParent))

override def moveDown(focusOn: ChainZipper[OpModel.Tree]): OpModelTreeCursor =
super.moveDown(focusOn).copy(cachedParent = Some(this))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ object PathFinder extends Logging {
)

// TODO: Is it always correct to do so?
toOn.peerId.fold(path)(p => path :+ p)
toOn.peerId
.filterNot(path.lastOption.contains)
.fold(path)(path :+ _)
}

private def findPath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import aqua.types.{ArrayType, BoxType, CanonStreamType, ScalarType, StreamType}

import cats.Eval
import cats.data.Chain.{==:, nil}
import cats.data.OptionT
import cats.data.{Chain, NonEmptyChain, NonEmptyList, OptionT}
import cats.free.Cofree
import cats.syntax.traverse.*
Expand Down Expand Up @@ -93,20 +94,36 @@ case class Topology private (
)
.memoize

// Find path of first `ForceExecModel` (call, canon, join) in this subtree
// Find path of first `ForceExecModel` (call, canon) in this subtree
lazy val firstExecutesOn: Eval[Option[TopologyPath]] =
(cursor.op match {
case _: ForceExecModel => pathOn.map(_.some)
case _ => children.collectFirstSomeM(_.firstExecutesOn)
}).memoize

// Find path of last `ForceExecModel` (call, canon, join) in this subtree
// Find path of last `ForceExecModel` (call, canon) in this subtree
lazy val lastExecutesOn: Eval[Option[TopologyPath]] =
(cursor.op match {
case _: ForceExecModel => pathOn.map(_.some)
case _ => children.reverse.collectFirstSomeM(_.lastExecutesOn)
}).memoize

// Find path of first `ForceExecModel` (call, canon) to right of this subtree
lazy val nextExecutesOn: Eval[Option[TopologyPath]] =
parent
.flatTraverse(p =>
p.cursor.op match {
case _: SeqGroupModel =>
OptionT(
cursor.nextSiblings.collectFirstSomeM(
_.topology.firstExecutesOn
)
).orElseF(p.nextExecutesOn).value
case _ =>
p.nextExecutesOn
}
)

lazy val currentPeerId: Option[ValueModel] = pathOn.value.peerId

// Path of current relay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,19 @@ trait After {
case ExitStrategy.ToRelay =>
(current.endsOn, current.relayOn).mapN(PathFinder.findPathEnforce)
case ExitStrategy.Full =>
(current.endsOn, current.afterOn).mapN(PathFinder.findPath)
(current.endsOn, current.afterOn, current.nextExecutesOn).mapN {
// This is an important optimization:
// If next execution forcing node is at the same path as
// `afterOn` of this node, then leave the last
// hop for it to handle
case (ends, after, next) if next.forall(_ == after) =>
PathFinder.findPath(ends, after)
// Otherwise, force return to relay.
// Returning to `after` would generate unnecessary hops,
// but is it always correct to return to relay here?
case (ends, after, _) =>
PathFinder.findPathEnforce(ends, after.toRelay)
}
}

// If exit is forced, make a path outside this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,57 +1196,103 @@ class TopologySpec extends AnyFlatSpec with Matchers {
proc.equalsOrShowDiff(expected) shouldEqual true
}

it should "handle error rethrow for sequential `on`" in {
val model = OnModel(initPeer, Chain.one(relay)).wrap(
SeqModel.wrap(
callModel(1),
onRethrowModel(otherPeerL, otherRelay)(
callModel(2)
),
onRethrowModel(otherPeer2, otherRelay2)(
callModel(3)
it should "handle sequential `on`" in {
def test(
peer1: ValueRaw,
relay1: ValueRaw,
peer2: ValueRaw,
relay2: ValueRaw
) = {

val model = OnModel(initPeer, Chain.one(relay)).wrap(
SeqModel.wrap(
callModel(1),
onRethrowModel(peer1, relay1)(
callModel(2)
),
onRethrowModel(peer2, relay2)(
callModel(3)
)
)
)
)

val proc = Topology.resolve(model).value
val proc = Topology.resolve(model).value

val expected = SeqRes.wrap(
callRes(1, initPeer),
XorRes.wrap(
SeqRes.wrap(
through(relay),
through(otherRelay),
callRes(2, otherPeerL),
through(otherRelay),
through(relay)
),
SeqRes.wrap(
through(otherRelay),
through(relay),
through(initPeer),
failErrorRes
)
),
XorRes.wrap(
SeqRes.wrap(
through(relay),
through(otherRelay2),
callRes(3, otherPeer2)
),
SeqRes.wrap(
through(otherRelay2),
through(relay),
through(initPeer),
failErrorRes
)
val firstOnRes =
// If the first `on` is `on INIT_PEER_ID via HOST_PEER_ID`
if (peer1 == initPeer && relay1 == relay)
XorRes.wrap(
SeqRes.wrap(
callRes(2, peer1),
through(relay1)
),
failErrorRes
)
else
XorRes.wrap(
SeqRes.wrap(
through(relay),
through(relay1),
callRes(2, peer1),
through(relay1),
through(relay) // TODO: LNG-259
),
SeqRes.wrap(
through(relay1),
through(relay),
through(initPeer),
failErrorRes
)
)

val secondOnRes =
// If the second `on` is `on INIT_PEER_ID via HOST_PEER_ID`
if (peer2 == initPeer && relay2 == relay)
XorRes.wrap(
callRes(3, peer2),
failErrorRes
)
else
XorRes.wrap(
SeqRes.wrap(
through(relay), // TODO: LNG-259
through(relay2),
callRes(3, peer2)
),
SeqRes.wrap(
through(relay2),
through(relay),
through(initPeer),
failErrorRes
)
)

val expected = SeqRes.wrap(
callRes(1, initPeer),
firstOnRes,
secondOnRes
)

proc.equalsOrShowDiff(expected) shouldEqual true
}

val peerRelays = List(
(initPeer, relay),
(otherPeerN(0), otherRelayN(0)),
(otherPeerN(1), otherRelayN(1))
)

proc.equalsOrShowDiff(expected) shouldEqual true
for {
first <- peerRelays
second <- peerRelays
// Skip identical `on`s
if first != second
(p1, r1) = first
(p2, r2) = second
} test(p1, r1, p2, r2)
}

it should "handle error rethrow for sequential `on` without `via`" in {
it should "handle sequential `on` without `via`" in {
val model = OnModel(initPeer, Chain.one(relay)).wrap(
SeqModel.wrap(
callModel(1),
Expand Down

0 comments on commit fe60ce4

Please sign in to comment.