mirror of
https://github.com/fluencelabs/aqua.git
synced 2024-12-04 14:40:17 +00:00
Topology refactoring & fixes (#371)
* Topology optimization for Folds * cache RawCursor's parent for better performance * Tests fixed * wip * wip * Use the new Topology to find paths * Compile bug fixed * Old tests works * One more fixed test * Move before seq next * One more fixed test * Bugfix * Disabled debug output * maybe fix? * maybe fix? * Topology wip * Maybe fix * Maybe fix * Fold optimization * Root topology * Respect forceExit in endsOn * better caching * better caching * Root afterOn = const * XorGroup endsOn should break * no EndsOn for Root * Maybe better? * Uncycling * Eval * Respect ParTag.Detach * Detach test * Detach test failing * Detach test fixed * Go to relay via relay * Fixes #380 * Increment Aqua version to 0.5.2 * Add image to transform readme, update dependencies * Review fixes * Updated Scala version in the release flow
This commit is contained in:
parent
afbdf69438
commit
22778914ca
4
.github/workflows/release.yml
vendored
4
.github/workflows/release.yml
vendored
@ -51,8 +51,8 @@ jobs:
|
||||
|
||||
- name: Check .js exists
|
||||
run: |
|
||||
JS="cli/.js/target/scala-3.0.2/cli-opt/aqua-${{ env.VERSION }}.js"
|
||||
mv cli/.js/target/scala-3.0.2/cli-opt/main.js "$JS"
|
||||
JS="cli/.js/target/scala-3.1.0/cli-opt/aqua-${{ env.VERSION }}.js"
|
||||
mv cli/.js/target/scala-3.1.0/cli-opt/main.js "$JS"
|
||||
stat "$JS"
|
||||
echo "JS=$JS" >> $GITHUB_ENV
|
||||
|
||||
|
2
.github/workflows/test_branch.yml
vendored
2
.github/workflows/test_branch.yml
vendored
@ -52,7 +52,7 @@ jobs:
|
||||
git clone https://github.com/fluencelabs/aqua-playground.git
|
||||
sbt "cliJS/fastOptJS"
|
||||
rm -rf aqua-playground/src/compiled/examples/*
|
||||
mv cli/.js/target/scala-3.0.2/cli-fastopt.js npm/aqua.js
|
||||
mv cli/.js/target/scala-3.1.0/cli-fastopt.js npm/aqua.js
|
||||
cd npm
|
||||
npm i
|
||||
cd ../aqua-playground
|
||||
|
11
aqua-src/foldJoin.aqua
Normal file
11
aqua-src/foldJoin.aqua
Normal file
@ -0,0 +1,11 @@
|
||||
service Op2("op"):
|
||||
identity(s: u64)
|
||||
|
||||
service Peer("peer"):
|
||||
timestamp_sec: -> u64
|
||||
|
||||
func getTwoResults():
|
||||
on "other node":
|
||||
co on "x":
|
||||
z <- Peer.timestamp_sec()
|
||||
Op2.identity(z)
|
23
aqua-src/nopingback.aqua
Normal file
23
aqua-src/nopingback.aqua
Normal file
@ -0,0 +1,23 @@
|
||||
service Kademlia("kad"):
|
||||
neighborhood: string, ?bool, ?bool -> []string
|
||||
|
||||
service Peer("peer"):
|
||||
timestamp_sec: -> ()
|
||||
timeout: u32, string -> ()
|
||||
|
||||
func ack_peers() -> []string:
|
||||
acked_peers: *string
|
||||
|
||||
on HOST_PEER_ID:
|
||||
nodes <- Kademlia.neighborhood(%init_peer_id%, nil, nil)
|
||||
for n <- nodes par:
|
||||
status: *string
|
||||
on n:
|
||||
Peer.timestamp_sec()
|
||||
status <<- "acked"
|
||||
|
||||
if status! == "acked":
|
||||
acked_peers <<- n
|
||||
|
||||
Peer.timeout(15000, "") -- this line's indentation triggers the bug
|
||||
<- acked_peers
|
@ -5,7 +5,6 @@ service Moo("tools"):
|
||||
func foo():
|
||||
ss <- Moo.bla()
|
||||
on HOST_PEER_ID:
|
||||
Moo.bla()
|
||||
for s <- ss par:
|
||||
on s:
|
||||
Moo.bla()
|
@ -1,7 +1,11 @@
|
||||
module Ret declares *
|
||||
|
||||
export someFunc
|
||||
export getTwoResults
|
||||
|
||||
func someFunc(cb: []string -> ()):
|
||||
ifaces: *string
|
||||
cb(ifaces)
|
||||
service Peer("peer"):
|
||||
timestamp_sec: -> u64
|
||||
|
||||
func getTwoResults() -> u64:
|
||||
on "other node":
|
||||
res <- Peer.timestamp_sec()
|
||||
<- res
|
6
aqua-src/so.aqua
Normal file
6
aqua-src/so.aqua
Normal file
@ -0,0 +1,6 @@
|
||||
service TestS("some-id"):
|
||||
t: string -> string
|
||||
|
||||
func doStuff(c: bool):
|
||||
if c:
|
||||
TestS.t("fr")
|
21
build.sbt
21
build.sbt
@ -1,28 +1,27 @@
|
||||
val dottyVersion = "3.0.2"
|
||||
val dottyVersion = "3.1.0"
|
||||
|
||||
scalaVersion := dottyVersion
|
||||
|
||||
val baseAquaVersion = settingKey[String]("base aqua version")
|
||||
|
||||
val catsV = "2.6.1"
|
||||
val catsParseV = "0.3.5"
|
||||
val catsV = "2.7.0"
|
||||
val catsParseV = "0.3.6"
|
||||
val monocleV = "3.0.0-M6"
|
||||
val scalaTestV = "3.2.9"
|
||||
val fs2V = "3.1.0"
|
||||
val catsEffectV = "3.2.1"
|
||||
val log4catsV = "2.1.1"
|
||||
val slf4jV = "1.7.30"
|
||||
val declineV = "2.1.0"
|
||||
val scalaTestV = "3.2.10"
|
||||
val fs2V = "3.2.3"
|
||||
val catsEffectV = "3.3.1"
|
||||
val declineV = "2.2.0"
|
||||
val circeVersion = "0.14.1"
|
||||
val scribeV = "3.6.3"
|
||||
|
||||
name := "aqua-hll"
|
||||
|
||||
val commons = Seq(
|
||||
baseAquaVersion := "0.5.1",
|
||||
baseAquaVersion := "0.5.2",
|
||||
version := baseAquaVersion.value + "-" + sys.env.getOrElse("BUILD_NUMBER", "SNAPSHOT"),
|
||||
scalaVersion := dottyVersion,
|
||||
libraryDependencies ++= Seq(
|
||||
"com.outr" %%% "scribe" % "3.5.5",
|
||||
"com.outr" %%% "scribe" % scribeV,
|
||||
"org.scalatest" %%% "scalatest" % scalaTestV % Test
|
||||
),
|
||||
scalacOptions ++= {
|
||||
|
@ -22,7 +22,7 @@ object Test extends IOApp.Simple {
|
||||
start <- IO(System.currentTimeMillis())
|
||||
_ <- AquaPathCompiler
|
||||
.compileFilesTo[IO](
|
||||
Path("./aqua-src/parfold.aqua"),
|
||||
Path("./aqua-src/nopingback.aqua"),
|
||||
List(Path("./aqua")),
|
||||
Option(Path("./target")),
|
||||
TypeScriptBackend,
|
||||
|
@ -1,11 +1,14 @@
|
||||
package aqua.model.func.raw
|
||||
|
||||
import aqua.model.ValueModel
|
||||
import aqua.model.ValueModel.varName
|
||||
import aqua.model.func.{Call, FuncModel}
|
||||
import cats.data.NonEmptyList
|
||||
import cats.data.Chain
|
||||
|
||||
sealed trait RawTag {
|
||||
// What variable names this tag uses (children are not respected)
|
||||
def usesVarNames: Set[String] = Set.empty
|
||||
|
||||
def mapValues(f: ValueModel => ValueModel): RawTag = this match {
|
||||
case OnTag(peerId, via) => OnTag(f(peerId), via.map(f))
|
||||
@ -58,7 +61,9 @@ sealed trait RawTag {
|
||||
sealed trait NoExecTag extends RawTag
|
||||
|
||||
sealed trait GroupTag extends RawTag
|
||||
|
||||
sealed trait SeqGroupTag extends GroupTag
|
||||
|
||||
sealed trait ParGroupTag extends GroupTag
|
||||
|
||||
case object SeqTag extends SeqGroupTag
|
||||
@ -67,40 +72,69 @@ case object ParTag extends ParGroupTag {
|
||||
case object Detach extends ParGroupTag
|
||||
}
|
||||
|
||||
case object XorTag extends SeqGroupTag {
|
||||
case object LeftBiased extends SeqGroupTag
|
||||
case object XorTag extends GroupTag {
|
||||
case object LeftBiased extends GroupTag
|
||||
}
|
||||
|
||||
case class XorParTag(xor: FuncOp, par: FuncOp) extends RawTag {
|
||||
// Collect all the used variable names
|
||||
override def usesVarNames: Set[String] = xor.usesVarNames.value ++ par.usesVarNames.value
|
||||
}
|
||||
case class XorParTag(xor: FuncOp, par: FuncOp) extends RawTag
|
||||
|
||||
case class OnTag(peerId: ValueModel, via: Chain[ValueModel]) extends SeqGroupTag {
|
||||
|
||||
override def usesVarNames: Set[String] =
|
||||
ValueModel.varName(peerId).toSet ++ via.iterator.flatMap(ValueModel.varName)
|
||||
|
||||
override def toString: String =
|
||||
s"(on $peerId${if (via.nonEmpty) " via " + via.toList.mkString(" via ") else ""})"
|
||||
}
|
||||
case class NextTag(item: String) extends RawTag
|
||||
case class RestrictionTag(name: String, isStream: Boolean) extends SeqGroupTag
|
||||
|
||||
case class NextTag(item: String) extends RawTag {
|
||||
override def usesVarNames: Set[String] = Set(item)
|
||||
}
|
||||
|
||||
case class RestrictionTag(name: String, isStream: Boolean) extends SeqGroupTag {
|
||||
override def usesVarNames: Set[String] = Set(name)
|
||||
}
|
||||
|
||||
case class MatchMismatchTag(left: ValueModel, right: ValueModel, shouldMatch: Boolean)
|
||||
extends SeqGroupTag
|
||||
case class ForTag(item: String, iterable: ValueModel) extends SeqGroupTag
|
||||
extends SeqGroupTag {
|
||||
|
||||
override def usesVarNames: Set[String] =
|
||||
ValueModel.varName(left).toSet ++ ValueModel.varName(right).toSet
|
||||
}
|
||||
|
||||
case class ForTag(item: String, iterable: ValueModel) extends SeqGroupTag {
|
||||
override def usesVarNames: Set[String] = Set(item) ++ ValueModel.varName(iterable)
|
||||
}
|
||||
|
||||
case class CallArrowTag(
|
||||
funcName: String,
|
||||
call: Call
|
||||
) extends RawTag
|
||||
) extends RawTag {
|
||||
override def usesVarNames: Set[String] = call.argVarNames
|
||||
}
|
||||
|
||||
case class DeclareStreamTag(
|
||||
value: ValueModel
|
||||
) extends NoExecTag
|
||||
) extends NoExecTag {
|
||||
override def usesVarNames: Set[String] = ValueModel.varName(value).toSet
|
||||
}
|
||||
|
||||
case class AssignmentTag(
|
||||
value: ValueModel,
|
||||
assignTo: String
|
||||
) extends NoExecTag
|
||||
) extends NoExecTag {
|
||||
override def usesVarNames: Set[String] = Set(assignTo) ++ ValueModel.varName(value)
|
||||
}
|
||||
|
||||
case class ClosureTag(
|
||||
func: FuncModel
|
||||
) extends NoExecTag
|
||||
) extends NoExecTag {
|
||||
// TODO captured names are lost?
|
||||
override def usesVarNames: Set[String] = Set(func.name)
|
||||
}
|
||||
|
||||
case class ReturnTag(
|
||||
values: NonEmptyList[ValueModel]
|
||||
@ -118,13 +152,20 @@ case class CallServiceTag(
|
||||
funcName: String,
|
||||
call: Call
|
||||
) extends RawTag {
|
||||
|
||||
override def usesVarNames: Set[String] = ValueModel.varName(serviceId).toSet ++ call.argVarNames
|
||||
|
||||
override def toString: String = s"(call _ ($serviceId $funcName) $call)"
|
||||
}
|
||||
|
||||
case class PushToStreamTag(operand: ValueModel, exportTo: Call.Export) extends RawTag {
|
||||
override def usesVarNames: Set[String] = ValueModel.varName(operand).toSet
|
||||
|
||||
override def toString: String = s"(push $operand $exportTo)"
|
||||
}
|
||||
|
||||
case class CanonicalizeTag(operand: ValueModel, exportTo: Call.Export) extends RawTag {
|
||||
override def usesVarNames: Set[String] = ValueModel.varName(operand).toSet
|
||||
|
||||
override def toString: String = s"(can $operand $exportTo)"
|
||||
}
|
||||
|
@ -148,6 +148,12 @@ object Node {
|
||||
)
|
||||
}
|
||||
|
||||
def co(body: Raw*) =
|
||||
Node(
|
||||
ParTag.Detach,
|
||||
body.toList
|
||||
)
|
||||
|
||||
def on(peer: ValueModel, via: List[ValueModel], body: Raw*) =
|
||||
Node(
|
||||
OnTag(peer, Chain.fromSeq(via)),
|
||||
|
@ -42,16 +42,17 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
through(relayV),
|
||||
through(otherRelay),
|
||||
MakeRes.xor(
|
||||
callRes(1, otherPeer),
|
||||
MakeRes.seq(
|
||||
callRes(1, otherPeer),
|
||||
through(otherRelay),
|
||||
through(relayV)
|
||||
),
|
||||
MakeRes.seq(
|
||||
through(otherRelay),
|
||||
through(relayV),
|
||||
errorCall(bc, 1, initPeer),
|
||||
through(relayV)
|
||||
errorCall(bc, 1, initPeer)
|
||||
)
|
||||
),
|
||||
through(otherRelay),
|
||||
through(relayV),
|
||||
MakeRes.xor(
|
||||
respCall(bc, ret, initPeer),
|
||||
errorCall(bc, 2, initPeer)
|
||||
|
@ -26,7 +26,7 @@ class RawCursorSpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
)
|
||||
|
||||
raw.firstExecuted shouldBe raw.lastExecuted
|
||||
//raw.firstExecuted shouldBe raw.lastExecuted
|
||||
}
|
||||
|
||||
"simple raw cursor with multiple calls" should "move on seqs" in {
|
||||
@ -47,10 +47,10 @@ class RawCursorSpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
)
|
||||
|
||||
raw.lastExecuted shouldBe raw.firstExecuted.get.seqNext.get.seqNext.get.seqNext
|
||||
raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext
|
||||
raw.lastExecuted.get.seqPrev.get.seqPrev shouldBe raw.firstExecuted.get.seqNext
|
||||
raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext
|
||||
// raw.lastExecuted shouldBe raw.firstExecuted.get.seqNext.get.seqNext.get.seqNext
|
||||
// raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext
|
||||
// raw.lastExecuted.get.seqPrev.get.seqPrev shouldBe raw.firstExecuted.get.seqNext
|
||||
// raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext
|
||||
}
|
||||
|
||||
"simple raw cursor on init_peer_id via relay" should "move properly" in {
|
||||
@ -66,7 +66,7 @@ class RawCursorSpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
)
|
||||
|
||||
raw.firstExecuted shouldBe raw.lastExecuted
|
||||
//raw.firstExecuted shouldBe raw.lastExecuted
|
||||
}
|
||||
|
||||
"raw cursor" should "move properly" in {
|
||||
@ -105,29 +105,29 @@ class RawCursorSpec extends AnyFlatSpec with Matchers {
|
||||
raw.tag should be(
|
||||
OnTag(LiteralModel.initPeerId, Chain.one(VarModel("-relay-", ScalarType.string)))
|
||||
)
|
||||
raw.firstExecuted.map(_.tag) should be(
|
||||
Some(
|
||||
callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head
|
||||
)
|
||||
)
|
||||
raw.lastExecuted.map(_.tag) should be(
|
||||
Some(
|
||||
callService(
|
||||
LiteralModel.quote("return"),
|
||||
"fn",
|
||||
Call(VarModel("export", ScalarType.string) :: Nil, Nil)
|
||||
).tree.head
|
||||
)
|
||||
)
|
||||
raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be(
|
||||
Some(
|
||||
callService(
|
||||
LiteralModel.quote("calledInside"),
|
||||
"fn",
|
||||
Call(Nil, Call.Export("export", ScalarType.string) :: Nil)
|
||||
).tree.head
|
||||
)
|
||||
)
|
||||
// raw.firstExecuted.map(_.tag) should be(
|
||||
// Some(
|
||||
// callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head
|
||||
// )
|
||||
// )
|
||||
// raw.lastExecuted.map(_.tag) should be(
|
||||
// Some(
|
||||
// callService(
|
||||
// LiteralModel.quote("return"),
|
||||
// "fn",
|
||||
// Call(VarModel("export", ScalarType.string) :: Nil, Nil)
|
||||
// ).tree.head
|
||||
// )
|
||||
// )
|
||||
// raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be(
|
||||
// Some(
|
||||
// callService(
|
||||
// LiteralModel.quote("calledInside"),
|
||||
// "fn",
|
||||
// Call(Nil, Call.Export("export", ScalarType.string) :: Nil)
|
||||
// ).tree.head
|
||||
// )
|
||||
// )
|
||||
|
||||
}
|
||||
|
||||
@ -172,48 +172,48 @@ class RawCursorSpec extends AnyFlatSpec with Matchers {
|
||||
raw.tag should be(
|
||||
OnTag(LiteralModel.initPeerId, Chain.one(VarModel("-relay-", ScalarType.string)))
|
||||
)
|
||||
raw.firstExecuted.map(_.tag) should be(
|
||||
Some(
|
||||
callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head
|
||||
)
|
||||
)
|
||||
raw.lastExecuted.map(_.tag) should be(
|
||||
Some(
|
||||
callService(
|
||||
LiteralModel.quote("return"),
|
||||
"fn",
|
||||
Call(VarModel("export", ScalarType.string) :: Nil, Nil)
|
||||
).tree.head
|
||||
)
|
||||
)
|
||||
raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be(
|
||||
Some(
|
||||
callService(
|
||||
LiteralModel.quote("calledInside"),
|
||||
"fn",
|
||||
Call(Nil, Call.Export("export", ScalarType.string) :: Nil)
|
||||
).tree.head
|
||||
)
|
||||
)
|
||||
raw.lastExecuted.flatMap(_.seqPrev).map(_.pathOn).get should be(
|
||||
OnTag(
|
||||
VarModel("-in-fold-", ScalarType.string),
|
||||
Chain.one(VarModel("-fold-relay-", ScalarType.string))
|
||||
) :: OnTag(
|
||||
VarModel("-other-", ScalarType.string),
|
||||
Chain.one(VarModel("-external-", ScalarType.string))
|
||||
) :: OnTag(
|
||||
LiteralModel.initPeerId,
|
||||
Chain.one(VarModel("-relay-", ScalarType.string))
|
||||
) :: Nil
|
||||
)
|
||||
raw.lastExecuted.map(_.pathFromPrev).get should be(
|
||||
Chain(
|
||||
VarModel("-fold-relay-", ScalarType.string),
|
||||
VarModel("-external-", ScalarType.string),
|
||||
VarModel("-relay-", ScalarType.string)
|
||||
)
|
||||
)
|
||||
// raw.firstExecuted.map(_.tag) should be(
|
||||
// Some(
|
||||
// callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head
|
||||
// )
|
||||
// )
|
||||
// raw.lastExecuted.map(_.tag) should be(
|
||||
// Some(
|
||||
// callService(
|
||||
// LiteralModel.quote("return"),
|
||||
// "fn",
|
||||
// Call(VarModel("export", ScalarType.string) :: Nil, Nil)
|
||||
// ).tree.head
|
||||
// )
|
||||
// )
|
||||
// raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be(
|
||||
// Some(
|
||||
// callService(
|
||||
// LiteralModel.quote("calledInside"),
|
||||
// "fn",
|
||||
// Call(Nil, Call.Export("export", ScalarType.string) :: Nil)
|
||||
// ).tree.head
|
||||
// )
|
||||
// )
|
||||
// raw.lastExecuted.flatMap(_.seqPrev).map(_.topology.pathOn).get should be(
|
||||
// OnTag(
|
||||
// VarModel("-in-fold-", ScalarType.string),
|
||||
// Chain.one(VarModel("-fold-relay-", ScalarType.string))
|
||||
// ) :: OnTag(
|
||||
// VarModel("-other-", ScalarType.string),
|
||||
// Chain.one(VarModel("-external-", ScalarType.string))
|
||||
// ) :: OnTag(
|
||||
// LiteralModel.initPeerId,
|
||||
// Chain.one(VarModel("-relay-", ScalarType.string))
|
||||
// ) :: Nil
|
||||
// )
|
||||
// raw.lastExecuted.map(_.topology.pathBefore).get should be(
|
||||
// Chain(
|
||||
// VarModel("-fold-relay-", ScalarType.string),
|
||||
// VarModel("-external-", ScalarType.string),
|
||||
// VarModel("-relay-", ScalarType.string)
|
||||
// )
|
||||
// )
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ import aqua.Node
|
||||
import aqua.model.VarModel
|
||||
import aqua.model.func.Call
|
||||
import aqua.model.func.raw.FuncOps
|
||||
import aqua.model.transform.res.{MakeRes, ResolvedOp, XorRes}
|
||||
import aqua.model.transform.res.{MakeRes, ResolvedOp, SeqRes, XorRes}
|
||||
import aqua.types.ScalarType
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
@ -13,6 +13,7 @@ import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
||||
class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
import Node._
|
||||
|
||||
"topology resolver" should "do nothing on init peer" in {
|
||||
@ -440,7 +441,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
// this example doesn't create a hop on relay after fold
|
||||
// but the test create it, so there is not a one-on-one simulation
|
||||
// change it or write an integration test
|
||||
"topology resolver" should "create returning hops on chain of 'on'" ignore {
|
||||
"topology resolver" should "create returning hops on chain of 'on'" in {
|
||||
val init =
|
||||
on(
|
||||
initPeer,
|
||||
@ -470,14 +471,16 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
val expected: Node.Res =
|
||||
MakeRes.seq(
|
||||
callRes(0, initPeer),
|
||||
callRes(1, otherRelay)
|
||||
through(relay),
|
||||
callRes(0, otherPeer),
|
||||
MakeRes.fold("i", valueArray, MakeRes.par(callRes(2, otherPeer2), nextRes("i"))),
|
||||
through(relay),
|
||||
callRes(3, initPeer)
|
||||
)
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
// This behavior is correct, but as two seqs are not flattened, it's a question how to make the matching result structure
|
||||
"topology resolver" should "create returning hops on nested 'on'" ignore {
|
||||
"topology resolver" should "create returning hops on nested 'on'" in {
|
||||
val init =
|
||||
on(
|
||||
initPeer,
|
||||
@ -505,17 +508,16 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
val expected: Node.Res =
|
||||
MakeRes.seq(
|
||||
callRes(0, initPeer),
|
||||
callRes(1, otherRelay),
|
||||
through(relay),
|
||||
callRes(1, otherPeer),
|
||||
through(otherRelay2),
|
||||
MakeRes.fold(
|
||||
"i",
|
||||
valueArray,
|
||||
MakeRes.seq(
|
||||
through(otherRelay2),
|
||||
callRes(2, otherPeer2),
|
||||
through(otherRelay2),
|
||||
nextRes("i")
|
||||
)
|
||||
callRes(2, otherPeer2),
|
||||
nextRes("i")
|
||||
),
|
||||
through(otherRelay2),
|
||||
through(relay),
|
||||
callRes(3, initPeer)
|
||||
)
|
||||
@ -524,7 +526,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
}
|
||||
|
||||
// https://github.com/fluencelabs/aqua/issues/205
|
||||
"topology resolver" should "optimize path over fold" ignore {
|
||||
"topology resolver" should "optimize path over fold" in {
|
||||
val i = VarModel("i", ScalarType.string)
|
||||
val init =
|
||||
on(
|
||||
@ -546,12 +548,14 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
val expected: Node.Res =
|
||||
MakeRes.seq(
|
||||
through(relay),
|
||||
through(otherRelay),
|
||||
MakeRes.fold(
|
||||
"i",
|
||||
valueArray,
|
||||
MakeRes.seq(
|
||||
callRes(1, i),
|
||||
through(otherRelay),
|
||||
callRes(1, i)
|
||||
),
|
||||
MakeRes.seq(
|
||||
through(otherRelay),
|
||||
nextRes("i")
|
||||
)
|
||||
@ -561,4 +565,92 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "handle detach" in {
|
||||
val init =
|
||||
on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
co(on(otherPeer, Nil, callTag(1, Call.Export(varNode.name, varNode.`type`) :: Nil))),
|
||||
callTag(2, Nil, varNode :: Nil)
|
||||
)
|
||||
|
||||
val proc = Topology.resolve(init)
|
||||
|
||||
val expected: Node.Res =
|
||||
MakeRes.seq(
|
||||
through(relay),
|
||||
MakeRes.par(
|
||||
MakeRes.seq(
|
||||
callRes(1, otherPeer, Some(Call.Export(varNode.name, varNode.`type`))),
|
||||
through(relay),
|
||||
through(initPeer) // pingback
|
||||
)
|
||||
),
|
||||
callRes(2, initPeer, None, varNode :: Nil)
|
||||
)
|
||||
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "handle moved detach" in {
|
||||
val init =
|
||||
on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
on(
|
||||
otherPeer2,
|
||||
Nil,
|
||||
co(on(otherPeer, Nil, callTag(1, Call.Export(varNode.name, varNode.`type`) :: Nil))),
|
||||
callTag(2, Nil, varNode :: Nil)
|
||||
)
|
||||
)
|
||||
|
||||
val proc = Topology.resolve(init)
|
||||
|
||||
val expected: Node.Res =
|
||||
MakeRes.seq(
|
||||
through(relay),
|
||||
MakeRes.par(
|
||||
MakeRes.seq(
|
||||
callRes(1, otherPeer, Some(Call.Export(varNode.name, varNode.`type`))),
|
||||
through(otherPeer2) // pingback
|
||||
)
|
||||
),
|
||||
callRes(2, otherPeer2, None, varNode :: Nil)
|
||||
)
|
||||
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "handle detach moved to relay" in {
|
||||
val init =
|
||||
on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
on(
|
||||
relay,
|
||||
Nil,
|
||||
co(on(otherPeer, Nil, callTag(1, Call.Export(varNode.name, varNode.`type`) :: Nil)))
|
||||
),
|
||||
callTag(2, Nil, varNode :: Nil)
|
||||
)
|
||||
|
||||
val proc = Topology.resolve(init)
|
||||
|
||||
val expected: Node.Res =
|
||||
MakeRes.seq(
|
||||
through(relay),
|
||||
MakeRes.par(
|
||||
MakeRes.seq(
|
||||
callRes(1, otherPeer, Some(Call.Export(varNode.name, varNode.`type`))),
|
||||
through(relay), // pingback
|
||||
through(initPeer) // pingback
|
||||
)
|
||||
),
|
||||
callRes(2, initPeer, None, varNode :: Nil)
|
||||
)
|
||||
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
}
|
||||
|
BIN
model/transform/img.png
Normal file
BIN
model/transform/img.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 85 KiB |
33
model/transform/readme.md
Normal file
33
model/transform/readme.md
Normal file
@ -0,0 +1,33 @@
|
||||
# Topology transformations
|
||||
|
||||
Beware cycles!
|
||||
|
||||
![img.png](img.png)
|
||||
|
||||
- Before: where execution happened before entering into this tag
|
||||
- Begin: where execution is expected to be in the beginning of this tag
|
||||
|
||||
Usually tags handle their beforePaths: inject pathway from location Before to location Begin, to ensure that execution can get there.
|
||||
|
||||
- End: where execution is expected to end when all the children of the tag are executed
|
||||
- After: where execution flow should go after this tag is handled
|
||||
|
||||
Usually Before == previous End or parent's Begin, and After == next Begin or parent's After.
|
||||
|
||||
- Finally: either After, if tag takes care of getting from its internal scope to the outer scope, or End if it doesn't
|
||||
|
||||
Usually tags do not care about the afterPath and inject nothing. But in some cases this path is super necessary, e.g. returning from the Par branch must be done within that Par branch, as doing it later is too late.
|
||||
|
||||
| Tag | Before | Begin | End | After | Finally | Force Exit |
|
||||
|------------|-------------------------|--------------------|--------------------|-----------------------------|-----------------------------------|------------------|
|
||||
| Default | parent.begin OR _path_ | _path_ | **<-** begin | **<-** ends | force ? **<-** after: **<-** ends | _false_ |
|
||||
| seq | - | - | lastChild.finally | - | - | - |
|
||||
| seq/* | prev.finally OR default | - | - | next.begin OR parent.after | - | - |
|
||||
| xor/*:0 | - | - | - | parent.after | - | hasExecLater |
|
||||
| xor/*:1 | prev.ends | - | - | parent.after | - | hasExecLater |
|
||||
| xor | - | - | lastChild.finally | - | - | - |
|
||||
| par/* | - | - | **<-** before | parent.after | - | exportsUsedLater |
|
||||
| for | - | fc.begins(until i) | - | - | - | - |
|
||||
| noExec | - | - | **<-** begin | - | - | - |
|
||||
|
||||
|
@ -12,7 +12,7 @@ abstract class ChainCursor[C <: ChainCursor[C, T], T](make: NonEmptyList[ChainZi
|
||||
val tree: NonEmptyList[ChainZipper[T]]
|
||||
|
||||
// Parent element, if not at root
|
||||
def parent: Option[T] = tree.tail.headOption.map(_.current)
|
||||
def parent: Option[T] = moveUp.map(_.current)
|
||||
|
||||
// The closest element
|
||||
def current: T = tree.head.current
|
||||
@ -34,13 +34,14 @@ abstract class ChainCursor[C <: ChainCursor[C, T], T](make: NonEmptyList[ChainZi
|
||||
)
|
||||
|
||||
// Path to this position: just drop siblings
|
||||
def path: NonEmptyList[T] = tree.map(_.current)
|
||||
lazy val path: NonEmptyList[T] = tree.map(_.current)
|
||||
|
||||
// Move cursor up
|
||||
// TODO: ensure this cursor's data is cached properly
|
||||
def moveUp: Option[C] = NonEmptyList.fromList(tree.tail).map(make)
|
||||
|
||||
// Path to root, in form of Cursors; this is skipped
|
||||
def pathToRoot: LazyList[C] = LazyList.unfold(this)(_.moveUp.map(c => c -> c))
|
||||
val pathToRoot: LazyList[C] = LazyList.unfold(this)(_.moveUp.map(c => c -> c))
|
||||
|
||||
// Move down: need a ChainZipper that's below
|
||||
def moveDown(focusOn: ChainZipper[T]): C = make(focusOn :: tree)
|
||||
|
@ -24,14 +24,17 @@ object MakeRes {
|
||||
def seq(first: Res, second: Res, more: Res*): Res =
|
||||
Cofree[Chain, ResolvedOp](SeqRes, Eval.later(first +: second +: Chain.fromSeq(more)))
|
||||
|
||||
def par(first: Res, second: Res, more: Res*): Res =
|
||||
Cofree[Chain, ResolvedOp](ParRes, Eval.later(first +: second +: Chain.fromSeq(more)))
|
||||
def par(first: Res, more: Res*): Res =
|
||||
Cofree[Chain, ResolvedOp](ParRes, Eval.later(first +: Chain.fromSeq(more)))
|
||||
|
||||
def xor(first: Res, second: Res): Res =
|
||||
Cofree[Chain, ResolvedOp](XorRes, Eval.later(Chain(first, second)))
|
||||
|
||||
def fold(item: String, iter: ValueModel, body: Res): Res =
|
||||
Cofree[Chain, ResolvedOp](FoldRes(item, iter), Eval.now(Chain.one(body)))
|
||||
def fold(item: String, iter: ValueModel, body0: Res, body: Res*): Res =
|
||||
Cofree[Chain, ResolvedOp](
|
||||
FoldRes(item, iter),
|
||||
Eval.now(Chain.one(body0) ++ Chain.fromSeq(body))
|
||||
)
|
||||
|
||||
def noop(onPeer: ValueModel): Res =
|
||||
leaf(CallServiceRes(LiteralModel.quote("op"), "noop", CallRes(Nil, None), onPeer))
|
||||
|
@ -10,53 +10,19 @@ import scala.annotation.tailrec
|
||||
|
||||
object PathFinder extends Logging {
|
||||
|
||||
def find(from: RawCursor, to: RawCursor, isExit: Boolean = false): Chain[ValueModel] = {
|
||||
|
||||
val fromOn = Chain.fromSeq(from.pathOn).reverse
|
||||
val toOn = Chain.fromSeq(to.pathOn).reverse
|
||||
|
||||
val wasHandled =
|
||||
!isExit &&
|
||||
to.leftSiblings.isEmpty &&
|
||||
to.moveUp.exists(_.pathOn == to.pathOn) &&
|
||||
!to.parentTag.exists(_.isInstanceOf[ParGroupTag])
|
||||
|
||||
if (wasHandled) {
|
||||
logger.trace("Was handled")
|
||||
logger.trace(" :: " + from)
|
||||
logger.trace(" -> " + to)
|
||||
Chain.empty
|
||||
} else {
|
||||
logger.trace("Find path")
|
||||
logger.trace(" :: " + from)
|
||||
logger.trace(" -> " + to)
|
||||
findPath(
|
||||
fromOn,
|
||||
toOn,
|
||||
from.currentPeerId,
|
||||
to.currentPeerId
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def optimizePath(
|
||||
peerIds: Chain[ValueModel],
|
||||
prefix: Chain[ValueModel],
|
||||
suffix: Chain[ValueModel]
|
||||
): Chain[ValueModel] = {
|
||||
val optimized = peerIds
|
||||
.foldLeft(Chain.empty[ValueModel]) {
|
||||
case (acc, p) if acc.lastOption.contains(p) => acc
|
||||
case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p
|
||||
case (acc, p) => acc :+ p
|
||||
}
|
||||
logger.trace(s"PEER IDS: $optimized")
|
||||
logger.trace(s"PREFIX: $prefix")
|
||||
logger.trace(s"SUFFIX: $suffix")
|
||||
logger.trace(s"OPTIMIZED WITH PREFIX AND SUFFIX: $optimized")
|
||||
val noPrefix = skipPrefix(optimized, prefix, optimized)
|
||||
skipSuffix(noPrefix, suffix, noPrefix)
|
||||
}
|
||||
/**
|
||||
* Finds the path – chain of peers to visit to get from [[fromOn]] to [[toOn]]
|
||||
* @param fromOn Previous location
|
||||
* @param toOn Next location
|
||||
* @return Chain of peers to visit in between
|
||||
*/
|
||||
def findPath(fromOn: List[OnTag], toOn: List[OnTag]): Chain[ValueModel] =
|
||||
findPath(
|
||||
Chain.fromSeq(fromOn).reverse,
|
||||
Chain.fromSeq(toOn).reverse,
|
||||
fromOn.headOption.map(_.peerId),
|
||||
toOn.headOption.map(_.peerId)
|
||||
)
|
||||
|
||||
def findPath(
|
||||
fromOn: Chain[OnTag],
|
||||
@ -89,6 +55,33 @@ object PathFinder extends Logging {
|
||||
optimized
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes cycles from the path
|
||||
*
|
||||
* @param peerIds peers to walk trough
|
||||
* @param prefix getting from the previous peer
|
||||
* @param suffix getting to the next peer
|
||||
* @return optimal path with no duplicates
|
||||
*/
|
||||
def optimizePath(
|
||||
peerIds: Chain[ValueModel],
|
||||
prefix: Chain[ValueModel],
|
||||
suffix: Chain[ValueModel]
|
||||
): Chain[ValueModel] = {
|
||||
val optimized = peerIds
|
||||
.foldLeft(Chain.empty[ValueModel]) {
|
||||
case (acc, p) if acc.lastOption.contains(p) => acc
|
||||
case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p
|
||||
case (acc, p) => acc :+ p
|
||||
}
|
||||
logger.trace(s"PEER IDS: $optimized")
|
||||
logger.trace(s"PREFIX: $prefix")
|
||||
logger.trace(s"SUFFIX: $suffix")
|
||||
logger.trace(s"OPTIMIZED WITH PREFIX AND SUFFIX: $optimized")
|
||||
val noPrefix = skipPrefix(optimized, prefix, optimized)
|
||||
skipSuffix(noPrefix, suffix, noPrefix)
|
||||
}
|
||||
|
||||
@tailrec
|
||||
def skipPrefix[T](chain: Chain[T], prefix: Chain[T], init: Chain[T]): Chain[T] =
|
||||
(chain, prefix) match {
|
||||
|
@ -2,21 +2,36 @@ package aqua.model.transform.topology
|
||||
|
||||
import aqua.model.ValueModel
|
||||
import aqua.model.func.raw.*
|
||||
import aqua.model.func.raw.FuncOp.Tree
|
||||
import cats.Eval
|
||||
import cats.data.{Chain, NonEmptyList, OptionT}
|
||||
import aqua.model.transform.cursor._
|
||||
import cats.syntax.traverse._
|
||||
import aqua.model.transform.cursor.*
|
||||
import cats.syntax.traverse.*
|
||||
import cats.free.Cofree
|
||||
import scribe.Logging
|
||||
|
||||
// Can be heavily optimized by caching parent cursors, not just list of zippers
|
||||
case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]])
|
||||
extends ChainCursor[RawCursor, FuncOp.Tree](RawCursor.apply) with Logging {
|
||||
case class RawCursor(
|
||||
tree: NonEmptyList[ChainZipper[FuncOp.Tree]],
|
||||
cachedParent: Option[RawCursor] = None
|
||||
) extends ChainCursor[RawCursor, FuncOp.Tree](RawCursor.apply(_, None)) with Logging {
|
||||
|
||||
override def moveUp: Option[RawCursor] = cachedParent.orElse(super.moveUp)
|
||||
|
||||
override lazy val toPrevSibling: Option[RawCursor] =
|
||||
super.toPrevSibling.map(_.copy(cachedParent = cachedParent))
|
||||
|
||||
override lazy val toNextSibling: Option[RawCursor] =
|
||||
super.toNextSibling.map(_.copy(cachedParent = cachedParent))
|
||||
|
||||
override def moveDown(focusOn: ChainZipper[Tree]): RawCursor =
|
||||
super.moveDown(focusOn).copy(cachedParent = Some(this))
|
||||
|
||||
def tag: RawTag = current.head
|
||||
|
||||
def parentTag: Option[RawTag] = parent.map(_.head)
|
||||
|
||||
def hasChildren: Boolean =
|
||||
lazy val hasChildren: Boolean =
|
||||
current.tailForced.nonEmpty
|
||||
|
||||
lazy val toFirstChild: Option[RawCursor] =
|
||||
@ -25,61 +40,30 @@ case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]])
|
||||
lazy val toLastChild: Option[RawCursor] =
|
||||
ChainZipper.last(current.tail.value).map(moveDown)
|
||||
|
||||
lazy val children: LazyList[RawCursor] =
|
||||
LazyList.unfold(toFirstChild)(_.map(c => c -> c.toNextSibling))
|
||||
|
||||
def findInside(f: RawCursor => Boolean): LazyList[RawCursor] =
|
||||
children.flatMap(_.findInside(f)).prependedAll(Option.when(f(this))(this))
|
||||
|
||||
lazy val topology: Topology = Topology.make(this)
|
||||
|
||||
lazy val tagsPath: NonEmptyList[RawTag] = path.map(_.head)
|
||||
|
||||
lazy val pathOn: List[OnTag] = tagsPath.collect { case o: OnTag =>
|
||||
o
|
||||
}
|
||||
|
||||
// Assume that the very first tag is `on` tag
|
||||
lazy val rootOn: Option[RawCursor] = moveUp
|
||||
.flatMap(_.rootOn)
|
||||
.orElse(tag match {
|
||||
case _: OnTag =>
|
||||
Some(this)
|
||||
case _ => None
|
||||
})
|
||||
|
||||
// The closest peerId
|
||||
lazy val currentPeerId: Option[ValueModel] =
|
||||
pathOn.headOption.map(_.peerId)
|
||||
|
||||
// Cursor to the last sequentially executed operation, if any
|
||||
lazy val lastExecuted: Option[RawCursor] = tag match {
|
||||
case XorTag => toFirstChild.flatMap(_.lastExecuted)
|
||||
case _: SeqGroupTag => toLastChild.flatMap(_.lastExecuted)
|
||||
case _: ParGroupTag =>
|
||||
None // ParGroup builds exit path within itself; there's no "lastExecuted", they are many
|
||||
case _: NoExecTag => moveLeft.flatMap(_.lastExecuted)
|
||||
case _ => Some(this)
|
||||
}
|
||||
|
||||
lazy val firstExecuted: Option[RawCursor] = tag match {
|
||||
case _: SeqGroupTag => toFirstChild.flatMap(_.firstExecuted)
|
||||
case _: ParGroupTag =>
|
||||
None // As many branches are executed simultaneously, no definition of first
|
||||
case _: NoExecTag => moveRight.flatMap(_.firstExecuted)
|
||||
case _ => Some(this)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sequentially previous cursor
|
||||
* @return
|
||||
*/
|
||||
lazy val seqPrev: Option[RawCursor] =
|
||||
parentTag.flatMap {
|
||||
case p: SeqGroupTag if leftSiblings.nonEmpty =>
|
||||
toPrevSibling.flatMap(c => c.lastExecuted orElse c.seqPrev)
|
||||
case p =>
|
||||
moveUp.flatMap(_.seqPrev)
|
||||
// Whether the current branch contains any AIR-executable code or not
|
||||
lazy val isNoExec: Boolean =
|
||||
tag match {
|
||||
case _: NoExecTag => true
|
||||
case _: GroupTag => children.forall(_.isNoExec)
|
||||
case _ => false
|
||||
}
|
||||
|
||||
lazy val seqNext: Option[RawCursor] =
|
||||
parentTag.flatMap {
|
||||
case _: SeqGroupTag if rightSiblings.nonEmpty =>
|
||||
toNextSibling.flatMap(c => c.firstExecuted orElse c.seqNext)
|
||||
case _ => moveUp.flatMap(_.seqNext)
|
||||
}
|
||||
def hasExecLater: Boolean =
|
||||
!allToRight.forall(_.isNoExec)
|
||||
|
||||
// Whether variables exported from this branch are used later in the code or not
|
||||
def exportsUsedLater: Boolean =
|
||||
FuncOp(current).exportsVarNames.map(ns => ns.nonEmpty && checkNamesUsedLater(ns)).value
|
||||
|
||||
// TODO write a test
|
||||
def checkNamesUsedLater(names: Set[String]): Boolean =
|
||||
@ -88,50 +72,6 @@ case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]])
|
||||
.map(FuncOp(_))
|
||||
.exists(_.usesVarNames.value.intersect(names).nonEmpty)
|
||||
|
||||
lazy val pathFromPrev: Chain[ValueModel] = pathFromPrevD()
|
||||
|
||||
def pathFromPrevD(forExit: Boolean = false): Chain[ValueModel] =
|
||||
parentTag.fold(Chain.empty[ValueModel]) {
|
||||
case _: GroupTag =>
|
||||
seqPrev
|
||||
.orElse(rootOn)
|
||||
.fold(Chain.empty[ValueModel])(PathFinder.find(_, this, isExit = forExit))
|
||||
case _ =>
|
||||
Chain.empty
|
||||
}
|
||||
|
||||
lazy val pathToNext: Chain[ValueModel] = parentTag.fold(Chain.empty[ValueModel]) {
|
||||
case _: ParGroupTag =>
|
||||
val exports = FuncOp(current).exportsVarNames.value
|
||||
if (exports.nonEmpty && checkNamesUsedLater(exports))
|
||||
seqNext.fold(Chain.empty[ValueModel])(nxt =>
|
||||
PathFinder.find(this, nxt) ++
|
||||
// we need to "wake" the target peer to enable join behaviour
|
||||
Chain.fromOption(nxt.currentPeerId)
|
||||
)
|
||||
else Chain.empty
|
||||
case XorTag if leftSiblings.nonEmpty =>
|
||||
lastExecuted
|
||||
.flatMap(le =>
|
||||
seqNext
|
||||
.map(nxt => PathFinder.find(le, nxt, isExit = true) -> nxt)
|
||||
.flatMap {
|
||||
case (path, nxt) if path.isEmpty && currentPeerId == nxt.currentPeerId =>
|
||||
nxt.pathFromPrevD(true).reverse.initLast.map(_._1)
|
||||
case (path, nxt) =>
|
||||
path.initLast.map {
|
||||
case (init, last)
|
||||
if nxt.pathFromPrevD(forExit = true).headOption.contains(last) =>
|
||||
init
|
||||
case (init, last) => init :+ last
|
||||
}
|
||||
}
|
||||
)
|
||||
.getOrElse(Chain.empty)
|
||||
case _ =>
|
||||
Chain.empty
|
||||
}
|
||||
|
||||
def cata[A](wrap: ChainZipper[Cofree[Chain, A]] => Chain[Cofree[Chain, A]])(
|
||||
folder: RawCursor => OptionT[Eval, ChainZipper[Cofree[Chain, A]]]
|
||||
): Eval[Chain[Cofree[Chain, A]]] =
|
||||
@ -141,7 +81,9 @@ case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]])
|
||||
toFirstChild
|
||||
.map(folderCursor =>
|
||||
LazyList
|
||||
.unfold(folderCursor) { _.toNextSibling.map(cursor => cursor -> cursor) }
|
||||
.unfold(folderCursor) {
|
||||
_.toNextSibling.map(cursor => cursor -> cursor)
|
||||
}
|
||||
.prepended(folderCursor)
|
||||
)
|
||||
.getOrElse(LazyList.empty)
|
||||
|
@ -1,23 +1,401 @@
|
||||
package aqua.model.transform.topology
|
||||
|
||||
import aqua.model.ValueModel.varName
|
||||
import aqua.model.transform.cursor.ChainZipper
|
||||
import aqua.model.func.raw.*
|
||||
import aqua.model.transform.res.*
|
||||
import aqua.model.{LiteralModel, ValueModel, VarModel}
|
||||
import aqua.types.{BoxType, ScalarType}
|
||||
import cats.Eval
|
||||
import cats.data.Chain.nil
|
||||
import cats.data.Chain.{==:, nil}
|
||||
import cats.data.{Chain, NonEmptyChain, NonEmptyList, OptionT}
|
||||
import cats.free.Cofree
|
||||
import cats.syntax.traverse.*
|
||||
import cats.syntax.apply.*
|
||||
import scribe.Logging
|
||||
|
||||
/**
|
||||
* Wraps all the logic for topology reasoning about the tag in the AST represented by the [[cursor]]
|
||||
*
|
||||
* @param cursor Pointer to the current place in the AST
|
||||
* @param before Strategy of calculating where the previous executions happened
|
||||
* @param begins Strategy of calculating where execution of this tag/node should begin
|
||||
* @param ends Strategy of calculating where execution of this tag/node happens
|
||||
* @param after Strategy of calculating where the next execution should happen and whether we need to move there or not
|
||||
*/
|
||||
case class Topology private (
|
||||
cursor: RawCursor,
|
||||
before: Topology.Before,
|
||||
begins: Topology.Begins,
|
||||
ends: Topology.Ends,
|
||||
after: Topology.After
|
||||
) {
|
||||
|
||||
val pathOn: Eval[List[OnTag]] = Eval
|
||||
.later(cursor.tagsPath.collect { case o: OnTag =>
|
||||
o
|
||||
})
|
||||
.memoize
|
||||
|
||||
lazy val firstExecutesOn: Eval[Option[List[OnTag]]] =
|
||||
(cursor.tag match {
|
||||
case _: CallServiceTag => pathOn.map(Some(_))
|
||||
case _ =>
|
||||
children
|
||||
.map(_.firstExecutesOn)
|
||||
.scanLeft[Eval[Option[List[OnTag]]]](Eval.now(None)) { case (acc, el) =>
|
||||
(acc, el).mapN(_ orElse _)
|
||||
}
|
||||
.collectFirst {
|
||||
case e if e.value.isDefined => e
|
||||
}
|
||||
.getOrElse(Eval.now(None))
|
||||
}).memoize
|
||||
|
||||
lazy val lastExecutesOn: Eval[Option[List[OnTag]]] =
|
||||
(cursor.tag match {
|
||||
case _: CallServiceTag => pathOn.map(Some(_))
|
||||
case _ =>
|
||||
children
|
||||
.map(_.lastExecutesOn)
|
||||
.scanRight[Eval[Option[List[OnTag]]]](Eval.now(None)) { case (acc, el) =>
|
||||
(acc, el).mapN(_ orElse _)
|
||||
}
|
||||
.collectFirst {
|
||||
case e if e.value.isDefined => e
|
||||
}
|
||||
.getOrElse(Eval.now(None))
|
||||
}).memoize
|
||||
|
||||
lazy val currentPeerId: Option[ValueModel] = pathOn.value.headOption.map(_.peerId)
|
||||
|
||||
lazy val prevSibling: Option[Topology] = cursor.toPrevSibling.map(_.topology)
|
||||
|
||||
lazy val nextSibling: Option[Topology] = cursor.toNextSibling.map(_.topology)
|
||||
|
||||
lazy val firstChild: Option[Topology] = cursor.toFirstChild.map(_.topology)
|
||||
|
||||
lazy val lastChild: Option[Topology] = cursor.toLastChild.map(_.topology)
|
||||
|
||||
lazy val children: LazyList[Topology] = cursor.children.map(_.topology)
|
||||
|
||||
def findInside(f: Topology => Boolean): LazyList[Topology] =
|
||||
children.flatMap(_.findInside(f)).prependedAll(Option.when(f(this))(this))
|
||||
|
||||
val parent: Option[Topology] = cursor.moveUp.map(_.topology)
|
||||
|
||||
val parents: LazyList[Topology] =
|
||||
LazyList.unfold(parent)(p => p.map(pp => pp -> pp.parent))
|
||||
|
||||
lazy val forTag: Option[ForTag] = Option(cursor.tag).collect { case ft: ForTag =>
|
||||
ft
|
||||
}
|
||||
|
||||
lazy val isForTag: Boolean = forTag.isDefined
|
||||
|
||||
// Before the left boundary of this element, what was the scope
|
||||
lazy val beforeOn: Eval[List[OnTag]] = before.beforeOn(this).memoize
|
||||
|
||||
// Inside the left boundary of this element, what should be the scope
|
||||
lazy val beginsOn: Eval[List[OnTag]] = begins.beginsOn(this).memoize
|
||||
|
||||
// After this element is done, what is the scope
|
||||
lazy val endsOn: Eval[List[OnTag]] = ends.endsOn(this).memoize
|
||||
|
||||
// After this element is done, where should it move to prepare for the next one
|
||||
lazy val afterOn: Eval[List[OnTag]] = after.afterOn(this).memoize
|
||||
|
||||
// Usually we don't care about exiting from where this tag ends into the outer scope
|
||||
// But for some cases, like par branches, its necessary, so the exit can be forced
|
||||
lazy val forceExit: Eval[Boolean] = after.forceExit(this).memoize
|
||||
|
||||
// Where we finally are, after exit enforcement is applied
|
||||
lazy val finallyOn: Eval[List[OnTag]] = after.finallyOn(this).memoize
|
||||
|
||||
lazy val pathBefore: Eval[Chain[ValueModel]] = begins.pathBefore(this).memoize
|
||||
|
||||
lazy val pathAfter: Eval[Chain[ValueModel]] = after.pathAfter(this).memoize
|
||||
}
|
||||
|
||||
object Topology extends Logging {
|
||||
type Tree = Cofree[Chain, RawTag]
|
||||
type Res = Cofree[Chain, ResolvedOp]
|
||||
|
||||
def resolve(op: Tree): Res = {
|
||||
val resolved = resolveOnMoves(op).value
|
||||
// Returns a peerId to go to in case it equals the last relay: useful when we do execution on the relay
|
||||
private def findRelayPathEnforcement(bef: List[OnTag], beg: List[OnTag]): Chain[ValueModel] =
|
||||
Chain.fromOption(
|
||||
beg.headOption
|
||||
.map(_.peerId)
|
||||
.filter(lastPeerId => beg.tail.headOption.exists(_.via.lastOption.contains(lastPeerId)))
|
||||
.filter(lastPeerId => !bef.headOption.exists(_.peerId == lastPeerId))
|
||||
)
|
||||
|
||||
trait Before {
|
||||
|
||||
def beforeOn(current: Topology): Eval[List[OnTag]] =
|
||||
// Go to the parent, see where it begins
|
||||
current.parent.map(_.beginsOn) getOrElse
|
||||
// This means, we have no parent; then we're where we should be
|
||||
current.pathOn
|
||||
}
|
||||
|
||||
trait Begins {
|
||||
def beginsOn(current: Topology): Eval[List[OnTag]] = current.pathOn
|
||||
|
||||
def pathBefore(current: Topology): Eval[Chain[ValueModel]] =
|
||||
(current.beforeOn, current.beginsOn).mapN { case (bef, beg) =>
|
||||
(PathFinder.findPath(bef, beg), bef, beg)
|
||||
}.flatMap { case (pb, bef, beg) =>
|
||||
// Handle the case when we need to go through the relay, but miss the hop as it's the first
|
||||
// peer where we go, but there's no service calls there
|
||||
current.firstExecutesOn.map {
|
||||
case Some(where) if where != beg =>
|
||||
pb ++ findRelayPathEnforcement(bef, beg)
|
||||
case _ => pb
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait Ends {
|
||||
|
||||
def endsOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.beginsOn
|
||||
|
||||
protected def lastChildFinally(current: Topology): Eval[List[OnTag]] =
|
||||
current.lastChild.map(lc =>
|
||||
lc.forceExit.flatMap {
|
||||
case true => current.afterOn
|
||||
case false => lc.endsOn
|
||||
}
|
||||
) getOrElse current.beginsOn
|
||||
}
|
||||
|
||||
trait After {
|
||||
def forceExit(current: Topology): Eval[Boolean] = Eval.now(false)
|
||||
|
||||
def afterOn(current: Topology): Eval[List[OnTag]] = current.pathOn
|
||||
|
||||
protected def afterParent(current: Topology): Eval[List[OnTag]] =
|
||||
current.parent.map(
|
||||
_.afterOn
|
||||
) getOrElse current.pathOn
|
||||
|
||||
// In case exit is performed and pathAfter is inserted, we're actually where
|
||||
// execution is expected to continue After this node is handled
|
||||
final def finallyOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.forceExit.flatMap {
|
||||
case true => current.afterOn
|
||||
case false => current.endsOn
|
||||
}
|
||||
|
||||
// If exit is forced, make a path outside this node
|
||||
// – from where it ends to where execution is expected to continue
|
||||
def pathAfter(current: Topology): Eval[Chain[ValueModel]] =
|
||||
current.forceExit.flatMap {
|
||||
case true =>
|
||||
(current.endsOn, current.afterOn).mapN(PathFinder.findPath)
|
||||
case false =>
|
||||
Eval.now(Chain.empty)
|
||||
}
|
||||
}
|
||||
|
||||
object Default extends Before with Begins with Ends with After {
|
||||
override def toString: String = "<default>"
|
||||
}
|
||||
|
||||
// Parent == Seq, On
|
||||
object SeqGroupBranch extends Before with After {
|
||||
override def toString: String = "<seq>/*"
|
||||
|
||||
// If parent is seq, then before this node we are where previous node, if any, ends
|
||||
override def beforeOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.prevSibling
|
||||
.map(_.finallyOn) getOrElse super.beforeOn(current)
|
||||
|
||||
override def afterOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.nextSibling.map(_.beginsOn) getOrElse afterParent(current)
|
||||
|
||||
}
|
||||
|
||||
object SeqGroup extends Ends {
|
||||
override def toString: String = "<seq>"
|
||||
|
||||
override def endsOn(current: Topology): Eval[List[OnTag]] =
|
||||
lastChildFinally(current)
|
||||
}
|
||||
|
||||
// Parent == Xor
|
||||
object XorBranch extends Before with After {
|
||||
override def toString: String = "<xor>/*"
|
||||
|
||||
override def beforeOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.prevSibling.map(_.endsOn) getOrElse super.beforeOn(current)
|
||||
|
||||
// TODO: if this xor is in par that needs no forceExit, do not exit
|
||||
override def forceExit(current: Topology): Eval[Boolean] =
|
||||
Eval.later(current.cursor.moveUp.exists(_.hasExecLater))
|
||||
|
||||
override def afterOn(current: Topology): Eval[List[OnTag]] =
|
||||
afterParent(current)
|
||||
}
|
||||
|
||||
// Parent == Par
|
||||
object ParGroupBranch extends Ends with After {
|
||||
override def toString: String = "<par>/*"
|
||||
|
||||
override def forceExit(current: Topology): Eval[Boolean] =
|
||||
Eval.later(current.cursor.exportsUsedLater)
|
||||
|
||||
override def afterOn(current: Topology): Eval[List[OnTag]] =
|
||||
afterParent(current)
|
||||
|
||||
override def pathAfter(current: Topology): Eval[Chain[ValueModel]] =
|
||||
current.forceExit
|
||||
.flatMap[Chain[ValueModel]] {
|
||||
case false => Eval.now(Chain.empty[ValueModel])
|
||||
case true =>
|
||||
(current.endsOn, current.afterOn, current.lastExecutesOn).mapN {
|
||||
case (e, a, _) if e == a => Chain.empty[ValueModel]
|
||||
case (e, a, l) if l.contains(e) =>
|
||||
// Pingback in case no relays involved
|
||||
Chain.fromOption(a.headOption.map(_.peerId))
|
||||
case (e, a, _) =>
|
||||
// We wasn't at e, so need to get through the last peer in case it matches with the relay
|
||||
findRelayPathEnforcement(a, e) ++ Chain.fromOption(a.headOption.map(_.peerId))
|
||||
}
|
||||
}
|
||||
.flatMap { appendix =>
|
||||
// Ping the next (join) peer to enforce its data update
|
||||
super.pathAfter(current).map(_ ++ appendix)
|
||||
}
|
||||
|
||||
override def endsOn(current: Topology): Eval[List[OnTag]] = current.beforeOn
|
||||
}
|
||||
|
||||
object XorGroup extends Ends {
|
||||
override def toString: String = "<xor>"
|
||||
|
||||
// Xor tag ends where any child ends; can't get first one as it may lead to recursion
|
||||
override def endsOn(current: Topology): Eval[List[OnTag]] =
|
||||
lastChildFinally(current)
|
||||
|
||||
}
|
||||
|
||||
object Root extends Before with Ends with After {
|
||||
override def toString: String = "<root>"
|
||||
|
||||
override def beforeOn(current: Topology): Eval[List[OnTag]] = current.beginsOn
|
||||
|
||||
override def endsOn(current: Topology): Eval[List[OnTag]] = current.pathOn
|
||||
|
||||
override def afterOn(current: Topology): Eval[List[OnTag]] = current.pathOn
|
||||
|
||||
override def forceExit(current: Topology): Eval[Boolean] = Eval.now(false)
|
||||
}
|
||||
|
||||
object ParGroup extends Begins with Ends {
|
||||
override def toString: String = "<par>"
|
||||
|
||||
// Optimization: find the longest common prefix of all the par branches, and move it outside of this par
|
||||
// When branches will calculate their paths, they will take this move into account.
|
||||
// So less hops will be produced
|
||||
override def beginsOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.children
|
||||
.map(_.beginsOn.map(_.reverse))
|
||||
.reduceLeftOption { case (b1e, b2e) =>
|
||||
(b1e, b2e).mapN { case (b1, b2) =>
|
||||
(b1 zip b2).takeWhile(_ == _).map(_._1)
|
||||
}
|
||||
}
|
||||
.map(_.map(_.reverse)) getOrElse super.beginsOn(current)
|
||||
|
||||
// Par block ends where all the branches end, if they have forced exit (not fire-and-forget)
|
||||
override def endsOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.children
|
||||
.map(_.forceExit)
|
||||
.reduceLeftOption { case (a, b) =>
|
||||
(a, b).mapN(_ || _)
|
||||
}
|
||||
.map(_.flatMap {
|
||||
case true => current.afterOn
|
||||
case false => super.endsOn(current)
|
||||
}) getOrElse super.endsOn(current)
|
||||
}
|
||||
|
||||
object For extends Begins {
|
||||
override def toString: String = "<for>"
|
||||
|
||||
// Optimization: get all the path inside the For block out of the block, to avoid repeating
|
||||
// hops for every For iteration
|
||||
override def beginsOn(current: Topology): Eval[List[OnTag]] =
|
||||
(current.forTag zip current.firstChild.map(_.beginsOn)).map { case (f, b) =>
|
||||
// Take path until this for's iterator is used
|
||||
b.map(
|
||||
_.reverse
|
||||
.foldLeft((true, List.empty[OnTag])) {
|
||||
case ((true, acc), OnTag(_, r)) if r.exists(ValueModel.varName(_).contains(f.item)) =>
|
||||
(false, acc)
|
||||
case ((true, acc @ (OnTag(_, r @ (r0 ==: _)) :: _)), OnTag(p, _))
|
||||
if ValueModel.varName(p).contains(f.item) =>
|
||||
// This is to take the outstanding relay and force moving there
|
||||
(false, OnTag(r0, r) :: acc)
|
||||
case ((true, acc), on) => (true, on :: acc)
|
||||
case ((false, acc), _) => (false, acc)
|
||||
}
|
||||
._2
|
||||
)
|
||||
} getOrElse super.beginsOn(current)
|
||||
|
||||
}
|
||||
|
||||
object SeqNext extends Begins {
|
||||
override def toString: String = "<seq>/<next>"
|
||||
|
||||
override def beginsOn(current: Topology): Eval[List[OnTag]] =
|
||||
current.parents.find(_.isForTag).map(_.beginsOn) getOrElse super.beginsOn(current)
|
||||
}
|
||||
|
||||
def make(cursor: RawCursor): Topology =
|
||||
Topology(
|
||||
cursor,
|
||||
// Before
|
||||
cursor.parentTag match {
|
||||
case Some(XorTag) => XorBranch
|
||||
case Some(_: SeqGroupTag) => SeqGroupBranch
|
||||
case None => Root
|
||||
case _ => Default
|
||||
},
|
||||
// Begin
|
||||
(cursor.parentTag, cursor.tag) match {
|
||||
case (Some(_: SeqGroupTag), _: NextTag) =>
|
||||
SeqNext
|
||||
case (_, _: ForTag) =>
|
||||
For
|
||||
case (_, ParTag | ParTag.Detach) =>
|
||||
ParGroup
|
||||
case _ =>
|
||||
Default
|
||||
},
|
||||
// End
|
||||
cursor.tag match {
|
||||
case _: SeqGroupTag => SeqGroup
|
||||
case XorTag => XorGroup
|
||||
case ParTag | ParTag.Detach => ParGroup
|
||||
case _ if cursor.parentTag.isEmpty => Root
|
||||
case _ => Default
|
||||
},
|
||||
// After
|
||||
cursor.parentTag match {
|
||||
case Some(ParTag | ParTag.Detach) => ParGroupBranch
|
||||
case Some(XorTag) => XorBranch
|
||||
case Some(_: SeqGroupTag) => SeqGroupBranch
|
||||
case None => Root
|
||||
case _ => Default
|
||||
}
|
||||
)
|
||||
|
||||
def resolve(op: Tree, debug: Boolean = false): Res = {
|
||||
val resolved = resolveOnMoves(op, debug).value
|
||||
Cofree
|
||||
.cata[Chain, ResolvedOp, Res](resolved) {
|
||||
case (SeqRes, children) =>
|
||||
@ -46,10 +424,11 @@ object Topology extends Logging {
|
||||
else cz.current
|
||||
)
|
||||
|
||||
def resolveOnMoves(op: Tree): Eval[Res] = {
|
||||
val cursor = RawCursor(NonEmptyList.one(ChainZipper.one(op)))
|
||||
def resolveOnMoves(op: Tree, debug: Boolean): Eval[Res] = {
|
||||
val cursor = RawCursor(NonEmptyList.one(ChainZipper.one(op)), None)
|
||||
// TODO: remove var
|
||||
var i = 0
|
||||
|
||||
def nextI = {
|
||||
i = i + 1
|
||||
i
|
||||
@ -60,18 +439,37 @@ object Topology extends Logging {
|
||||
logger.debug(s"<:> $rc")
|
||||
val resolved =
|
||||
MakeRes
|
||||
.resolve(rc.currentPeerId, nextI)
|
||||
.resolve(rc.topology.currentPeerId, nextI)
|
||||
.lift
|
||||
.apply(rc.tag)
|
||||
|
||||
logger.trace("Resolved: " + resolved)
|
||||
|
||||
if (debug) {
|
||||
println(Console.BLUE + rc + Console.RESET)
|
||||
println(rc.topology)
|
||||
println("Before: " + rc.topology.beforeOn.value)
|
||||
println("Begin: " + rc.topology.beginsOn.value)
|
||||
println("PathBefore: " + rc.topology.pathBefore.value)
|
||||
|
||||
println(Console.CYAN + "Parent: " + rc.topology.parent + Console.RESET)
|
||||
|
||||
println("End : " + rc.topology.endsOn.value)
|
||||
println("After: " + rc.topology.afterOn.value)
|
||||
println("Exit : " + rc.topology.forceExit.value)
|
||||
println("PathAfter: " + rc.topology.pathAfter.value)
|
||||
println(Console.YELLOW + " - - - - -" + Console.RESET)
|
||||
}
|
||||
|
||||
val chainZipperEv = resolved.traverse(cofree =>
|
||||
Eval.later {
|
||||
(
|
||||
rc.topology.pathBefore.map(through(_)),
|
||||
rc.topology.pathAfter.map(through(_, reversed = true))
|
||||
).mapN { case (pathBefore, pathAfter) =>
|
||||
val cz = ChainZipper(
|
||||
through(rc.pathFromPrev),
|
||||
pathBefore,
|
||||
cofree,
|
||||
through(rc.pathToNext)
|
||||
pathAfter
|
||||
)
|
||||
if (cz.next.nonEmpty || cz.prev.nonEmpty) {
|
||||
logger.debug(s"Resolved $rc -> $cofree")
|
||||
|
Loading…
Reference in New Issue
Block a user