mirror of
https://github.com/fluencelabs/aqua.git
synced 2024-12-04 22:50:18 +00:00
fix(compiler): Fix topology for adjacent on
s [LNG-257] (#929)
* Always enforce path
* Revert "Always enforce path"
This reverts commit 5ee1e37c9e
.
* Add integration test
* Force return to relay
* Remove unused import
* Add comment
* Add unit test
* Add TODO
* Remove println
* Fix typo
This commit is contained in:
parent
f39bca928a
commit
ba15d9e06a
@ -1,3 +1,7 @@
|
||||
aqua Topology
|
||||
|
||||
export Testo, LocalPrint, topologyTest, topologyBug205, topologyBug394, topologyBug427, topologyBug257
|
||||
|
||||
import "@fluencelabs/aqua-lib/builtin.aqua"
|
||||
|
||||
service Testo("testo"):
|
||||
@ -51,3 +55,22 @@ func topologyBug427(peers: []string) -> []string:
|
||||
|
||||
join results[1]
|
||||
<- results
|
||||
|
||||
service StrOp("op"):
|
||||
identity(str: string) -> string
|
||||
|
||||
func idOnPeer(friend: string, friendRelay: string, str: string) -> string:
|
||||
on friend via friendRelay:
|
||||
result <- StrOp.identity(str)
|
||||
|
||||
<- result
|
||||
|
||||
func topologyBug257(friend: string, friendRelay: string) -> []string:
|
||||
result: *string
|
||||
|
||||
on HOST_PEER_ID:
|
||||
result <- StrOp.identity("host")
|
||||
result <- idOnPeer(friend, friendRelay, "friend")
|
||||
result <- idOnPeer(INIT_PEER_ID, HOST_PEER_ID, "init")
|
||||
|
||||
<- result
|
@ -51,6 +51,7 @@ import {
|
||||
topologyBug205Call,
|
||||
topologyBug394Call,
|
||||
topologyBug427Call,
|
||||
topologyBug257Call,
|
||||
topologyCall,
|
||||
} from "../examples/topologyCall.js";
|
||||
import { foldJoinCall } from "../examples/foldJoinCall.js";
|
||||
@ -896,6 +897,11 @@ describe("Testing examples", () => {
|
||||
expect(topologyResult).toEqual(selfPeerId);
|
||||
});
|
||||
|
||||
it("topology.aqua bug 257", async () => {
|
||||
let result = await topologyBug257Call(peer2);
|
||||
expect(result).toEqual(["host", "friend", "init"]);
|
||||
});
|
||||
|
||||
it("foldJoin.aqua", async () => {
|
||||
let foldJoinResult = await foldJoinCall(relayPeerId1);
|
||||
expect(foldJoinResult.length).toBeGreaterThanOrEqual(3);
|
||||
|
@ -6,6 +6,7 @@ import {
|
||||
topologyBug205,
|
||||
topologyBug394,
|
||||
topologyBug427,
|
||||
topologyBug257,
|
||||
} from "../compiled/examples/topology.js";
|
||||
|
||||
export async function topologyBug394Call(
|
||||
@ -66,3 +67,9 @@ export async function topologyCall(
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
export async function topologyBug257Call(
|
||||
peer2: IFluenceClient,
|
||||
): Promise<string[]> {
|
||||
return await topologyBug257(peer2.getPeerId(), peer2.getRelayPeerId());
|
||||
}
|
||||
|
@ -58,6 +58,9 @@ abstract class ChainCursor[C <: ChainCursor[C, T], T](make: NonEmptyList[ChainZi
|
||||
|
||||
def toPrevSibling: Option[C] = tree.head.moveLeft.map(p => make(tree.copy(p)))
|
||||
|
||||
def nextSiblings: LazyList[C] =
|
||||
LazyList.unfold(this)(_.toNextSibling.map(c => c -> c))
|
||||
|
||||
def allToLeft: LazyList[C] =
|
||||
LazyList.unfold(this)(_.moveLeft.map(c => c -> c))
|
||||
|
||||
|
@ -29,6 +29,9 @@ case class OpModelTreeCursor(
|
||||
override lazy val toNextSibling: Option[OpModelTreeCursor] =
|
||||
super.toNextSibling.map(_.copy(cachedParent = cachedParent))
|
||||
|
||||
override lazy val nextSiblings: LazyList[OpModelTreeCursor] =
|
||||
super.nextSiblings.map(_.copy(cachedParent = cachedParent))
|
||||
|
||||
override def moveDown(focusOn: ChainZipper[OpModel.Tree]): OpModelTreeCursor =
|
||||
super.moveDown(focusOn).copy(cachedParent = Some(this))
|
||||
|
||||
|
@ -45,7 +45,9 @@ object PathFinder extends Logging {
|
||||
)
|
||||
|
||||
// TODO: Is it always correct to do so?
|
||||
toOn.peerId.fold(path)(p => path :+ p)
|
||||
toOn.peerId
|
||||
.filterNot(path.lastOption.contains)
|
||||
.fold(path)(path :+ _)
|
||||
}
|
||||
|
||||
private def findPath(
|
||||
|
@ -11,6 +11,7 @@ import aqua.types.{ArrayType, BoxType, CanonStreamType, ScalarType, StreamType}
|
||||
|
||||
import cats.Eval
|
||||
import cats.data.Chain.{==:, nil}
|
||||
import cats.data.OptionT
|
||||
import cats.data.{Chain, NonEmptyChain, NonEmptyList, OptionT}
|
||||
import cats.free.Cofree
|
||||
import cats.syntax.traverse.*
|
||||
@ -93,20 +94,36 @@ case class Topology private (
|
||||
)
|
||||
.memoize
|
||||
|
||||
// Find path of first `ForceExecModel` (call, canon, join) in this subtree
|
||||
// Find path of first `ForceExecModel` (call, canon) in this subtree
|
||||
lazy val firstExecutesOn: Eval[Option[TopologyPath]] =
|
||||
(cursor.op match {
|
||||
case _: ForceExecModel => pathOn.map(_.some)
|
||||
case _ => children.collectFirstSomeM(_.firstExecutesOn)
|
||||
}).memoize
|
||||
|
||||
// Find path of last `ForceExecModel` (call, canon, join) in this subtree
|
||||
// Find path of last `ForceExecModel` (call, canon) in this subtree
|
||||
lazy val lastExecutesOn: Eval[Option[TopologyPath]] =
|
||||
(cursor.op match {
|
||||
case _: ForceExecModel => pathOn.map(_.some)
|
||||
case _ => children.reverse.collectFirstSomeM(_.lastExecutesOn)
|
||||
}).memoize
|
||||
|
||||
// Find path of first `ForceExecModel` (call, canon) to right of this subtree
|
||||
lazy val nextExecutesOn: Eval[Option[TopologyPath]] =
|
||||
parent
|
||||
.flatTraverse(p =>
|
||||
p.cursor.op match {
|
||||
case _: SeqGroupModel =>
|
||||
OptionT(
|
||||
cursor.nextSiblings.collectFirstSomeM(
|
||||
_.topology.firstExecutesOn
|
||||
)
|
||||
).orElseF(p.nextExecutesOn).value
|
||||
case _ =>
|
||||
p.nextExecutesOn
|
||||
}
|
||||
)
|
||||
|
||||
lazy val currentPeerId: Option[ValueModel] = pathOn.value.peerId
|
||||
|
||||
// Path of current relay
|
||||
|
@ -41,7 +41,19 @@ trait After {
|
||||
case ExitStrategy.ToRelay =>
|
||||
(current.endsOn, current.relayOn).mapN(PathFinder.findPathEnforce)
|
||||
case ExitStrategy.Full =>
|
||||
(current.endsOn, current.afterOn).mapN(PathFinder.findPath)
|
||||
(current.endsOn, current.afterOn, current.nextExecutesOn).mapN {
|
||||
// This is an important optimization:
|
||||
// If next execution forcing node is at the same path as
|
||||
// `afterOn` of this node, then leave the last
|
||||
// hop for it to handle
|
||||
case (ends, after, next) if next.forall(_ == after) =>
|
||||
PathFinder.findPath(ends, after)
|
||||
// Otherwise, force return to relay.
|
||||
// Returning to `after` would generate unnecessary hops,
|
||||
// but is it always correct to return to relay here?
|
||||
case (ends, after, _) =>
|
||||
PathFinder.findPathEnforce(ends, after.toRelay)
|
||||
}
|
||||
}
|
||||
|
||||
// If exit is forced, make a path outside this node
|
||||
|
@ -1196,14 +1196,21 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
proc.equalsOrShowDiff(expected) shouldEqual true
|
||||
}
|
||||
|
||||
it should "handle error rethrow for sequential `on`" in {
|
||||
it should "handle sequential `on`" in {
|
||||
def test(
|
||||
peer1: ValueRaw,
|
||||
relay1: ValueRaw,
|
||||
peer2: ValueRaw,
|
||||
relay2: ValueRaw
|
||||
) = {
|
||||
|
||||
val model = OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
SeqModel.wrap(
|
||||
callModel(1),
|
||||
onRethrowModel(otherPeerL, otherRelay)(
|
||||
onRethrowModel(peer1, relay1)(
|
||||
callModel(2)
|
||||
),
|
||||
onRethrowModel(otherPeer2, otherRelay2)(
|
||||
onRethrowModel(peer2, relay2)(
|
||||
callModel(3)
|
||||
)
|
||||
)
|
||||
@ -1211,42 +1218,81 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
val proc = Topology.resolve(model).value
|
||||
|
||||
val firstOnRes =
|
||||
// If the first `on` is `on INIT_PEER_ID via HOST_PEER_ID`
|
||||
if (peer1 == initPeer && relay1 == relay)
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
callRes(2, peer1),
|
||||
through(relay1)
|
||||
),
|
||||
failErrorRes
|
||||
)
|
||||
else
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
through(relay1),
|
||||
callRes(2, peer1),
|
||||
through(relay1),
|
||||
through(relay) // TODO: LNG-259
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(relay1),
|
||||
through(relay),
|
||||
through(initPeer),
|
||||
failErrorRes
|
||||
)
|
||||
)
|
||||
|
||||
val secondOnRes =
|
||||
// If the second `on` is `on INIT_PEER_ID via HOST_PEER_ID`
|
||||
if (peer2 == initPeer && relay2 == relay)
|
||||
XorRes.wrap(
|
||||
callRes(3, peer2),
|
||||
failErrorRes
|
||||
)
|
||||
else
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
through(relay), // TODO: LNG-259
|
||||
through(relay2),
|
||||
callRes(3, peer2)
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(relay2),
|
||||
through(relay),
|
||||
through(initPeer),
|
||||
failErrorRes
|
||||
)
|
||||
)
|
||||
|
||||
val expected = SeqRes.wrap(
|
||||
callRes(1, initPeer),
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
through(otherRelay),
|
||||
callRes(2, otherPeerL),
|
||||
through(otherRelay),
|
||||
through(relay)
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(otherRelay),
|
||||
through(relay),
|
||||
through(initPeer),
|
||||
failErrorRes
|
||||
)
|
||||
),
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
through(otherRelay2),
|
||||
callRes(3, otherPeer2)
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(otherRelay2),
|
||||
through(relay),
|
||||
through(initPeer),
|
||||
failErrorRes
|
||||
)
|
||||
)
|
||||
firstOnRes,
|
||||
secondOnRes
|
||||
)
|
||||
|
||||
proc.equalsOrShowDiff(expected) shouldEqual true
|
||||
}
|
||||
|
||||
it should "handle error rethrow for sequential `on` without `via`" in {
|
||||
val peerRelays = List(
|
||||
(initPeer, relay),
|
||||
(otherPeerN(0), otherRelayN(0)),
|
||||
(otherPeerN(1), otherRelayN(1))
|
||||
)
|
||||
|
||||
for {
|
||||
first <- peerRelays
|
||||
second <- peerRelays
|
||||
// Skip identical `on`s
|
||||
if first != second
|
||||
(p1, r1) = first
|
||||
(p2, r2) = second
|
||||
} test(p1, r1, p2, r2)
|
||||
}
|
||||
|
||||
it should "handle sequential `on` without `via`" in {
|
||||
val model = OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
SeqModel.wrap(
|
||||
callModel(1),
|
||||
|
Loading…
Reference in New Issue
Block a user