mirror of
https://github.com/fluencelabs/aqua.git
synced 2024-12-04 22:50:18 +00:00
Support options for via
clause in on
expression (#153)
* Support options for `via` clause in `on` expression * Remove the generated comment * Fix: provide stream as an argument Co-authored-by: Dima <dmitry.shakhtarin@fluence.ai>
This commit is contained in:
parent
6a96098227
commit
0e2ea88934
@ -23,7 +23,7 @@ func betterMessage(relay: string, arr: []string, opt: ?string, str: *string) ->
|
||||
on relay:
|
||||
Peer.is_connected("something")
|
||||
par isOnline <- Peer.is_connected(relay)
|
||||
par on "quray":
|
||||
par on "quray" via arr:
|
||||
Peer.is_connected("qurara")
|
||||
|
||||
stream: *string
|
||||
|
@ -6,6 +6,7 @@ import cats.syntax.show._
|
||||
abstract sealed class Keyword(val value: String)
|
||||
|
||||
object Keyword {
|
||||
case object NA extends Keyword("")
|
||||
|
||||
case object Null extends Keyword("null")
|
||||
|
||||
@ -93,24 +94,36 @@ object Air {
|
||||
case class Call(triplet: Triplet, args: List[DataView], result: Option[String])
|
||||
extends Air(Keyword.Call)
|
||||
|
||||
case class Comment(comment: String, air: Air) extends Air(Keyword.NA)
|
||||
|
||||
private def show(depth: Int, air: Air): String = {
|
||||
def showNext(a: Air) = show(depth + 1, a)
|
||||
|
||||
val space = " " * depth
|
||||
s"$space(${air.keyword.value}" +
|
||||
(air match {
|
||||
case Air.Null ⇒ ""
|
||||
case Air.Next(label) ⇒ s" $label"
|
||||
case Air.Fold(iter, label, inst) ⇒ s" ${iter.show} $label\n${showNext(inst)}$space"
|
||||
case Air.Match(left, right, inst) ⇒ s" ${left.show} ${right.show}\n${showNext(inst)}$space"
|
||||
case Air.Mismatch(left, right, inst) ⇒
|
||||
s" ${left.show} ${right.show}\n${showNext(inst)}$space"
|
||||
case Air.Par(l, r) ⇒ s"\n${showNext(l)}${showNext(r)}$space"
|
||||
case Air.Seq(l, r) ⇒ s"\n${showNext(l)}${showNext(r)}$space"
|
||||
case Air.Xor(l, r) ⇒ s"\n${showNext(l)}${showNext(r)}$space"
|
||||
case Air.Call(triplet, args, res) ⇒
|
||||
s" ${triplet.show} [${args.map(_.show).mkString(" ")}]${res.fold("")(" " + _)}"
|
||||
}) + ")\n"
|
||||
|
||||
air match {
|
||||
case Air.Comment(c, a) =>
|
||||
space + "; " + c.replace("\n", "\n" + space + "; ") + "\n" +
|
||||
show(depth, a)
|
||||
case _ =>
|
||||
s"$space(${air.keyword.value}" +
|
||||
(air match {
|
||||
case Air.Null ⇒ ""
|
||||
case Air.Next(label) ⇒ s" $label"
|
||||
case Air.Fold(iter, label, inst) ⇒ s" ${iter.show} $label\n${showNext(inst)}$space"
|
||||
case Air.Match(left, right, inst) ⇒
|
||||
s" ${left.show} ${right.show}\n${showNext(inst)}$space"
|
||||
case Air.Mismatch(left, right, inst) ⇒
|
||||
s" ${left.show} ${right.show}\n${showNext(inst)}$space"
|
||||
case Air.Par(l, r) ⇒ s"\n${showNext(l)}${showNext(r)}$space"
|
||||
case Air.Seq(l, r) ⇒ s"\n${showNext(l)}${showNext(r)}$space"
|
||||
case Air.Xor(l, r) ⇒ s"\n${showNext(l)}${showNext(r)}$space"
|
||||
case Air.Call(triplet, args, res) ⇒
|
||||
s" ${triplet.show} [${args.map(_.show).mkString(" ")}]${res.fold("")(" " + _)}"
|
||||
case Air.Comment(_, _) => ";; Should not be displayed"
|
||||
}) + ")\n"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
implicit val s: Show[Air] = Show.show(show(0, _))
|
||||
|
@ -8,6 +8,8 @@ import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
sealed trait AirGen {
|
||||
def generate: Air
|
||||
|
||||
@ -42,66 +44,71 @@ object AirGen {
|
||||
case list => list.reduceLeft(SeqGen)
|
||||
}
|
||||
|
||||
private def folder(op: OpTag, ops: Chain[AirGen]): Eval[AirGen] =
|
||||
op match {
|
||||
case mt: MetaTag =>
|
||||
folder(mt.op, ops).map(ag => mt.comment.fold(ag)(CommentGen(_, ag)))
|
||||
case SeqTag =>
|
||||
Eval later ops.toList.reduceLeftOption(SeqGen).getOrElse(NullGen)
|
||||
case ParTag =>
|
||||
Eval later ops.toList.reduceLeftOption(ParGen).getOrElse(NullGen)
|
||||
case XorTag =>
|
||||
Eval later ops.toList.reduceLeftOption(XorGen).getOrElse(NullGen)
|
||||
case XorTag.LeftBiased =>
|
||||
Eval later XorGen(opsToSingle(ops), NullGen)
|
||||
|
||||
case NextTag(item) =>
|
||||
Eval later NextGen(item)
|
||||
case MatchMismatchTag(left, right, shouldMatch) =>
|
||||
Eval later MatchMismatchGen(
|
||||
valueToData(left),
|
||||
valueToData(right),
|
||||
shouldMatch,
|
||||
opsToSingle(ops)
|
||||
)
|
||||
|
||||
case ForTag(item, iterable) =>
|
||||
Eval later ForGen(valueToData(iterable), item, opsToSingle(ops))
|
||||
case CallServiceTag(serviceId, funcName, Call(args, exportTo), peerId) =>
|
||||
Eval.later(
|
||||
ServiceCallGen(
|
||||
peerId.map(valueToData).getOrElse(DataView.InitPeerId),
|
||||
valueToData(serviceId),
|
||||
funcName,
|
||||
args.map(valueToData),
|
||||
exportTo.map {
|
||||
case Call.Export(name, _: StreamType) => "$" + name
|
||||
case Call.Export(name, _) => name
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
case CallArrowTag(funcName, _) =>
|
||||
// TODO: should be already resolved & removed from tree
|
||||
println(
|
||||
Console.RED + s"Unresolved arrow in AirGen: $funcName" + Console.RESET
|
||||
)
|
||||
Eval later NullGen
|
||||
|
||||
case OnTag(_, _) =>
|
||||
// TODO should be resolved
|
||||
Eval later opsToSingle(
|
||||
ops
|
||||
)
|
||||
case XorParTag(opsx, opsy) =>
|
||||
// TODO should be resolved
|
||||
println(
|
||||
Console.RED + "XorParTag reached AirGen, most likely it's an error" + Console.RESET
|
||||
)
|
||||
Eval later opsToSingle(
|
||||
Chain(apply(opsx.tree), apply(opsy.tree))
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
def apply(op: Cofree[Chain, OpTag]): AirGen =
|
||||
Cofree
|
||||
.cata[Chain, OpTag, AirGen](op) {
|
||||
case (SeqTag, ops) =>
|
||||
Eval later ops.toList.reduceLeftOption(SeqGen).getOrElse(NullGen)
|
||||
case (ParTag, ops) =>
|
||||
Eval later ops.toList.reduceLeftOption(ParGen).getOrElse(NullGen)
|
||||
case (XorTag, ops) =>
|
||||
Eval later ops.toList.reduceLeftOption(XorGen).getOrElse(NullGen)
|
||||
case (XorTag.LeftBiased, ops) =>
|
||||
Eval later XorGen(opsToSingle(ops), NullGen)
|
||||
|
||||
case (NextTag(item), _) =>
|
||||
Eval later NextGen(item)
|
||||
case (MatchMismatchTag(left, right, shouldMatch), ops) =>
|
||||
Eval later MatchMismatchGen(
|
||||
valueToData(left),
|
||||
valueToData(right),
|
||||
shouldMatch,
|
||||
opsToSingle(ops)
|
||||
)
|
||||
|
||||
case (ForTag(item, iterable), ops) =>
|
||||
Eval later ForGen(valueToData(iterable), item, opsToSingle(ops))
|
||||
case (CallServiceTag(serviceId, funcName, Call(args, exportTo), peerId), _) =>
|
||||
Eval.later(
|
||||
ServiceCallGen(
|
||||
peerId.map(valueToData).getOrElse(DataView.InitPeerId),
|
||||
valueToData(serviceId),
|
||||
funcName,
|
||||
args.map(valueToData),
|
||||
exportTo.map {
|
||||
case Call.Export(name, _: StreamType) => "$" + name
|
||||
case Call.Export(name, _) => name
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
case (CallArrowTag(funcName, _), _) =>
|
||||
// TODO: should be already resolved & removed from tree
|
||||
println(
|
||||
Console.RED + s"Unresolved arrow in AirGen: $funcName" + Console.RESET
|
||||
)
|
||||
Eval later NullGen
|
||||
|
||||
case (OnTag(_, _), ops) =>
|
||||
// TODO should be resolved
|
||||
Eval later opsToSingle(
|
||||
ops
|
||||
)
|
||||
case (XorParTag(opsx, opsy), _) =>
|
||||
// TODO should be resolved
|
||||
println(
|
||||
Console.RED + "XorParTag reached AirGen, most likely it's an error" + Console.RESET
|
||||
)
|
||||
Eval later opsToSingle(
|
||||
Chain(apply(opsx.tree), apply(opsy.tree))
|
||||
)
|
||||
|
||||
}
|
||||
.cata[Chain, OpTag, AirGen](op)(folder)
|
||||
.value
|
||||
}
|
||||
|
||||
@ -116,6 +123,12 @@ case class SeqGen(left: AirGen, right: AirGen) extends AirGen {
|
||||
|
||||
}
|
||||
|
||||
case class CommentGen(comment: String, op: AirGen) extends AirGen {
|
||||
|
||||
override def generate: Air =
|
||||
Air.Comment(comment, op.generate)
|
||||
}
|
||||
|
||||
case class MatchMismatchGen(
|
||||
left: DataView,
|
||||
right: DataView,
|
||||
|
@ -62,4 +62,16 @@ object FuncOps {
|
||||
|
||||
def next(item: String): FuncOp =
|
||||
FuncOp.leaf(NextTag(item))
|
||||
|
||||
def meta(op: FuncOp, skipTopology: Boolean = false, comment: String = null): FuncOp =
|
||||
FuncOp(
|
||||
op.tree.copy(
|
||||
MetaTag(
|
||||
skipTopology = skipTopology,
|
||||
comment = Option(comment),
|
||||
op.head
|
||||
),
|
||||
op.tree.tail
|
||||
)
|
||||
)
|
||||
}
|
||||
|
@ -44,6 +44,12 @@ case class MatchMismatchTag(left: ValueModel, right: ValueModel, shouldMatch: Bo
|
||||
extends SeqGroupTag
|
||||
case class ForTag(item: String, iterable: ValueModel) extends SeqGroupTag
|
||||
|
||||
case class MetaTag(
|
||||
skipTopology: Boolean,
|
||||
comment: Option[String],
|
||||
op: OpTag
|
||||
) extends OpTag
|
||||
|
||||
case class CallArrowTag(
|
||||
funcName: String,
|
||||
call: Call
|
||||
|
@ -11,6 +11,16 @@ case class Cursor(point: ChainZipper[Tree], loc: Location) {
|
||||
def downLoc(tree: Tree): Location =
|
||||
loc.down(point.copy(current = tree))
|
||||
|
||||
def mapParent(f: Tree => Tree): Cursor =
|
||||
copy(loc =
|
||||
Location(
|
||||
loc.path match {
|
||||
case parent :: tail => parent.copy(current = f(parent.current)) :: tail
|
||||
case path => path
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
def prevOnTags: Chain[OnTag] =
|
||||
Chain
|
||||
.fromSeq(
|
||||
|
@ -1,12 +1,15 @@
|
||||
package aqua.model.topology
|
||||
|
||||
import aqua.model.ValueModel
|
||||
import aqua.model.{ValueModel, VarModel}
|
||||
import aqua.model.func.body._
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
import ChainZipper.Matchers._
|
||||
import Location.Matchers._
|
||||
import aqua.types.{BoxType, ScalarType}
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
object Topology {
|
||||
type Tree = Cofree[Chain, OpTag]
|
||||
@ -14,14 +17,39 @@ object Topology {
|
||||
// Walks through peer IDs, doing a noop function on each
|
||||
// If same IDs are found in a row, does noop only once
|
||||
// if there's a chain like a -> b -> c -> ... -> b -> g, remove everything between b and b
|
||||
def through(peerIds: Chain[ValueModel]): Chain[Tree] =
|
||||
def through(peerIds: Chain[ValueModel], reversed: Boolean = false): Chain[Tree] =
|
||||
peerIds
|
||||
.foldLeft(Chain.empty[ValueModel]) {
|
||||
case (acc, p) if acc.lastOption.contains(p) => acc
|
||||
case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p
|
||||
case (acc, p) => acc :+ p
|
||||
}
|
||||
.map(FuncOps.noop)
|
||||
.map { v =>
|
||||
v.lastType match {
|
||||
case _: BoxType =>
|
||||
val itemName = "-via-peer-"
|
||||
|
||||
FuncOps.meta(
|
||||
FuncOps.fold(
|
||||
itemName,
|
||||
v,
|
||||
if (reversed)
|
||||
FuncOps.seq(
|
||||
FuncOps.next(itemName),
|
||||
FuncOps.noop(VarModel(itemName, ScalarType.string))
|
||||
)
|
||||
else
|
||||
FuncOps.seq(
|
||||
FuncOps.noop(VarModel(itemName, ScalarType.string)),
|
||||
FuncOps.next(itemName)
|
||||
)
|
||||
),
|
||||
skipTopology = true
|
||||
)
|
||||
case _ =>
|
||||
FuncOps.noop(v)
|
||||
}
|
||||
}
|
||||
.map(_.tree)
|
||||
|
||||
def mapTag(tag: OpTag, loc: Location): OpTag = tag match {
|
||||
@ -47,50 +75,57 @@ object Topology {
|
||||
}
|
||||
.value
|
||||
|
||||
@tailrec
|
||||
private def transformWalker(c: Cursor): List[Tree] =
|
||||
c match {
|
||||
case Cursor(_, `head`(parent: MetaTag) /: _) if !parent.skipTopology =>
|
||||
transformWalker(c.mapParent(p => p.copy(parent.op, p.tail)))
|
||||
|
||||
case Cursor(
|
||||
`current`(cf),
|
||||
loc @ `head`(parent: GroupTag) /: _
|
||||
) =>
|
||||
val cfu = cf.copy(mapTag(cf.head, loc))
|
||||
|
||||
val getThere = (cfu.head, loc.pathOn) match {
|
||||
case (OnTag(pid, _), h :: _) if h.peerId == pid => Chain.empty[ValueModel]
|
||||
case (OnTag(_, via), h :: _) =>
|
||||
h.via.reverse ++ via
|
||||
case (_, _) => Chain.empty[ValueModel]
|
||||
}
|
||||
|
||||
val prevOn = c.prevOnTags
|
||||
|
||||
val prevPath = prevOn.map { case OnTag(_, v) =>
|
||||
v.reverse
|
||||
}
|
||||
.flatMap(identity)
|
||||
|
||||
val nextOn = parent match {
|
||||
case ParTag | XorTag => c.nextOnTags
|
||||
case _ => Chain.empty[OnTag]
|
||||
}
|
||||
val nextPath = (if (nextOn.nonEmpty) getThere.reverse else Chain.empty) ++ nextOn.map {
|
||||
case OnTag(_, v) =>
|
||||
v.reverse
|
||||
}
|
||||
.flatMap(identity) ++ Chain.fromOption(
|
||||
// Dirty fix for join behaviour
|
||||
nextOn.lastOption.filter(_ => parent == ParTag).map(_.peerId)
|
||||
)
|
||||
|
||||
if (prevOn.isEmpty && getThere.isEmpty) cfu :: Nil
|
||||
else
|
||||
(through(prevPath ++ loc.pathViaChain ++ getThere)
|
||||
.append(cfu) ++ through(nextPath, reversed = true)).toList
|
||||
|
||||
case Cursor(ChainZipper(_, cf, _), loc) =>
|
||||
cf.copy(mapTag(cf.head, loc)) :: Nil
|
||||
}
|
||||
|
||||
def resolveOnMoves(op: Tree): Tree =
|
||||
Cursor
|
||||
.transform(op) {
|
||||
case c @ Cursor(
|
||||
`current`(cf),
|
||||
loc @ `head`(parent: GroupTag) /: _
|
||||
) =>
|
||||
val cfu = cf.copy(mapTag(cf.head, loc))
|
||||
|
||||
val getThere = (cfu.head, loc.pathOn) match {
|
||||
case (OnTag(pid, _), h :: _) if h.peerId == pid => Chain.empty[ValueModel]
|
||||
case (OnTag(_, via), h :: _) =>
|
||||
h.via.reverse ++ via
|
||||
case (_, _) => Chain.empty[ValueModel]
|
||||
}
|
||||
|
||||
val prevOn = c.prevOnTags
|
||||
|
||||
val prevPath = prevOn.map { case OnTag(_, v) =>
|
||||
v.reverse
|
||||
}
|
||||
.flatMap(identity)
|
||||
|
||||
val nextOn = parent match {
|
||||
case ParTag | XorTag => c.nextOnTags
|
||||
case _ => Chain.empty[OnTag]
|
||||
}
|
||||
val nextPath = (if (nextOn.nonEmpty) getThere.reverse else Chain.empty) ++ nextOn.map {
|
||||
case OnTag(_, v) =>
|
||||
v.reverse
|
||||
}
|
||||
.flatMap(identity) ++ Chain.fromOption(
|
||||
// Dirty fix for join behaviour
|
||||
nextOn.lastOption.filter(_ => parent == ParTag).map(_.peerId)
|
||||
)
|
||||
|
||||
if (prevOn.isEmpty && getThere.isEmpty) cfu :: Nil
|
||||
else
|
||||
(through(prevPath ++ loc.pathViaChain ++ getThere)
|
||||
.append(cfu) ++ through(nextPath)).toList
|
||||
|
||||
case Cursor(ChainZipper(_, cf, _), loc) =>
|
||||
cf.copy(mapTag(cf.head, loc)) :: Nil
|
||||
}
|
||||
.transform(op)(transformWalker)
|
||||
.getOrElse(op)
|
||||
|
||||
}
|
||||
|
@ -13,21 +13,21 @@ trait ArgsProvider {
|
||||
case class ArgsFromService(dataServiceId: ValueModel, names: List[(String, DataType)])
|
||||
extends ArgsProvider {
|
||||
|
||||
private def getDataElOp(name: String, t: DataType, el: Type): FuncOp = {
|
||||
private def getStreamDataOp(name: String, t: StreamType): FuncOp = {
|
||||
val iter = s"$name-iter"
|
||||
val item = s"$name-item"
|
||||
FuncOps.seq(
|
||||
FuncOps.callService(
|
||||
dataServiceId,
|
||||
name,
|
||||
Call(Nil, Some(Call.Export(iter, t)))
|
||||
Call(Nil, Some(Call.Export(iter, ArrayType(t.element))))
|
||||
),
|
||||
FuncOps.fold(
|
||||
item,
|
||||
VarModel(iter, ArrayType(el), Chain.empty),
|
||||
VarModel(iter, ArrayType(t.element), Chain.empty),
|
||||
FuncOps.seq(
|
||||
// TODO: currently this does not work, as identity wraps everything with an array
|
||||
FuncOps.identity(VarModel(item, el), Call.Export(name, t)),
|
||||
FuncOps.identity(VarModel(item, t.element), Call.Export(name, t)),
|
||||
FuncOps.next(item)
|
||||
)
|
||||
)
|
||||
@ -36,8 +36,8 @@ case class ArgsFromService(dataServiceId: ValueModel, names: List[(String, DataT
|
||||
|
||||
def getDataOp(name: String, t: DataType): FuncOp =
|
||||
t match {
|
||||
case StreamType(el) =>
|
||||
getDataElOp(name, t, el)
|
||||
case st: StreamType =>
|
||||
getStreamDataOp(name, st)
|
||||
case _ =>
|
||||
FuncOps.callService(
|
||||
dataServiceId,
|
||||
|
@ -1,42 +1,57 @@
|
||||
package aqua.semantics.expr
|
||||
|
||||
import aqua.model.Model
|
||||
import aqua.model.{Model, ValueModel}
|
||||
import aqua.model.func.body.{FuncOp, OnTag}
|
||||
import aqua.parser.expr.OnExpr
|
||||
import aqua.semantics.Prog
|
||||
import aqua.semantics.rules.ValuesAlgebra
|
||||
import aqua.semantics.rules.abilities.AbilitiesAlgebra
|
||||
import aqua.semantics.rules.types.TypesAlgebra
|
||||
import aqua.types.{BoxType, OptionType, ScalarType}
|
||||
import cats.Traverse
|
||||
import cats.data.Chain
|
||||
import cats.free.Free
|
||||
import cats.syntax.apply._
|
||||
import cats.syntax.functor._
|
||||
import cats.syntax.flatMap._
|
||||
|
||||
class OnSem[F[_]](val expr: OnExpr[F]) extends AnyVal {
|
||||
|
||||
def program[Alg[_]](implicit
|
||||
V: ValuesAlgebra[F, Alg],
|
||||
T: TypesAlgebra[F, Alg],
|
||||
A: AbilitiesAlgebra[F, Alg]
|
||||
): Prog[Alg, Model] =
|
||||
Prog.around(
|
||||
expr.via.foldLeft(
|
||||
V.ensureIsString(expr.peerId)
|
||||
) { case (acc, v) =>
|
||||
acc >> V.ensureIsString(v)
|
||||
(
|
||||
V.ensureIsString(expr.peerId),
|
||||
Traverse[List]
|
||||
.traverse(expr.via)(v =>
|
||||
V.valueToModel(v).flatTap {
|
||||
case Some(vm) =>
|
||||
vm.lastType match {
|
||||
case _: BoxType =>
|
||||
T.ensureTypeMatches(v, OptionType(ScalarType.string), vm.lastType)
|
||||
case _ =>
|
||||
T.ensureTypeMatches(v, ScalarType.string, vm.lastType)
|
||||
}
|
||||
case None => Free.pure(false)
|
||||
}
|
||||
)
|
||||
.map(_.flatten)
|
||||
).mapN { case (_, viaVM) =>
|
||||
viaVM
|
||||
}
|
||||
>> A.beginScope(expr.peerId),
|
||||
(_: Unit, ops: Model) =>
|
||||
<* A.beginScope(expr.peerId),
|
||||
(viaVM: List[ValueModel], ops: Model) =>
|
||||
A.endScope() >> (ops match {
|
||||
case op: FuncOp =>
|
||||
(
|
||||
V.valueToModel(expr.peerId),
|
||||
Traverse[List].traverse(expr.via)(V.valueToModel).map(_.flatten)
|
||||
).mapN {
|
||||
case (Some(om), via) =>
|
||||
V.valueToModel(expr.peerId).map {
|
||||
case Some(om) =>
|
||||
FuncOp.wrap(
|
||||
OnTag(
|
||||
om,
|
||||
Chain.fromSeq(via)
|
||||
Chain.fromSeq(viaVM)
|
||||
),
|
||||
op
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user