mirror of
https://github.com/fluencelabs/aqua.git
synced 2024-12-04 14:40:17 +00:00
148 topology fix (#169)
This commit is contained in:
parent
298d7cf767
commit
8586d70364
@ -12,6 +12,10 @@ case class Call(args: List[ValueModel], exportTo: Option[Call.Export]) {
|
||||
)
|
||||
|
||||
def mapExport(f: String => String): Call = copy(exportTo = exportTo.map(_.mapName(f)))
|
||||
|
||||
def argVarNames: Set[String] = args.collect { case VarModel(name, _, _) =>
|
||||
name
|
||||
}.toSet
|
||||
}
|
||||
|
||||
object Call {
|
||||
|
@ -58,7 +58,7 @@ case class FuncCallable(
|
||||
val treeWithValues = body.resolveValues(argsToData)
|
||||
|
||||
// Function body on its own defines some values; collect their names
|
||||
val treeDefines = treeWithValues.definesValueNames.value -- call.exportTo.map(_.name)
|
||||
val treeDefines = treeWithValues.definesVarNames.value -- call.exportTo.map(_.name)
|
||||
|
||||
// We have some names in scope (forbiddenNames), can't introduce them again; so find new names
|
||||
val shouldRename = findNewNames(forbiddenNames, treeDefines)
|
||||
@ -105,7 +105,7 @@ case class FuncCallable(
|
||||
|
||||
// Function defines new names inside its body – need to collect them
|
||||
// TODO: actually it's done and dropped – so keep and pass it instead
|
||||
val newNames = appliedOp.definesValueNames.value
|
||||
val newNames = appliedOp.definesVarNames.value
|
||||
// At the very end, will need to resolve what is used as results with the result values
|
||||
(
|
||||
noNames ++ newNames,
|
||||
|
@ -21,7 +21,7 @@ case class FuncOp(tree: Cofree[Chain, OpTag]) extends Model {
|
||||
def cata[T](folder: (OpTag, Chain[T]) => Eval[T]): Eval[T] =
|
||||
Cofree.cata(tree)(folder)
|
||||
|
||||
def definesValueNames: Eval[Set[String]] = cata[Set[String]] {
|
||||
def definesVarNames: Eval[Set[String]] = cata[Set[String]] {
|
||||
case (CallArrowTag(_, Call(_, Some(export))), acc) =>
|
||||
Eval.later(acc.foldLeft(Set(export.name))(_ ++ _))
|
||||
case (CallServiceTag(_, _, Call(_, Some(export)), _), acc) =>
|
||||
@ -30,6 +30,22 @@ case class FuncOp(tree: Cofree[Chain, OpTag]) extends Model {
|
||||
case (_, acc) => Eval.later(acc.foldLeft(Set.empty[String])(_ ++ _))
|
||||
}
|
||||
|
||||
def exportsVarNames: Eval[Set[String]] = cata[Set[String]] {
|
||||
case (CallArrowTag(_, Call(_, Some(export))), acc) =>
|
||||
Eval.later(acc.foldLeft(Set(export.name))(_ ++ _))
|
||||
case (CallServiceTag(_, _, Call(_, Some(export)), _), acc) =>
|
||||
Eval.later(acc.foldLeft(Set(export.name))(_ ++ _))
|
||||
case (_, acc) => Eval.later(acc.foldLeft(Set.empty[String])(_ ++ _))
|
||||
}
|
||||
|
||||
def usesVarNames: Eval[Set[String]] = cata[Set[String]] {
|
||||
case (CallArrowTag(_, call), acc) =>
|
||||
Eval.later(acc.foldLeft(call.argVarNames)(_ ++ _))
|
||||
case (CallServiceTag(_, _, call, _), acc) =>
|
||||
Eval.later(acc.foldLeft(call.argVarNames)(_ ++ _))
|
||||
case (_, acc) => Eval.later(acc.foldLeft(Set.empty[String])(_ ++ _))
|
||||
}
|
||||
|
||||
def resolveValues(vals: Map[String, ValueModel]): FuncOp =
|
||||
FuncOp(tree.map[OpTag](_.mapValues(_.resolveWith(vals))))
|
||||
|
||||
|
@ -52,8 +52,8 @@ case class MatchMismatchTag(left: ValueModel, right: ValueModel, shouldMatch: Bo
|
||||
case class ForTag(item: String, iterable: ValueModel) extends SeqGroupTag
|
||||
|
||||
case class MetaTag(
|
||||
skipTopology: Boolean,
|
||||
comment: Option[String],
|
||||
skipTopology: Boolean = false,
|
||||
comment: Option[String] = None,
|
||||
op: OpTag
|
||||
) extends OpTag
|
||||
|
||||
|
@ -1,16 +1,29 @@
|
||||
package aqua.model.topology
|
||||
|
||||
import Topology.Tree
|
||||
import aqua.model.func.body.{OnTag, OpTag, ParTag, SeqTag}
|
||||
import aqua.model.func.body.{MetaTag, OnTag, OpTag, ParTag, SeqTag, XorTag}
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
import cats.syntax.functor._
|
||||
import wvlet.log.LogSupport
|
||||
|
||||
case class Cursor(point: ChainZipper[Tree], loc: Location) {
|
||||
case class Cursor(point: ChainZipper[Tree], loc: Location) extends LogSupport {
|
||||
|
||||
def downLoc(tree: Tree): Location =
|
||||
loc.down(point.copy(current = tree))
|
||||
|
||||
def moveUp: Option[Cursor] =
|
||||
loc.path match {
|
||||
case cz :: tail =>
|
||||
Some(Cursor(cz.replaceInjecting(point), Location(tail)))
|
||||
case _ =>
|
||||
None
|
||||
}
|
||||
|
||||
def pathToRoot: LazyList[Cursor] =
|
||||
moveUp.to(LazyList).flatMap(c => c #:: c.pathToRoot)
|
||||
|
||||
def mapParent(f: Tree => Tree): Cursor =
|
||||
copy(loc =
|
||||
Location(
|
||||
@ -23,44 +36,68 @@ case class Cursor(point: ChainZipper[Tree], loc: Location) {
|
||||
|
||||
def prevOnTags: Chain[OnTag] =
|
||||
Chain
|
||||
.fromSeq(
|
||||
.fromOption(
|
||||
point.prev.lastOption
|
||||
.orElse(loc.lastLeftSeq.map(_._1.current))
|
||||
.toList
|
||||
.flatMap(Cursor.rightBoundary)
|
||||
.takeWhile {
|
||||
case ParTag => false
|
||||
case _ => true
|
||||
}
|
||||
.map(t => loc.pathOn -> t)
|
||||
.orElse(loc.lastLeftSeq.map(_.map(_.pathOn).swap.map(_.current)))
|
||||
)
|
||||
.collect { case o: OnTag =>
|
||||
o
|
||||
.flatMap(pt => pt._1.widen[OpTag] ++ Cursor.rightBoundary(pt._2))
|
||||
.takeWhile {
|
||||
case ParTag => false
|
||||
case MetaTag(_, _, ParTag) => false
|
||||
case _ => true
|
||||
}
|
||||
.collect {
|
||||
case o: OnTag =>
|
||||
o
|
||||
case MetaTag(false, _, o: OnTag) =>
|
||||
o
|
||||
}
|
||||
|
||||
def nextOnTags: Chain[OnTag] =
|
||||
Chain
|
||||
.fromSeq(
|
||||
.fromOption(
|
||||
loc.lastRightSeq
|
||||
.map(_._1.current)
|
||||
.toList
|
||||
.flatMap(Cursor.leftBoundary)
|
||||
.takeWhile {
|
||||
case ParTag => false
|
||||
case _ => true
|
||||
}
|
||||
.map(_.map(_.pathOn).swap.map(_.current))
|
||||
)
|
||||
.collect { case o: OnTag =>
|
||||
o
|
||||
.flatMap(pt => pt._1 ++ Cursor.leftBoundary(pt._2))
|
||||
.takeWhile {
|
||||
case ParTag => false
|
||||
case MetaTag(_, _, ParTag) => false
|
||||
case _ => true
|
||||
}
|
||||
.collect {
|
||||
case o: OnTag =>
|
||||
o
|
||||
case MetaTag(false, _, o: OnTag) =>
|
||||
o
|
||||
}
|
||||
}
|
||||
|
||||
object Cursor {
|
||||
|
||||
def rightBoundary(root: Tree): LazyList[OpTag] =
|
||||
root.head #:: LazyList.unfold(root.tail)(_.value.lastOption.map(lo => lo.head -> lo.tail))
|
||||
def rightBoundary(root: Tree): Chain[OpTag] =
|
||||
Chain.fromSeq(rightBoundaryLazy(root))
|
||||
|
||||
def leftBoundary(root: Tree): LazyList[OpTag] =
|
||||
root.head #:: LazyList.unfold(root.tail)(_.value.headOption.map(lo => lo.head -> lo.tail))
|
||||
def rightBoundaryLazy(root: Tree): LazyList[OpTag] =
|
||||
root.head #:: (root.head match {
|
||||
case XorTag =>
|
||||
root.tailForced.reverse.toList match {
|
||||
case _ :: v :: _ =>
|
||||
// Go through the main branch of xor
|
||||
rightBoundaryLazy(v)
|
||||
case v :: Nil => rightBoundaryLazy(v)
|
||||
case _ => LazyList.empty
|
||||
}
|
||||
case _ =>
|
||||
root.tailForced.lastOption.to(LazyList).flatMap(rightBoundaryLazy)
|
||||
})
|
||||
|
||||
def leftBoundary(root: Tree): Chain[OpTag] =
|
||||
Chain
|
||||
.fromSeq(
|
||||
root.head #:: LazyList.unfold(root.tail)(_.value.headOption.map(lo => lo.head -> lo.tail))
|
||||
)
|
||||
|
||||
def transform(root: Tree)(f: Cursor => List[Tree]): Option[Tree] = {
|
||||
def step(cursor: Cursor): Option[Tree] =
|
||||
|
@ -1,50 +1,49 @@
|
||||
package aqua.model.topology
|
||||
|
||||
import aqua.model.ValueModel
|
||||
import aqua.model.func.body.{OnTag, SeqGroupTag}
|
||||
import aqua.model.func.body.{MetaTag, OnTag, SeqGroupTag}
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
import wvlet.log.LogSupport
|
||||
|
||||
case class Location(path: List[ChainZipper[Topology.Tree]] = Nil) {
|
||||
case class Location(path: List[ChainZipper[Topology.Tree]] = Nil) extends LogSupport {
|
||||
def down(h: ChainZipper[Topology.Tree]): Location = copy(h :: path)
|
||||
|
||||
def lastOn: Option[OnTag] = path.map(_.current.head).collectFirst { case o: OnTag =>
|
||||
o
|
||||
}
|
||||
def lastOn: Option[OnTag] = pathOn.lastOption
|
||||
|
||||
def pathOn: List[OnTag] = path.map(_.current.head).collect { case o: OnTag =>
|
||||
o
|
||||
}
|
||||
def firstOn: Option[OnTag] = pathOn.headOption
|
||||
|
||||
def pathViaChain: Chain[ValueModel] = Chain.fromSeq(
|
||||
path
|
||||
.map(_.current.head)
|
||||
.collectFirst { case t: OnTag =>
|
||||
t.via.toList
|
||||
}
|
||||
.toList
|
||||
.flatten
|
||||
)
|
||||
lazy val pathOn: Chain[OnTag] = Chain
|
||||
.fromSeq(path.map(_.current.head).collect {
|
||||
case o: OnTag =>
|
||||
o
|
||||
case MetaTag(false, _, o: OnTag) => o
|
||||
})
|
||||
.reverse
|
||||
|
||||
def lastLeftSeq: Option[(ChainZipper[Topology.Tree], Location)] =
|
||||
path match {
|
||||
case (cz @ ChainZipper(prev, Cofree(_: SeqGroupTag, _), _)) :: tail if prev.nonEmpty =>
|
||||
case (cz @ ChainZipper(
|
||||
prev,
|
||||
Cofree(_: SeqGroupTag | MetaTag(false, _, _: SeqGroupTag), _),
|
||||
_
|
||||
)) :: tail if prev.nonEmpty =>
|
||||
cz.moveLeft.map(_ -> Location(tail))
|
||||
case _ :: tail => Location(tail).lastLeftSeq
|
||||
case _ :: tail =>
|
||||
Location(tail).lastLeftSeq
|
||||
case Nil => None
|
||||
}
|
||||
|
||||
def lastRightSeq: Option[(ChainZipper[Topology.Tree], Location)] =
|
||||
path match {
|
||||
case (cz @ ChainZipper(_, Cofree(_: SeqGroupTag, _), next)) :: tail if next.nonEmpty =>
|
||||
case (cz @ ChainZipper(
|
||||
_,
|
||||
Cofree(_: SeqGroupTag | MetaTag(false, _, _: SeqGroupTag), _),
|
||||
next
|
||||
)) :: tail if next.nonEmpty =>
|
||||
cz.moveRight.map(_ -> Location(tail))
|
||||
case _ :: tail => Location(tail).lastRightSeq
|
||||
case Nil => None
|
||||
}
|
||||
|
||||
path.collectFirst {
|
||||
case ChainZipper(prev, Cofree(_: SeqGroupTag, _), _) if prev.nonEmpty => prev.lastOption
|
||||
}.flatten
|
||||
}
|
||||
|
||||
object Location {
|
||||
|
@ -4,64 +4,22 @@ import aqua.model.{ValueModel, VarModel}
|
||||
import aqua.model.func.body._
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.data.Chain.{:==, ==:, nil}
|
||||
import cats.free.Cofree
|
||||
import ChainZipper.Matchers._
|
||||
import Location.Matchers._
|
||||
import aqua.types.{BoxType, ScalarType}
|
||||
import wvlet.log.LogSupport
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
object Topology {
|
||||
object Topology extends LogSupport {
|
||||
type Tree = Cofree[Chain, OpTag]
|
||||
|
||||
// Walks through peer IDs, doing a noop function on each
|
||||
// If same IDs are found in a row, does noop only once
|
||||
// if there's a chain like a -> b -> c -> ... -> b -> g, remove everything between b and b
|
||||
def through(peerIds: Chain[ValueModel], reversed: Boolean = false): Chain[Tree] =
|
||||
peerIds
|
||||
.foldLeft(Chain.empty[ValueModel]) {
|
||||
case (acc, p) if acc.lastOption.contains(p) => acc
|
||||
case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p
|
||||
case (acc, p) => acc :+ p
|
||||
}
|
||||
.map { v =>
|
||||
v.lastType match {
|
||||
case _: BoxType =>
|
||||
val itemName = "-via-peer-"
|
||||
|
||||
FuncOps.meta(
|
||||
FuncOps.fold(
|
||||
itemName,
|
||||
v,
|
||||
if (reversed)
|
||||
FuncOps.seq(
|
||||
FuncOps.next(itemName),
|
||||
FuncOps.noop(VarModel(itemName, ScalarType.string))
|
||||
)
|
||||
else
|
||||
FuncOps.seq(
|
||||
FuncOps.noop(VarModel(itemName, ScalarType.string)),
|
||||
FuncOps.next(itemName)
|
||||
)
|
||||
),
|
||||
skipTopology = true
|
||||
)
|
||||
case _ =>
|
||||
FuncOps.noop(v)
|
||||
}
|
||||
}
|
||||
.map(_.tree)
|
||||
|
||||
def mapTag(tag: OpTag, loc: Location): OpTag = tag match {
|
||||
case c: CallServiceTag if c.peerId.isEmpty =>
|
||||
c.copy(peerId = loc.lastOn.map(_.peerId))
|
||||
case t => t
|
||||
}
|
||||
|
||||
def resolve(op: Tree): Tree =
|
||||
Cofree
|
||||
.cata[Chain, OpTag, Tree](resolveOnMoves(op)) {
|
||||
case (SeqTag | _: OnTag, children) =>
|
||||
case (SeqTag | _: OnTag | MetaTag(false, _, SeqTag | _: OnTag), children) =>
|
||||
Eval.later(
|
||||
Cofree(
|
||||
SeqTag,
|
||||
@ -75,6 +33,11 @@ object Topology {
|
||||
}
|
||||
.value
|
||||
|
||||
def resolveOnMoves(op: Tree): Tree =
|
||||
Cursor
|
||||
.transform(op)(transformWalker)
|
||||
.getOrElse(op)
|
||||
|
||||
@tailrec
|
||||
private def transformWalker(c: Cursor): List[Tree] =
|
||||
c match {
|
||||
@ -85,47 +48,178 @@ object Topology {
|
||||
`current`(cf),
|
||||
loc @ `head`(parent: GroupTag) /: _
|
||||
) =>
|
||||
val cfu = cf.copy(mapTag(cf.head, loc))
|
||||
// Set the service call IDs
|
||||
val cfu = cf.copy(setServiceCallPeerId(cf.head, loc))
|
||||
|
||||
val getThere = (cfu.head, loc.pathOn) match {
|
||||
case (OnTag(pid, _), h :: _) if h.peerId == pid => Chain.empty[ValueModel]
|
||||
case (OnTag(_, via), h :: _) =>
|
||||
h.via.reverse ++ via
|
||||
case (_, _) => Chain.empty[ValueModel]
|
||||
}
|
||||
// We need to get there, finally
|
||||
val currentPeerId = Chain.fromOption(loc.lastOn.map(_.peerId))
|
||||
|
||||
val prevOn = c.prevOnTags
|
||||
debug("Going to handle: " + cf.head)
|
||||
|
||||
val prevPath = prevOn.map { case OnTag(_, v) =>
|
||||
v.reverse
|
||||
}
|
||||
.flatMap(identity)
|
||||
val fromPrevToCurrentPath = fromPrevToCurrent(c, currentPeerId)
|
||||
if (fromPrevToCurrentPath.nonEmpty) debug("BEFORE = " + fromPrevToCurrentPath)
|
||||
|
||||
val nextOn = parent match {
|
||||
case ParTag | XorTag => c.nextOnTags
|
||||
case _ => Chain.empty[OnTag]
|
||||
}
|
||||
val nextPath = (if (nextOn.nonEmpty) getThere.reverse else Chain.empty) ++ nextOn.map {
|
||||
case OnTag(_, v) =>
|
||||
v.reverse
|
||||
}
|
||||
.flatMap(identity) ++ Chain.fromOption(
|
||||
// Dirty fix for join behaviour
|
||||
nextOn.lastOption.filter(_ => parent == ParTag).map(_.peerId)
|
||||
)
|
||||
val fromCurrentToNextPath = fromCurrentToNext(parent, c, currentPeerId)
|
||||
if (fromCurrentToNextPath.nonEmpty) debug("NEXT = " + fromCurrentToNextPath)
|
||||
|
||||
if (prevOn.isEmpty && getThere.isEmpty) cfu :: Nil
|
||||
else
|
||||
(through(prevPath ++ loc.pathViaChain ++ getThere)
|
||||
.append(cfu) ++ through(nextPath, reversed = true)).toList
|
||||
(through(fromPrevToCurrentPath)
|
||||
.append(cfu) ++ through(fromCurrentToNextPath, reversed = true)).toList
|
||||
|
||||
case Cursor(ChainZipper(_, cf, _), loc) =>
|
||||
cf.copy(mapTag(cf.head, loc)) :: Nil
|
||||
cf.copy(setServiceCallPeerId(cf.head, loc)) :: Nil
|
||||
}
|
||||
|
||||
def resolveOnMoves(op: Tree): Tree =
|
||||
Cursor
|
||||
.transform(op)(transformWalker)
|
||||
.getOrElse(op)
|
||||
def fromPrevToCurrent(c: Cursor, currentPeerId: Chain[ValueModel]): Chain[ValueModel] = {
|
||||
val prevOn = c.prevOnTags
|
||||
val currentOn = c.loc.pathOn
|
||||
|
||||
val wasHandled = c.pathToRoot.collectFirst { case cc @ Cursor(_, `head`(_: GroupTag) /: _) =>
|
||||
cc.loc.pathOn
|
||||
}.exists(cclp =>
|
||||
cclp == currentOn && {
|
||||
val (c1, _) = skipCommonPrefix(prevOn, cclp)
|
||||
c1.isEmpty
|
||||
}
|
||||
)
|
||||
|
||||
// Need to get from there
|
||||
val prevPeerId =
|
||||
Chain.fromOption(prevOn.lastOption.map(_.peerId) orElse c.loc.firstOn.map(_.peerId))
|
||||
|
||||
if (wasHandled) Chain.empty[ValueModel]
|
||||
else
|
||||
findPath(prevOn, currentOn, prevPeerId, currentPeerId)
|
||||
}
|
||||
|
||||
def fromCurrentToNext(
|
||||
parent: OpTag,
|
||||
c: Cursor,
|
||||
currentPeerId: Chain[ValueModel]
|
||||
): Chain[ValueModel] = {
|
||||
// Usually we don't need to go next
|
||||
val nextOn = parent match {
|
||||
case ParTag =>
|
||||
val exports = FuncOp(c.point.current).exportsVarNames.value
|
||||
if (exports.isEmpty) Chain.empty[OnTag]
|
||||
else {
|
||||
val isUsed = c.pathToRoot.tail.collect {
|
||||
case Cursor(cz, `head`(gt: GroupTag) /: _) if gt != ParTag =>
|
||||
cz.next.map(FuncOp(_)).map(_.usesVarNames)
|
||||
}.exists(_.exists(_.value.intersect(exports).nonEmpty))
|
||||
if (isUsed) c.nextOnTags else Chain.empty[OnTag]
|
||||
}
|
||||
case XorTag if c.point.prev.nonEmpty => c.nextOnTags
|
||||
case _ => Chain.empty[OnTag]
|
||||
}
|
||||
val nextPeerId =
|
||||
if (nextOn.nonEmpty) Chain.fromOption(nextOn.lastOption.map(_.peerId)) else currentPeerId
|
||||
|
||||
val targetOn: Option[OnTag] = c.point.current.head match {
|
||||
case o: OnTag => Option(o)
|
||||
case _ => None
|
||||
}
|
||||
val currentOn = c.loc.pathOn
|
||||
val currentOnInside = targetOn.fold(currentOn)(currentOn :+ _)
|
||||
findPath(
|
||||
currentOnInside,
|
||||
nextOn,
|
||||
currentPeerId,
|
||||
nextPeerId
|
||||
)
|
||||
}
|
||||
|
||||
def optimizePath(
|
||||
peerIds: Chain[ValueModel],
|
||||
prefix: Chain[ValueModel],
|
||||
suffix: Chain[ValueModel]
|
||||
): Chain[ValueModel] = {
|
||||
val optimized = peerIds
|
||||
.foldLeft(Chain.empty[ValueModel]) {
|
||||
case (acc, p) if acc.lastOption.contains(p) => acc
|
||||
case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p
|
||||
case (acc, p) => acc :+ p
|
||||
}
|
||||
val noPrefix = skipPrefix(optimized, prefix, optimized)
|
||||
skipSuffix(noPrefix, suffix, noPrefix)
|
||||
}
|
||||
|
||||
def findPath(
|
||||
fromOn: Chain[OnTag],
|
||||
toOn: Chain[OnTag],
|
||||
fromPeer: Chain[ValueModel],
|
||||
toPeer: Chain[ValueModel]
|
||||
): Chain[ValueModel] = {
|
||||
val (from, to) = skipCommonPrefix(fromOn, toOn)
|
||||
val fromFix =
|
||||
if (from.isEmpty && fromPeer != toPeer) Chain.fromOption(fromOn.lastOption) else from
|
||||
val toFix = if (to.isEmpty && fromPeer != toPeer) Chain.fromOption(toOn.lastOption) else to
|
||||
val fromTo = fromFix.reverse.flatMap(_.via.reverse) ++ toFix.flatMap(_.via)
|
||||
val optimized = optimizePath(fromPeer ++ fromTo ++ toPeer, fromPeer, toPeer)
|
||||
|
||||
debug("FIND PATH " + fromFix)
|
||||
debug(" -> " + toFix)
|
||||
debug(" Optimized: " + optimized)
|
||||
optimized
|
||||
}
|
||||
|
||||
@tailrec
|
||||
def skipPrefix[T](chain: Chain[T], prefix: Chain[T], init: Chain[T]): Chain[T] =
|
||||
(chain, prefix) match {
|
||||
case (c ==: ctail, p ==: ptail) if c == p => skipPrefix(ctail, ptail, init)
|
||||
case (_, `nil`) => chain
|
||||
case (_, _) => init
|
||||
}
|
||||
|
||||
@tailrec
|
||||
def skipCommonPrefix[T](chain1: Chain[T], chain2: Chain[T]): (Chain[T], Chain[T]) =
|
||||
(chain1, chain2) match {
|
||||
case (c ==: ctail, p ==: ptail) if c == p => skipCommonPrefix(ctail, ptail)
|
||||
case _ => chain1 -> chain2
|
||||
}
|
||||
|
||||
@tailrec
|
||||
def skipSuffix[T](chain: Chain[T], suffix: Chain[T], init: Chain[T]): Chain[T] =
|
||||
(chain, suffix) match {
|
||||
case (cinit :== c, pinit :== p) if c == p => skipSuffix(cinit, pinit, init)
|
||||
case (_, `nil`) => chain
|
||||
case (_, _) => init
|
||||
}
|
||||
|
||||
// Walks through peer IDs, doing a noop function on each
|
||||
// If same IDs are found in a row, does noop only once
|
||||
// if there's a chain like a -> b -> c -> ... -> b -> g, remove everything between b and b
|
||||
def through(peerIds: Chain[ValueModel], reversed: Boolean = false): Chain[Tree] =
|
||||
peerIds.map { v =>
|
||||
v.lastType match {
|
||||
case _: BoxType =>
|
||||
val itemName = "-via-peer-"
|
||||
|
||||
FuncOps.meta(
|
||||
FuncOps.fold(
|
||||
itemName,
|
||||
v,
|
||||
if (reversed)
|
||||
FuncOps.seq(
|
||||
FuncOps.next(itemName),
|
||||
FuncOps.noop(VarModel(itemName, ScalarType.string))
|
||||
)
|
||||
else
|
||||
FuncOps.seq(
|
||||
FuncOps.noop(VarModel(itemName, ScalarType.string)),
|
||||
FuncOps.next(itemName)
|
||||
)
|
||||
),
|
||||
skipTopology = true
|
||||
)
|
||||
case _ =>
|
||||
FuncOps.noop(v)
|
||||
}
|
||||
}
|
||||
.map(_.tree)
|
||||
|
||||
def setServiceCallPeerId(tag: OpTag, loc: Location): OpTag = tag match {
|
||||
case c: CallServiceTag if c.peerId.isEmpty =>
|
||||
c.copy(peerId = loc.lastOn.map(_.peerId))
|
||||
case t => t
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@ package aqua.model
|
||||
import aqua.model.func.Call
|
||||
import aqua.model.func.body._
|
||||
import aqua.model.transform.{BodyConfig, ErrorsCatcher}
|
||||
import aqua.types.{LiteralType, ScalarType}
|
||||
import aqua.types.{ArrayType, LiteralType, ScalarType}
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
@ -92,11 +92,22 @@ object Node {
|
||||
val otherRelay = LiteralModel("other-relay", ScalarType.string)
|
||||
val otherPeer2 = LiteralModel("other-peer-2", ScalarType.string)
|
||||
val otherRelay2 = LiteralModel("other-relay-2", ScalarType.string)
|
||||
val varNode = VarModel("node-id", ScalarType.string)
|
||||
val viaList = VarModel("other-relay-2", ArrayType(ScalarType.string))
|
||||
|
||||
def call(i: Int, on: ValueModel = null) = Node(
|
||||
CallServiceTag(LiteralModel(s"srv$i", ScalarType.string), s"fn$i", Call(Nil, None), Option(on))
|
||||
)
|
||||
|
||||
def callLiteral(i: Int, on: ValueModel = null) = Node(
|
||||
CallServiceTag(
|
||||
LiteralModel("\"srv" + i + "\"", LiteralType.string),
|
||||
s"fn$i",
|
||||
Call(Nil, None),
|
||||
Option(on)
|
||||
)
|
||||
)
|
||||
|
||||
def errorCall(bc: BodyConfig, i: Int, on: ValueModel = null) = Node(
|
||||
CallServiceTag(
|
||||
bc.errorHandlingCallback,
|
||||
@ -133,6 +144,8 @@ object Node {
|
||||
def seq(nodes: Node*) = Node(SeqTag, nodes.toList)
|
||||
def xor(left: Node, right: Node) = Node(XorTag, left :: right :: Nil)
|
||||
|
||||
def par(left: Node, right: Node) = Node(ParTag, left :: right :: Nil)
|
||||
|
||||
def on(peer: ValueModel, via: List[ValueModel], body: Node*) =
|
||||
Node(
|
||||
OnTag(peer, Chain.fromSeq(via)),
|
||||
|
@ -78,7 +78,68 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
call(2, otherPeer)
|
||||
)
|
||||
|
||||
proc should be(expected)
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "go through relay to any other node, via another relay, in complex xor/seq" in {
|
||||
|
||||
val init = on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
on(
|
||||
otherPeer,
|
||||
otherRelay :: Nil,
|
||||
xor(
|
||||
seq(
|
||||
call(1),
|
||||
call(2)
|
||||
),
|
||||
call(3)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
val proc: Node = Topology.resolve(init)
|
||||
|
||||
val expected =
|
||||
seq(
|
||||
through(relay),
|
||||
through(otherRelay),
|
||||
xor(
|
||||
seq(
|
||||
call(1, otherPeer),
|
||||
call(2, otherPeer)
|
||||
),
|
||||
call(3, otherPeer)
|
||||
)
|
||||
)
|
||||
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "simplify a route with init_peer_id" in {
|
||||
val init = on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
seq(
|
||||
on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
call(1)
|
||||
),
|
||||
call(2)
|
||||
)
|
||||
)
|
||||
|
||||
val proc: Node = Topology.resolve(init)
|
||||
|
||||
val expected =
|
||||
seq(
|
||||
call(1, initPeer),
|
||||
call(2, initPeer)
|
||||
)
|
||||
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "get back to init peer" in {
|
||||
@ -116,6 +177,46 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
proc.equalsOrPrintDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "not stackoverflow" in {
|
||||
/*
|
||||
OnTag(LiteralModel(%init_peer_id%,ScalarType(string)),Chain(VarModel(-relay-,ScalarType(string),Chain()))) {
|
||||
SeqTag{
|
||||
CallServiceTag(LiteralModel("getDataSrv",ScalarType(string)),-relay-,Call(List(),Some(Export(-relay-,ScalarType(string)))),None)
|
||||
CallServiceTag(LiteralModel("getDataSrv",ScalarType(string)),node_id,Call(List(),Some(Export(node_id,ScalarType(string)))),None)
|
||||
CallServiceTag(LiteralModel("getDataSrv",ScalarType(string)),viaAr,Call(List(),Some(Export(viaAr,[]ScalarType(string)))),None)
|
||||
OnTag(VarModel(node_id,ScalarType(string),Chain()),Chain(VarModel(viaAr,[]ScalarType(string),Chain()))) {
|
||||
CallServiceTag(LiteralModel("cid",Literal(string)),ids,Call(List(),Some(Export(p,ScalarType(string)))),None)
|
||||
}
|
||||
OnTag(LiteralModel(%init_peer_id%,ScalarType(string)),Chain(VarModel(-relay-,ScalarType(string),Chain()))) {
|
||||
CallServiceTag(LiteralModel("callbackSrv",ScalarType(string)),response,Call(List(VarModel(p,ScalarType(string),Chain())),None),None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
val init = on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
seq(
|
||||
call(1),
|
||||
call(2),
|
||||
call(3),
|
||||
on(
|
||||
varNode,
|
||||
viaList :: Nil,
|
||||
call(4)
|
||||
),
|
||||
on(
|
||||
initPeer,
|
||||
relay :: Nil,
|
||||
call(5)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
Topology.resolve(init)
|
||||
}
|
||||
|
||||
"topology resolver" should "get back to init peer after a long chain" in {
|
||||
|
||||
val init = on(
|
||||
|
@ -17,7 +17,7 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
val func: FuncCallable =
|
||||
FuncCallable(
|
||||
"ret",
|
||||
FuncOp(on(otherPeer, Nil, call(1))),
|
||||
FuncOp(on(otherPeer, otherRelay :: Nil, call(1))),
|
||||
ArgsDef.empty,
|
||||
Some((ret, ScalarType.string)),
|
||||
Map.empty,
|
||||
@ -35,14 +35,17 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
seq(
|
||||
dataCall(bc, "-relay-", initPeer),
|
||||
through(relayV),
|
||||
through(otherRelay),
|
||||
xor(
|
||||
call(1, otherPeer),
|
||||
seq(
|
||||
through(otherRelay),
|
||||
through(relayV),
|
||||
errorCall(bc, 1, initPeer),
|
||||
through(relayV)
|
||||
)
|
||||
),
|
||||
through(otherRelay),
|
||||
through(relayV),
|
||||
xor(
|
||||
respCall(bc, ret, initPeer),
|
||||
@ -52,12 +55,13 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
),
|
||||
seq(
|
||||
through(relayV),
|
||||
errorCall(bc, 3, initPeer)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
println(procFC)
|
||||
|
||||
procFC.equalsOrPrintDiff(expectedFC) should be(true)
|
||||
|
||||
}
|
||||
|
75
semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala
Normal file
75
semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala
Normal file
@ -0,0 +1,75 @@
|
||||
package aqua.semantics
|
||||
|
||||
//import aqua.AquaSpec
|
||||
//import aqua.model.transform._
|
||||
//import aqua.model.{AquaContext, Node, VarModel}
|
||||
//import aqua.parser.Ast
|
||||
//import aqua.parser.lift.{LiftParser, Span}
|
||||
//import aqua.types.ScalarType
|
||||
//import cats.data.Chain
|
||||
//import org.scalatest.flatspec.AnyFlatSpec
|
||||
//import org.scalatest.matchers.should.Matchers
|
||||
//
|
||||
//class SemanticsSpec extends AnyFlatSpec with Matchers with AquaSpec {
|
||||
//
|
||||
// // use it to fix https://github.com/fluencelabs/aqua/issues/90
|
||||
// "sem" should "create right model" in {
|
||||
// implicit val fileLift: LiftParser[Span.F] = Span.spanLiftParser
|
||||
//
|
||||
// val script =
|
||||
// """service CustomId("cid"):
|
||||
// | id(s: string) -> string
|
||||
// | ids() -> string
|
||||
// |
|
||||
// |func viaArr(node_id: string, viaAr: []string) -> string:
|
||||
// | on node_id via viaAr:
|
||||
// | p <- CustomId.ids()
|
||||
// | <- p""".stripMargin
|
||||
//
|
||||
// val ast = Ast.fromString(script).toList.head
|
||||
//
|
||||
// val ctx = AquaContext.blank
|
||||
// val bc = BodyConfig()
|
||||
// import bc.aquaContextMonoid
|
||||
//
|
||||
// val func = Semantics.process(ast, ctx).toList.head.funcs("viaArr")
|
||||
//
|
||||
// val initCallable: InitPeerCallable = InitViaRelayCallable(
|
||||
// Chain.fromOption(bc.relayVarName).map(VarModel(_, ScalarType.string))
|
||||
// )
|
||||
//
|
||||
// val argsProvider: ArgsProvider =
|
||||
// ArgsFromService(
|
||||
// bc.dataSrvId,
|
||||
// bc.relayVarName.map(_ -> ScalarType.string).toList ::: func.args.dataArgs.toList.map(add =>
|
||||
// add.name -> add.dataType
|
||||
// )
|
||||
// )
|
||||
//
|
||||
// val transform =
|
||||
// initCallable.transform _ compose argsProvider.transform
|
||||
//
|
||||
// val callback = initCallable.service(bc.callbackSrvId)
|
||||
//
|
||||
// val wrapFunc = ResolveFunc(
|
||||
// transform,
|
||||
// callback,
|
||||
// bc.respFuncName
|
||||
// )
|
||||
//
|
||||
// val tree =
|
||||
// wrapFunc.resolve(func).value.tree
|
||||
//
|
||||
// println(Node.cofToNode(tree))
|
||||
//
|
||||
// // SO
|
||||
//// Topology.resolve(
|
||||
//// Node.cofToNode(tree)
|
||||
//// )
|
||||
//
|
||||
// // or
|
||||
//// val expected =
|
||||
//// seq(par(on(LiteralModel("\"other-peer\"", LiteralType.string), Nil, callL(1)), callL(1)))
|
||||
//
|
||||
// }
|
||||
//}
|
Loading…
Reference in New Issue
Block a user