From 8060695dbb0a2f34febf739eb20db8b8781b3682 Mon Sep 17 00:00:00 2001 From: Dima Date: Mon, 21 Aug 2023 12:26:30 +0200 Subject: [PATCH] feat: parseq implementation (fixes LNG-223) (#840) * parsec implementation * add test code * parsec expression * Refactor * Refactor * Add return strategy to on * Add ExitStrategy * Add TopologyPath * Add ExitStrategy.ToRelay * Handle ToRelay * Refactor * Refactor * Refactor * Handle OnModel with Relay strategy * parsec -> parseq * parsec -> parseq * Add semantics test * Add topology tests * Add comments --------- Co-authored-by: InversionSpaces --- aqua-src/antithesis.aqua | 45 ++------ .../main/scala/aqua/backend/air/AirGen.scala | 4 +- .../aqua/compiler/AquaCompilerSpec.scala | 19 +--- integration-tests/aqua/examples/parseq.aqua | 26 +++++ .../scala/aqua/model/inline/TagInliner.scala | 21 ++-- .../inline/raw/ApplyGateRawInliner.scala | 8 +- .../aqua/model/inline/ArrowInlinerSpec.scala | 4 +- .../src/main/scala/aqua/raw/ops/RawTag.scala | 38 +++++-- .../src/test/scala/aqua/res/ResBuilder.scala | 3 +- model/src/main/scala/aqua/model/OpModel.scala | 38 +++++-- .../topology/OpModelTreeCursor.scala | 46 ++++++-- .../model/transform/topology/PathFinder.scala | 106 ++++++++++-------- .../model/transform/topology/Topology.scala | 93 ++++++++++----- .../transform/topology/TopologyPath.scala | 65 +++++++++++ .../transform/topology/strategy/After.scala | 32 +++--- .../transform/topology/strategy/Before.scala | 3 +- .../transform/topology/strategy/Begins.scala | 4 +- .../transform/topology/strategy/Ends.scala | 15 ++- .../transform/topology/strategy/Fail.scala | 12 +- .../transform/topology/strategy/For.scala | 13 ++- .../topology/strategy/ParGroup.scala | 28 ++--- .../topology/strategy/ParGroupBranch.scala | 27 ++++- .../transform/topology/strategy/Root.scala | 10 +- .../topology/strategy/SeqGroup.scala | 3 +- .../topology/strategy/SeqGroupBranch.scala | 5 +- .../transform/topology/strategy/SeqNext.scala | 3 +- .../topology/strategy/XorBranch.scala | 18 ++- .../topology/strategy/XorGroup.scala | 3 +- .../aqua/model/transform/ModelBuilder.scala | 9 +- .../transform/topology/TopologySpec.scala | 98 ++++++++++++++-- .../aqua/parser/expr/func/ArrowExpr.scala | 1 + .../scala/aqua/parser/expr/func/OnExpr.scala | 10 +- .../aqua/parser/expr/func/ParSeqExpr.scala | 39 +++++++ .../main/scala/aqua/parser/lexer/Token.scala | 1 + parser/src/test/scala/aqua/AquaSpec.scala | 3 + .../scala/aqua/parser/ParSecExprSpec.scala | 36 ++++++ .../main/scala/aqua/semantics/ExprSem.scala | 1 + .../aqua/semantics/expr/func/ForSem.scala | 38 ++++--- .../aqua/semantics/expr/func/OnSem.scala | 81 +++++++------ .../aqua/semantics/expr/func/ParSeqSem.scala | 77 +++++++++++++ .../abilities/AbilitiesInterpreter.scala | 15 +-- .../rules/topology/TopologyAlgebra.scala | 11 -- .../scala/aqua/semantics/SemanticsSpec.scala | 60 ++++++++++ 43 files changed, 846 insertions(+), 326 deletions(-) create mode 100644 integration-tests/aqua/examples/parseq.aqua create mode 100644 model/transform/src/main/scala/aqua/model/transform/topology/TopologyPath.scala create mode 100644 parser/src/main/scala/aqua/parser/expr/func/ParSeqExpr.scala create mode 100644 parser/src/test/scala/aqua/parser/ParSecExprSpec.scala create mode 100644 semantics/src/main/scala/aqua/semantics/expr/func/ParSeqSem.scala delete mode 100644 semantics/src/main/scala/aqua/semantics/rules/topology/TopologyAlgebra.scala diff --git a/aqua-src/antithesis.aqua b/aqua-src/antithesis.aqua index 4bf24584..72ac4e01 100644 --- a/aqua-src/antithesis.aqua +++ b/aqua-src/antithesis.aqua @@ -1,35 +1,12 @@ -aqua Aaa +service Console("run-console"): + print(s: string) -import "builtin.aqua" - -export structuralTypingTest - -data WideData: - s: string - n: u32 - -data ExactData: - s: string - -ability ExactAbility: - s: string - arr(s: string, s2: string, s3: string, s4: string) -> string - exact: ExactData - -ability WideAbility: - s: string - arr(s: string, s2: string, s3: string, s4: string) -> string - g: string - exact: WideData - -func ss(s1: string, s2: string, s3: string, s4: string) -> string: - <- Op.concat_strings(Op.concat_strings(Op.concat_strings(s1, s2), s3), s4) - -func main{ExactAbility}(someData: ExactData, secondData: ExactData) -> string: - <- ExactAbility.arr(someData.s, ExactAbility.exact.s, secondData.s, ExactAbility.s) - -func structuralTypingTest() -> string: - wd = WideData(s = "some_string", n = 32) - - WAbility = WideAbility(s = "ab_string", g = "", arr = ss, exact = wd) - <- main{WAbility}(wd, WAbility.exact) +func main(): + ss: *string + dd: *string + peerId = "peerId" + relay = "relay" + parsec s <- ss on peerId via relay: + Console.print(s) + for d <- dd par: + Console.print(d) diff --git a/backend/air/src/main/scala/aqua/backend/air/AirGen.scala b/backend/air/src/main/scala/aqua/backend/air/AirGen.scala index eaca0d74..97ec846f 100644 --- a/backend/air/src/main/scala/aqua/backend/air/AirGen.scala +++ b/backend/air/src/main/scala/aqua/backend/air/AirGen.scala @@ -96,8 +96,8 @@ object AirGen extends Logging { case FoldRes(item, iterable, mode) => val m = mode.map { - case ForModel.NullMode => NullGen - case ForModel.NeverMode => NeverGen + case ForModel.Mode.Null => NullGen + case ForModel.Mode.Never => NeverGen } Eval later ForGen(valueToData(iterable), item, opsToSingle(ops), m) case RestrictionRes(item, itemType) => diff --git a/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala b/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala index 1e688d2e..2fdd2c24 100644 --- a/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala +++ b/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala @@ -19,28 +19,17 @@ import aqua.parser.lift.Span import aqua.parser.lift.Span.S import aqua.raw.ConstantRaw import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} -import aqua.res.{ - ApRes, - CallRes, - CallServiceRes, - CanonRes, - FoldRes, - MakeRes, - MatchMismatchRes, - NextRes, - ParRes, - RestrictionRes, - SeqRes, - XorRes -} +import aqua.res.* import aqua.res.ResBuilder import aqua.types.{ArrayType, CanonStreamType, LiteralType, ScalarType, StreamType, Type} + import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import cats.Id import cats.data.{Chain, NonEmptyChain, NonEmptyMap, Validated, ValidatedNec} import cats.instances.string.* import cats.syntax.show.* +import cats.syntax.option.* class AquaCompilerSpec extends AnyFlatSpec with Matchers { import ModelBuilder.* @@ -173,7 +162,7 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { RestrictionRes(results.name, resultsType).wrap( SeqRes.wrap( ParRes.wrap( - FoldRes(peer.name, peers, Some(ForModel.NeverMode)).wrap( + FoldRes(peer.name, peers, ForModel.Mode.Never.some).wrap( ParRes.wrap( XorRes.wrap( // better if first relay will be outside `for` diff --git a/integration-tests/aqua/examples/parseq.aqua b/integration-tests/aqua/examples/parseq.aqua new file mode 100644 index 00000000..8ab129b5 --- /dev/null +++ b/integration-tests/aqua/examples/parseq.aqua @@ -0,0 +1,26 @@ +import "@fluencelabs/aqua-lib/builtin.aqua" + +service NumOp("op"): + identity(n: u64) -> u64 + +data PeerRelay: + peer: string + relay: string + +func testParSeq(peer1: string, peer2: string, peer3: string, relay1: string, relay2: string, relay3: string) -> string: + pr1 = PeerRelay(peer = peer1, relay = relay1) + pr2 = PeerRelay(peer = peer2, relay = relay2) + pr3 = PeerRelay(peer = peer3, relay = relay3) + peers = [pr1, pr2, pr3] + stream: *u64 + stream2: *u64 + parseq p <- peers on p.peer via p.relay: + stream <- Peer.timestamp_ms() + + for p <- peers par: + on p.peer via p.relay: + join stream[peers.length - 1] + stream2 <<- Peer.timestamp_ms() + + join stream2[peers.length - 1] + <- "ok" \ No newline at end of file diff --git a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala index 0aceada5..03293187 100644 --- a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala @@ -143,14 +143,8 @@ object TagInliner extends Logging { v.copy(properties = Chain.empty), CallModel.Export(canonV.name, canonV.baseType) ).leaf - flatResult <- flatCanonStream(canonV, Some(canonOp)) - } yield { - val (resV, resOp) = flatResult - (resV, combineOpsWithSeq(op, resOp)) - } - case v @ VarModel(_, CanonStreamType(_), _) => - flatCanonStream(v, op) - case _ => State.pure((vm, op)) + } yield (canonV, combineOpsWithSeq(op, canonOp.some)) + case _ => (vm, op).pure } } @@ -186,7 +180,7 @@ object TagInliner extends Logging { treeFunctionName: String ): State[S, TagInlined] = tag match { - case OnTag(peerId, via) => + case OnTag(peerId, via, strategy) => for { peerIdDe <- valueToModel(peerId) viaDe <- valueListToModel(via.toList) @@ -196,9 +190,12 @@ object TagInliner extends Logging { (pid, pif) = peerIdDe (viaD, viaF) = viaDeFlattened.unzip .bimap(Chain.fromSeq, _.flatten) + strat = strategy.map { case OnTag.ReturnStrategy.Relay => + OnModel.ReturnStrategy.Relay + } toModel = (children: Chain[OpModel.Tree]) => XorModel.wrap( - OnModel(pid, viaD).wrap( + OnModel(pid, viaD, strat).wrap( children ), // This will return to previous topology @@ -289,8 +286,8 @@ object TagInliner extends Logging { } _ <- Exports[S].resolved(item, VarModel(n, elementType)) m = mode.map { - case ForTag.WaitMode => ForModel.NeverMode - case ForTag.PassMode => ForModel.NullMode + case ForTag.Mode.Wait => ForModel.Mode.Never + case ForTag.Mode.Pass => ForModel.Mode.Null } } yield TagInlined.Single( model = ForModel(n, v, m), diff --git a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala index 61ee9f86..0b3f4b49 100644 --- a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala @@ -4,11 +4,13 @@ import aqua.model.* import aqua.model.inline.Inline import aqua.model.inline.state.{Arrows, Exports, Mangler} import aqua.raw.value.{ApplyGateRaw, LiteralRaw, VarRaw} -import cats.data.State -import cats.data.Chain import aqua.model.inline.RawValueInliner.unfold import aqua.types.{ArrayType, CanonStreamType, ScalarType, StreamType} + +import cats.data.State +import cats.data.Chain import cats.syntax.monoid.* +import cats.syntax.option.* import scribe.Logging object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging { @@ -63,7 +65,7 @@ object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging { RestrictionModel(varSTest.name, streamType).wrap( increment(idxModel, incrVar), - ForModel(iter.name, VarModel(streamName, streamType), Some(ForModel.NeverMode)).wrap( + ForModel(iter.name, VarModel(streamName, streamType), ForModel.Mode.Never.some).wrap( PushToStreamModel( iter, CallModel.Export(varSTest.name, varSTest.`type`) diff --git a/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala b/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala index edacb3e1..ba2c39c9 100644 --- a/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala +++ b/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala @@ -1623,7 +1623,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers { ) val foldOp = - ForTag(iVar.name, array, Some(ForTag.WaitMode)).wrap(inFold, NextTag(iVar.name).leaf) + ForTag(iVar.name, array, ForTag.Mode.Wait.some).wrap(inFold, NextTag(iVar.name).leaf) val model: OpModel.Tree = ArrowInliner .callArrow[InliningState]( @@ -1649,7 +1649,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers { ._2 model.equalsOrShowDiff( - ForModel(iVar0.name, ValueModel.fromRaw(array), Some(ForModel.NeverMode)).wrap( + ForModel(iVar0.name, ValueModel.fromRaw(array), ForModel.Mode.Never.some).wrap( CallServiceModel( LiteralModel.fromRaw(serviceId), fnName, diff --git a/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala b/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala index 209616d4..dfa5ee7b 100644 --- a/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala +++ b/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala @@ -82,6 +82,7 @@ case object ParTag extends ParGroupTag { } case class IfTag(value: ValueRaw) extends GroupTag { + override def mapValues(f: ValueRaw => ValueRaw): RawTag = IfTag(value.map(f)) } @@ -110,13 +111,34 @@ case object TryTag extends GroupTag { case object Otherwise extends GroupTag } -case class OnTag(peerId: ValueRaw, via: Chain[ValueRaw]) extends SeqGroupTag { +case class OnTag( + peerId: ValueRaw, + via: Chain[ValueRaw], + // Strategy of returning from this `on` block + // affects handling of this `on` in topology layer + strategy: Option[OnTag.ReturnStrategy] = None +) extends SeqGroupTag { override def mapValues(f: ValueRaw => ValueRaw): RawTag = - OnTag(peerId.map(f), via.map(_.map(f))) + OnTag(peerId.map(f), via.map(_.map(f)), strategy) - override def toString: String = - s"(on $peerId${if (via.nonEmpty) " via " + via.toList.mkString(" via ") else ""})" + override def toString: String = { + val viaPart = if (via.nonEmpty) " via " + via.toList.mkString(" via ") else "" + val strategyPart = strategy.fold("")(s => s" | $s") + s"(on $peerId$viaPart$strategyPart)" + } +} + +object OnTag { + + // Return strategy of `on` block + // affects handling of `on` in topology layer + enum ReturnStrategy { + // Leave peer to the first relay + // Do not make the whole back transition + // NOTE: used for `parseq` + case Relay + } } case class NextTag(item: String) extends RawTag { @@ -148,9 +170,11 @@ case class ForTag(item: String, iterable: ValueRaw, mode: Option[ForTag.Mode] = } object ForTag { - sealed trait Mode - case object WaitMode extends Mode - case object PassMode extends Mode + + enum Mode { + case Wait + case Pass + } } case class CallArrowRawTag( diff --git a/model/res/src/test/scala/aqua/res/ResBuilder.scala b/model/res/src/test/scala/aqua/res/ResBuilder.scala index 4bfe108e..7c38cb6a 100644 --- a/model/res/src/test/scala/aqua/res/ResBuilder.scala +++ b/model/res/src/test/scala/aqua/res/ResBuilder.scala @@ -5,6 +5,7 @@ import aqua.types.* import aqua.raw.value.* import cats.data.Chain +import cats.syntax.option.* object ResBuilder { @@ -27,7 +28,7 @@ object ResBuilder { ), peer ).leaf, - FoldRes(iter.name, stream, Some(ForModel.NeverMode)).wrap( + FoldRes(iter.name, stream, ForModel.Mode.Never.some).wrap( ApRes(iter, CallModel.Export(testVM.name, testVM.`type`)).leaf, CanonRes(testVM, peer, CallModel.Export(canon.name, canon.`type`)).leaf, XorRes.wrap( diff --git a/model/src/main/scala/aqua/model/OpModel.scala b/model/src/main/scala/aqua/model/OpModel.scala index 3a6a9446..9a7e247c 100644 --- a/model/src/main/scala/aqua/model/OpModel.scala +++ b/model/src/main/scala/aqua/model/OpModel.scala @@ -22,6 +22,7 @@ sealed trait OpModel extends TreeNode[OpModel] { def usesVarNames: Set[String] = Set.empty // What var names are exported – can be used AFTER this tag is executed + // NOTE: Exported names could be restricted, see `restrictsVarNames` def exportsVarNames: Set[String] = Set.empty } @@ -91,15 +92,36 @@ case object XorModel extends GroupOpModel { .getOrElse(EmptyModel.leaf) } -case class OnModel(peerId: ValueModel, via: Chain[ValueModel]) extends SeqGroupModel { +case class OnModel( + peerId: ValueModel, + via: Chain[ValueModel], + // Strategy of returning from this `on` + // affects handling this `on` in topology layer + strategy: Option[OnModel.ReturnStrategy] = None +) extends SeqGroupModel { - override def toString: String = - s"on $peerId${if (via.nonEmpty) s" via ${via.toList.mkString(", ")}" else ""}" + override def toString: String = { + val viaPart = if (via.nonEmpty) s" via ${via.toList.mkString(", ")}" else "" + val strategyPart = strategy.map(s => s" | to ${s.toString.toLowerCase}").getOrElse("") + s"on $peerId$viaPart$strategyPart" + } override lazy val usesVarNames: Set[String] = peerId.usesVarNames ++ via.iterator.flatMap(_.usesVarNames) } +object OnModel { + + // Strategy of returning from `on` + // affects handling `on` in topology layer + enum ReturnStrategy { + // Leave peer to the first relay + // Do not make the whole back transition + // NOTE: used for `parseq` + case Relay + } +} + case class NextModel(item: String) extends OpModel { override def usesVarNames: Set[String] = Set(item) @@ -125,7 +147,7 @@ case class MatchMismatchModel(left: ValueModel, right: ValueModel, shouldMatch: case class ForModel( item: String, iterable: ValueModel, - mode: Option[ForModel.Mode] = Some(ForModel.NullMode) + mode: Option[ForModel.Mode] = Some(ForModel.Mode.Null) ) extends SeqGroupModel { override def toString: String = @@ -138,9 +160,11 @@ case class ForModel( } object ForModel { - sealed trait Mode - case object NullMode extends Mode - case object NeverMode extends Mode + + enum Mode { + case Null + case Never + } } // TODO how is it used? remove, if it's not diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/OpModelTreeCursor.scala b/model/transform/src/main/scala/aqua/model/transform/topology/OpModelTreeCursor.scala index 25d0b467..cf857e30 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/OpModelTreeCursor.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/OpModelTreeCursor.scala @@ -1,11 +1,16 @@ package aqua.model.transform.topology import aqua.model.* +import aqua.model.transform.cursor.* + import cats.Eval import cats.data.{Chain, NonEmptyList, OptionT} -import aqua.model.transform.cursor.* import cats.syntax.traverse.* import cats.syntax.show.* +import cats.syntax.foldable.* +import cats.syntax.apply.* +import cats.instances.lazyList.* +import cats.syntax.applicative.* import cats.free.Cofree import scribe.Logging @@ -65,23 +70,48 @@ case class OpModelTreeCursor( !allToRight.forall(_.isNoExec) // Whether variables exported from this branch are used later in the code or not - def exportsUsedLater: Boolean = - OpModel.exportsVarNames(current).map(ns => ns.nonEmpty && checkNamesUsedLater(ns)).value + def exportsUsedLater: Boolean = ( + namesUsedLater, + OpModel.exportsVarNames(current) + ).mapN(_ intersect _).value.nonEmpty - // TODO write a test - def checkNamesUsedLater(names: Set[String]): Boolean = + def namesUsedLater: Eval[Set[String]] = allToRight .map(_.current) .map(OpModel.usesVarNames) - .exists(_.value.intersect(names).nonEmpty) + .combineAll - def cata[A](wrap: ChainZipper[Cofree[Chain, A]] => Chain[Cofree[Chain, A]])( + // Check that exports of this subtree are used later in the code + // Do not take into account subtrees for which the filter returns false + def exportsUsedLaterFilter( + filter: OpModelTreeCursor => Boolean + ): Eval[Boolean] = ( + cata((cur, childs: Chain[Set[String]]) => + Eval.later( + if (filter(cur)) + childs.combineAll ++ + // TODO: Move to OpModel + cur.op.exportsVarNames -- + cur.op.restrictsVarNames + else Set.empty + ) + ), + namesUsedLater + ).mapN(_ intersect _).map(_.nonEmpty) + + def cata[A](f: (OpModelTreeCursor, Chain[A]) => Eval[A]): Eval[A] = + for { + childs <- Chain.fromSeq(children).traverse(_.cata(f)) + res <- f(this, childs) + } yield res + + def traverse[A](wrap: ChainZipper[Cofree[Chain, A]] => Chain[Cofree[Chain, A]])( folder: OpModelTreeCursor => OptionT[Eval, ChainZipper[Cofree[Chain, A]]] ): Eval[Chain[Cofree[Chain, A]]] = folder(this).map { case cz @ ChainZipper(_, curr, _) => val updatedTail = for { childs <- Eval.later(Chain.fromSeq(children)) - addition <- childs.flatTraverse(_.cata(wrap)(folder)) + addition <- childs.flatTraverse(_.traverse(wrap)(folder)) tail <- curr.tail } yield tail ++ addition diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala b/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala index 4cbdc968..65a2ecb5 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala @@ -19,15 +19,36 @@ object PathFinder extends Logging { * @return * Chain of peers to visit in between */ - def findPath(fromOn: List[OnModel], toOn: List[OnModel]): Chain[ValueModel] = + def findPath(fromOn: TopologyPath, toOn: TopologyPath): Chain[ValueModel] = findPath( - Chain.fromSeq(fromOn).reverse, - Chain.fromSeq(toOn).reverse, - fromOn.headOption.map(_.peerId), - toOn.headOption.map(_.peerId) + Chain.fromSeq(fromOn.path.reverse), + Chain.fromSeq(toOn.path.reverse), + fromOn.peerId, + toOn.peerId ) - def findPath( + /** + * Finds the path – chain of peers to visit to get from [[fromOn]] to [[toOn]] + * @param fromOn + * Previous location + * @param toOn + * Next location + * @return + * Chain of peers to visit in between with enforced last transition + */ + def findPathEnforce(fromOn: TopologyPath, toOn: TopologyPath): Chain[ValueModel] = { + val path = findPath( + Chain.fromSeq(fromOn.path.reverse), + Chain.fromSeq(toOn.path.reverse), + fromOn.peerId, + toOn.peerId + ) + + // TODO: Is it always correct to do so? + toOn.peerId.fold(path)(p => path :+ p) + } + + private def findPath( fromOn: Chain[OnModel], toOn: Chain[OnModel], fromPeer: Option[ValueModel], @@ -38,23 +59,27 @@ object PathFinder extends Logging { val (from, to) = skipCommonPrefix(fromOn, toOn) val fromFix = - if (from.isEmpty && fromPeer != toPeer) Chain.fromOption(fromOn.lastOption) else from - val toFix = if (to.isEmpty && fromPeer != toPeer) Chain.fromOption(toOn.lastOption) else to + if (from.isEmpty && fromPeer != toPeer) Chain.fromOption(fromOn.lastOption) + else from + val toFix = + if (to.isEmpty && fromPeer != toPeer) Chain.fromOption(toOn.lastOption) + else to logger.trace("FIND PATH FROM | " + fromFix) logger.trace(" TO | " + toFix) val fromTo = fromFix.reverse.flatMap(_.via.reverse) ++ toFix.flatMap(_.via) + logger.trace(s"FROM TO: $fromTo") - val fromPeerCh = Chain.fromOption(fromPeer) - val toPeerCh = Chain.fromOption(toPeer) - val optimized = optimizePath(fromPeerCh ++ fromTo ++ toPeerCh, fromPeerCh, toPeerCh) + val toOptimize = Chain.fromOption(fromPeer) ++ fromTo ++ Chain.fromOption(toPeer) + val optimized = optimizePath(toOptimize, fromPeer, toPeer) logger.trace( s"FROM PEER '${fromPeer.map(_.toString).getOrElse("None")}' TO PEER '${toPeer.map(_.toString).getOrElse("None")}'" ) logger.trace(" Optimized: " + optimized) + optimized } @@ -63,52 +88,45 @@ object PathFinder extends Logging { * * @param peerIds * peers to walk trough - * @param prefix + * @param fromPeer * getting from the previous peer - * @param suffix + * @param toPeer * getting to the next peer * @return * optimal path with no duplicates */ - def optimizePath( + private def optimizePath( peerIds: Chain[ValueModel], - prefix: Chain[ValueModel], - suffix: Chain[ValueModel] + fromPeer: Option[ValueModel], + toPeer: Option[ValueModel] ): Chain[ValueModel] = { - val optimized = peerIds - .foldLeft(Chain.empty[ValueModel]) { - case (acc, p) if acc.lastOption.contains(p) => acc - case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p - case (acc, p) => acc :+ p - } + val optimized = peerIds.foldLeft(Chain.empty[ValueModel]) { + case (acc, p) if acc.lastOption.contains(p) => acc + case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p + case (acc, p) => acc :+ p + } + logger.trace(s"PEER IDS: $optimized") - logger.trace(s"PREFIX: $prefix") - logger.trace(s"SUFFIX: $suffix") - logger.trace(s"OPTIMIZED WITH PREFIX AND SUFFIX: $optimized") - val noPrefix = skipPrefix(optimized, prefix, optimized) - skipSuffix(noPrefix, suffix, noPrefix) + logger.trace(s"FROM PEER: $fromPeer") + logger.trace(s"TO PEER: $toPeer") + + val skipFrom = optimized.uncons match { + case Some((head, tail)) if fromPeer.contains(head) => tail + case _ => optimized + } + + val skipTo = skipFrom.initLast match { + case Some((init, last)) if toPeer.contains(last) => init + case _ => skipFrom + } + + skipTo } @tailrec - def skipPrefix[T](chain: Chain[T], prefix: Chain[T], init: Chain[T]): Chain[T] = - (chain, prefix) match { - case (c ==: ctail, p ==: ptail) if c == p => skipPrefix(ctail, ptail, init) - case (_, `nil`) => chain - case (_, _) => init - } - - @tailrec - def skipCommonPrefix[T](chain1: Chain[T], chain2: Chain[T]): (Chain[T], Chain[T]) = + private def skipCommonPrefix[T](chain1: Chain[T], chain2: Chain[T]): (Chain[T], Chain[T]) = (chain1, chain2) match { case (c ==: ctail, p ==: ptail) if c == p => skipCommonPrefix(ctail, ptail) case _ => chain1 -> chain2 } - - @tailrec - def skipSuffix[T](chain: Chain[T], suffix: Chain[T], init: Chain[T]): Chain[T] = - (chain, suffix) match { - case (cinit :== c, pinit :== p) if c == p => skipSuffix(cinit, pinit, init) - case (_, `nil`) => chain - case (_, _) => init - } } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala index f8e2d201..726168e0 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala @@ -1,11 +1,13 @@ package aqua.model.transform.topology +import aqua.model.transform.topology.TopologyPath import aqua.model.transform.cursor.ChainZipper import aqua.model.transform.topology.strategy.* import aqua.model.* import aqua.raw.value.{LiteralRaw, ValueRaw} import aqua.res.{ApRes, CanonRes, FoldRes, MakeRes, NextRes, ResolvedOp, SeqRes} import aqua.types.{ArrayType, BoxType, CanonStreamType, ScalarType, StreamType} + import cats.Eval import cats.data.Chain.{==:, nil} import cats.data.{Chain, NonEmptyChain, NonEmptyList, OptionT} @@ -18,6 +20,7 @@ import cats.syntax.flatMap.* import cats.syntax.foldable.* import cats.syntax.applicative.* import cats.instances.map.* +import cats.kernel.Monoid import scribe.Logging /** @@ -69,7 +72,7 @@ case class Topology private ( // Current topology location – stack of OnModel's collected from parents branch // ApplyTopologyModel shifts topology to pathOn where this topology was Captured - val pathOn: Eval[List[OnModel]] = Eval + val pathOn: Eval[TopologyPath] = Eval .defer( cursor.op match { case o: OnModel => @@ -91,20 +94,23 @@ case class Topology private ( .memoize // Find path of first `ForceExecModel` (call, canon, join) in this subtree - lazy val firstExecutesOn: Eval[Option[List[OnModel]]] = + lazy val firstExecutesOn: Eval[Option[TopologyPath]] = (cursor.op match { case _: ForceExecModel => pathOn.map(_.some) case _ => children.collectFirstSomeM(_.firstExecutesOn) }).memoize // Find path of last `ForceExecModel` (call, canon, join) in this subtree - lazy val lastExecutesOn: Eval[Option[List[OnModel]]] = + lazy val lastExecutesOn: Eval[Option[TopologyPath]] = (cursor.op match { case _: ForceExecModel => pathOn.map(_.some) case _ => children.reverse.collectFirstSomeM(_.lastExecutesOn) }).memoize - lazy val currentPeerId: Option[ValueModel] = pathOn.value.headOption.map(_.peerId) + lazy val currentPeerId: Option[ValueModel] = pathOn.value.peerId + + // Path of current relay + lazy val relayOn: Eval[TopologyPath] = pathOn.map(_.toRelay) // Get topology of previous sibling skipping `NoExec` nodes lazy val prevSibling: Option[Topology] = cursor.toPrevSibling.flatMap { @@ -147,23 +153,30 @@ case class Topology private ( lazy val isForModel: Boolean = forModel.isDefined // Before the left boundary of this element, what was the scope - lazy val beforeOn: Eval[List[OnModel]] = before.beforeOn(this).memoize + lazy val beforeOn: Eval[TopologyPath] = before.beforeOn(this).memoize // Inside the left boundary of this element, what should be the scope - lazy val beginsOn: Eval[List[OnModel]] = begins.beginsOn(this).memoize + lazy val beginsOn: Eval[TopologyPath] = begins.beginsOn(this).memoize // After this element is done, what is the scope - lazy val endsOn: Eval[List[OnModel]] = ends.endsOn(this).memoize + lazy val endsOn: Eval[TopologyPath] = ends.endsOn(this).memoize // After this element is done, where should it move to prepare for the next one - lazy val afterOn: Eval[List[OnModel]] = after.afterOn(this).memoize + lazy val afterOn: Eval[TopologyPath] = after.afterOn(this).memoize // Usually we don't care about exiting from where this tag ends into the outer scope // But for some cases, like par branches, its necessary, so the exit can be forced - lazy val forceExit: Eval[Boolean] = after.forceExit(this).memoize + lazy val forceExit: Eval[Topology.ExitStrategy] = + cursor.op match { + case OnModel(_, _, Some(OnModel.ReturnStrategy.Relay)) => + Eval.now(Topology.ExitStrategy.ToRelay) + case FailModel(_) => + Eval.now(Topology.ExitStrategy.Empty) + case _ => after.forceExit(this) + } // Where we finally are, after exit enforcement is applied - lazy val finallyOn: Eval[List[OnModel]] = after.finallyOn(this).memoize + lazy val finallyOn: Eval[TopologyPath] = after.finallyOn(this).memoize lazy val pathBefore: Eval[Chain[ValueModel]] = begins.pathBefore(this).memoize @@ -176,15 +189,38 @@ case class Topology private ( object Topology extends Logging { type Res = ResolvedOp.Tree - def findRelayPathEnforcement(before: List[OnModel], begin: List[OnModel]): Chain[ValueModel] = + // Strategy of generating exit transitions + enum ExitStrategy { + // Force generation of full exit transitions + case Full + // Generate exit to the current relay only + case ToRelay + // Do force generation of exit transitions + case Empty + } + + object ExitStrategy { + + given Monoid[ExitStrategy] with { + def empty: ExitStrategy = Empty + + def combine(x: ExitStrategy, y: ExitStrategy): ExitStrategy = + (x, y) match { + case (Full, _) | (_, Full) => Full + case (ToRelay, _) | (_, ToRelay) => ToRelay + case _ => Empty + } + } + } + + def findRelayPathEnforcement(before: TopologyPath, begin: TopologyPath): Chain[ValueModel] = Chain.fromOption( // Get target peer of `begin` - begin.headOption - .map(_.peerId) + begin.peerId // Check that it is last relay of previous `on` - .filter(lastPeerId => begin.tail.headOption.exists(_.via.lastOption.contains(lastPeerId))) + .filter(lastPeerId => begin.previous.flatMap(_.lastRelay).contains(lastPeerId)) // Check that it is not target peer of `before` - .filterNot(lastPeerId => before.headOption.exists(_.peerId == lastPeerId)) + .filterNot(lastPeerId => before.current.exists(_.peerId == lastPeerId)) ) // Return strategy for calculating `beforeOn` for @@ -223,12 +259,11 @@ object Topology extends Logging { // Return strategy for calculating `afterOn` for // node pointed on by `cursor` private def decideAfter(cursor: OpModelTreeCursor): After = - (cursor.parentOp, cursor.op) match { - case (_, _: FailModel) => Fail - case (Some(_: ParGroupModel), _) => ParGroupBranch - case (Some(XorModel), _) => XorBranch - case (Some(_: SeqGroupModel), _) => SeqGroupBranch - case (None, _) => Root + cursor.parentOp match { + case Some(_: ParGroupModel) => ParGroupBranch + case Some(XorModel) => XorBranch + case Some(_: SeqGroupModel) => SeqGroupBranch + case None => Root case _ => Default } @@ -274,7 +309,7 @@ object Topology extends Logging { i } - val resolvedCofree = cursor.cata(wrap) { rc => + val resolvedCofree = cursor.traverse(wrap) { rc => logger.debug(s"<:> $rc") val currI = nextI val resolved = MakeRes @@ -333,8 +368,8 @@ object Topology extends Logging { def printDebugInfo(rc: OpModelTreeCursor, i: Int): Unit = { println(Console.BLUE + rc + Console.RESET) println(i + " : " + rc.topology) - println("Before: " + rc.topology.beforeOn.value) - println("Begin: " + rc.topology.beginsOn.value) + println("Before: " + rc.topology.beforeOn.value.show) + println("Begin: " + rc.topology.beginsOn.value.show) println( (if (rc.topology.pathBefore.value.nonEmpty) Console.YELLOW else "") + "PathBefore: " + Console.RESET + rc.topology.pathBefore.value @@ -342,12 +377,10 @@ object Topology extends Logging { println("Parent: " + Console.CYAN + rc.topology.parent.getOrElse("-") + Console.RESET) - println("End : " + rc.topology.endsOn.value) - println("After: " + rc.topology.afterOn.value) - println( - "Exit : " + (if (rc.topology.forceExit.value) Console.MAGENTA + "true" + Console.RESET - else "false") - ) + println("End : " + rc.topology.endsOn.value.show) + println("After: " + rc.topology.afterOn.value.show) + println("Relay: " + rc.topology.relayOn.value.show) + println("Exit : " + Console.MAGENTA + rc.topology.forceExit.value + Console.RESET) println( (if (rc.topology.pathAfter.value.nonEmpty) Console.YELLOW else "") + "PathAfter: " + Console.RESET + rc.topology.pathAfter.value diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/TopologyPath.scala b/model/transform/src/main/scala/aqua/model/transform/topology/TopologyPath.scala new file mode 100644 index 00000000..da3edb73 --- /dev/null +++ b/model/transform/src/main/scala/aqua/model/transform/topology/TopologyPath.scala @@ -0,0 +1,65 @@ +package aqua.model.transform.topology + +import aqua.model.OnModel +import aqua.model.ValueModel + +import cats.kernel.Monoid +import cats.Show +import cats.data.Chain.:== + +final case class TopologyPath( + path: List[OnModel] +) extends AnyVal { + def ::(on: OnModel): TopologyPath = TopologyPath(on :: path) + + // First `on` in the path + def current: Option[OnModel] = path.headOption + + // Current peer id + def peerId: Option[ValueModel] = current.map(_.peerId) + + // Path with the first `on` removed + def previous: Option[TopologyPath] = path match { + case _ :: tail => Some(TopologyPath(tail)) + case Nil => None + } + + // Last relay in the current `on` + def lastRelay: Option[ValueModel] = current.flatMap(_.via.lastOption) + + def reverse: TopologyPath = TopologyPath(path.reverse) + + def commonPrefix(other: TopologyPath): TopologyPath = + TopologyPath(path.zip(other.path).takeWhile(_ == _).map(_._1)) + + // Path of the first relay in the path + def toRelay: TopologyPath = { + def toRelayTailRec( + currentPath: List[OnModel] + ): List[OnModel] = currentPath match { + case Nil => Nil + case (on @ OnModel(_, other :== r, _)) :: tail => + on.copy(peerId = r, via = other) :: tail + case _ :: tail => toRelayTailRec(tail) + } + + TopologyPath(toRelayTailRec(path)) + } +} + +object TopologyPath { + + given Monoid[TopologyPath] with { + def empty: TopologyPath = TopologyPath(Nil) + def combine(x: TopologyPath, y: TopologyPath): TopologyPath = TopologyPath(x.path ++ y.path) + } + + val empty = Monoid[TopologyPath].empty + + given Show[TopologyPath] with { + + def show(t: TopologyPath): String = + if (t.path.isEmpty) "empty" + else t.path.map(_.toString).mkString(" -> ") + } +} diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala index abe044e3..7c7001fe 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala @@ -1,28 +1,31 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.{PathFinder, Topology} +import aqua.model.transform.topology.Topology.ExitStrategy import aqua.model.{OnModel, ValueModel} import cats.Eval import cats.data.Chain import cats.syntax.apply.* +import aqua.model.transform.topology.TopologyPath trait After { - def forceExit(current: Topology): Eval[Boolean] = Eval.now(false) + def forceExit(current: Topology): Eval[ExitStrategy] = Eval.now(ExitStrategy.Empty) - def afterOn(current: Topology): Eval[List[OnModel]] = current.pathOn + def afterOn(current: Topology): Eval[TopologyPath] = current.pathOn - protected def afterParent(current: Topology): Eval[List[OnModel]] = + protected def afterParent(current: Topology): Eval[TopologyPath] = current.parent.map( _.afterOn ) getOrElse current.pathOn // In case exit is performed and pathAfter is inserted, we're actually where // execution is expected to continue After this node is handled - final def finallyOn(current: Topology): Eval[List[OnModel]] = + final def finallyOn(current: Topology): Eval[TopologyPath] = current.forceExit.flatMap { - case true => current.afterOn - case false => current.endsOn + case ExitStrategy.Full => current.afterOn + case ExitStrategy.ToRelay => current.relayOn + case ExitStrategy.Empty => current.endsOn } // If exit is forced, make a path outside this node @@ -34,10 +37,11 @@ trait After { // – from where it ends to where execution is expected to continue private def pathAfterVia(current: Topology): Eval[Chain[ValueModel]] = current.forceExit.flatMap { - case true => + case ExitStrategy.Empty => Eval.now(Chain.empty) + case ExitStrategy.ToRelay => + (current.endsOn, current.relayOn).mapN(PathFinder.findPathEnforce) + case ExitStrategy.Full => (current.endsOn, current.afterOn).mapN(PathFinder.findPath) - case false => - Eval.now(Chain.empty) } // If exit is forced, make a path outside this node @@ -45,22 +49,22 @@ trait After { // explicitly pinging the next node (useful inside par branches) def pathAfterAndPingNext(current: Topology): Eval[Chain[ValueModel]] = current.forceExit.flatMap { - case false => Eval.now(Chain.empty) - case true => + case ExitStrategy.Empty | ExitStrategy.ToRelay => Eval.now(Chain.empty) + case ExitStrategy.Full => (current.endsOn, current.afterOn, current.lastExecutesOn).mapN { case (e, a, _) if e == a => Chain.empty case (e, a, l) if l.contains(e) => // Pingback in case no relays involved Chain.fromOption( - a.headOption + a.current // Add nothing if last node is the same - .filterNot(e.headOption.contains) + .filterNot(e.current.contains) .map(_.peerId) ) case (e, a, _) => // We wasn't at e, so need to get through the last peer in case it matches with the relay Topology.findRelayPathEnforcement(a, e) ++ Chain.fromOption( - a.headOption.map(_.peerId) + a.peerId ) } }.flatMap { appendix => diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Before.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Before.scala index 24cd94db..ea2f64df 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Before.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Before.scala @@ -1,13 +1,14 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath import aqua.model.OnModel import cats.Eval trait Before { - def beforeOn(current: Topology): Eval[List[OnModel]] = + def beforeOn(current: Topology): Eval[TopologyPath] = // Go to the parent, see where it begins current.parent.map(_.beginsOn) getOrElse // This means, we have no parent; then we're where we should be diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala index 8176b556..9bf9e043 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala @@ -1,6 +1,6 @@ package aqua.model.transform.topology.strategy -import aqua.model.transform.topology.{PathFinder, Topology} +import aqua.model.transform.topology.{PathFinder, Topology, TopologyPath} import aqua.model.{OnModel, ValueModel} import cats.Eval @@ -12,7 +12,7 @@ import cats.instances.tuple.* trait Begins { - def beginsOn(current: Topology): Eval[List[OnModel]] = current.pathOn + def beginsOn(current: Topology): Eval[TopologyPath] = current.pathOn def pathBefore(current: Topology): Eval[Chain[ValueModel]] = (current.beforeOn, current.beginsOn).tupled diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Ends.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Ends.scala index 45a64154..54b988d9 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Ends.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Ends.scala @@ -1,29 +1,32 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.{PathFinder, Topology} +import aqua.model.transform.topology.TopologyPath +import aqua.model.transform.topology.Topology.ExitStrategy import aqua.model.OnModel import cats.Eval trait Ends { - def endsOn(current: Topology): Eval[List[OnModel]] = + def endsOn(current: Topology): Eval[TopologyPath] = current.beginsOn private def childFinally( current: Topology, child: Topology => Option[Topology] - ): Eval[List[OnModel]] = + ): Eval[TopologyPath] = child(current).map(lc => lc.forceExit.flatMap { - case true => current.afterOn - case false => lc.endsOn + case ExitStrategy.Empty => lc.endsOn + case ExitStrategy.ToRelay => lc.pathOn.map(_.toRelay) + case ExitStrategy.Full => current.afterOn } ) getOrElse current.beginsOn - protected def lastChildFinally(current: Topology): Eval[List[OnModel]] = + protected def lastChildFinally(current: Topology): Eval[TopologyPath] = childFinally(current, _.lastChild) - protected def firstChildFinally(current: Topology): Eval[List[OnModel]] = + protected def firstChildFinally(current: Topology): Eval[TopologyPath] = childFinally(current, _.firstChild) } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Fail.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Fail.scala index f5335db3..7a75891a 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Fail.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Fail.scala @@ -1,8 +1,8 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.Topology.ExitStrategy import aqua.model.ValueModel - import aqua.model.{OnModel, XorModel} import cats.data.Chain @@ -13,11 +13,7 @@ import cats.syntax.traverse.* import cats.syntax.option.* import cats.syntax.applicative.* -object Fail extends Begins with After { - - // override just to be explicit - override def forceExit(current: Topology): Eval[Boolean] = - Eval.now(false) // There is no need to insert hops after `fail` +object Fail extends Begins { override def pathBefore(current: Topology): Eval[Chain[ValueModel]] = for { @@ -26,8 +22,6 @@ object Fail extends Begins with After { // Get last hop to final peer // if it is not in the path // TODO: Add option to enforce last hop to [[PathFinder]] - hop = begins.headOption - .map(_.peerId) - .filterNot(peer => path.lastOption.contains(peer) || path.isEmpty) + hop = begins.peerId.filterNot(peer => path.lastOption.contains(peer) || path.isEmpty) } yield path ++ Chain.fromOption(hop) } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/For.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/For.scala index 011a1277..67779ae3 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/For.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/For.scala @@ -1,6 +1,7 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath import aqua.model.{NextModel, OnModel} import cats.Eval @@ -18,24 +19,24 @@ object For extends Begins { // Optimization: get all the path inside the For block out of the block, to avoid repeating // hops for every For iteration - override def beginsOn(current: Topology): Eval[List[OnModel]] = + override def beginsOn(current: Topology): Eval[TopologyPath] = // Skip `next` child because its `beginsOn` depends on `this.beginsOn`, see [bug LNG-149] (current.forModel zip firstNotNextChild(current).map(_.beginsOn)).map { case (model, childBeginsOn) => for { child <- childBeginsOn // Take path until this for's iterator is used - path <- child.reverse - .foldM(List.empty[OnModel])((acc, on) => + path <- child.reverse.path + .foldM(TopologyPath.empty)((acc, on) => State .get[Boolean] .flatMap(found => if (found) acc.pure // Short circuit else - (acc, on) match { - case (_, OnModel(_, r)) if r.exists(_.usesVarNames.contains(model.item)) => + (acc.path, on) match { + case (_, OnModel(_, r, _)) if r.exists(_.usesVarNames.contains(model.item)) => State.set(true).as(acc) - case (OnModel(_, r @ (r0 ==: _)) :: _, OnModel(p, _)) + case (OnModel(_, r @ (r0 ==: _), _) :: _, OnModel(p, _, _)) if p.usesVarNames.contains(model.item) => // This is to take the outstanding relay and force moving there State.set(true).as(OnModel(r0, r) :: acc) diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroup.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroup.scala index 7a8126b7..bf835672 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroup.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroup.scala @@ -1,10 +1,16 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath +import aqua.model.transform.topology.Topology.ExitStrategy import aqua.model.OnModel import cats.Eval import cats.syntax.apply.* +import cats.syntax.reducible.* +import cats.syntax.foldable.* +import cats.syntax.traverse.* +import cats.instances.lazyList.* object ParGroup extends Begins with Ends { override def toString: String = "" @@ -12,25 +18,21 @@ object ParGroup extends Begins with Ends { // Optimization: find the longest common prefix of all the par branches, and move it outside of this par // When branches will calculate their paths, they will take this move into account. // So less hops will be produced - override def beginsOn(current: Topology): Eval[List[OnModel]] = + override def beginsOn(current: Topology): Eval[TopologyPath] = current.children .map(_.beginsOn.map(_.reverse)) .reduceLeftOption { case (b1e, b2e) => - (b1e, b2e).mapN { case (b1, b2) => - (b1 zip b2).takeWhile(_ == _).map(_._1) - } + (b1e, b2e).mapN { case (b1, b2) => b1.commonPrefix(b2) } } .map(_.map(_.reverse)) getOrElse super.beginsOn(current) // Par block ends where all the branches end, if they have forced exit (not fire-and-forget) - override def endsOn(current: Topology): Eval[List[OnModel]] = + override def endsOn(current: Topology): Eval[TopologyPath] = current.children - .map(_.forceExit) - .reduceLeftOption { case (a, b) => - (a, b).mapN(_ || _) - } - .map(_.flatMap { - case true => current.afterOn - case false => super.endsOn(current) - }) getOrElse super.endsOn(current) + .traverse(_.forceExit) + .flatMap(_.combineAll match { + case ExitStrategy.Empty => super.endsOn(current) + case ExitStrategy.ToRelay => current.pathOn.map(_.toRelay) + case ExitStrategy.Full => current.afterOn + }) } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroupBranch.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroupBranch.scala index 77304ad9..3fce1966 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroupBranch.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/ParGroupBranch.scala @@ -1,6 +1,8 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath +import aqua.model.transform.topology.Topology.ExitStrategy import aqua.model.{OnModel, ValueModel} import cats.Eval @@ -10,14 +12,31 @@ import cats.data.Chain object ParGroupBranch extends Ends with After { override def toString: String = "/*" - override def forceExit(current: Topology): Eval[Boolean] = - Eval.later(current.cursor.exportsUsedLater) + override def forceExit(current: Topology): Eval[ExitStrategy] = + current.cursor + .exportsUsedLaterFilter( + _.op match { + // This feels like a hack: + // We suppose that `on` with Relay strategy + // does not want to generate return transitions + // because of it's exports. + // This is used for `parseq` implementation. + // We could not use `forceExit` of childs here + // because it would cause infinite recursion. + case OnModel(_, _, Some(OnModel.ReturnStrategy.Relay)) => false + case _ => true + } + ) + .map(used => + if (used) ExitStrategy.Full + else ExitStrategy.Empty + ) - override def afterOn(current: Topology): Eval[List[OnModel]] = + override def afterOn(current: Topology): Eval[TopologyPath] = afterParent(current) override def pathAfter(current: Topology): Eval[Chain[ValueModel]] = pathAfterAndPingNext(current) - override def endsOn(current: Topology): Eval[List[OnModel]] = current.beforeOn + override def endsOn(current: Topology): Eval[TopologyPath] = current.beforeOn } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Root.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Root.scala index ce690e2f..3ba15974 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Root.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Root.scala @@ -1,6 +1,8 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath +import aqua.model.transform.topology.Topology.ExitStrategy import aqua.model.OnModel import cats.Eval @@ -8,11 +10,11 @@ import cats.Eval object Root extends Before with Ends with After { override def toString: String = "" - override def beforeOn(current: Topology): Eval[List[OnModel]] = current.beginsOn + override def beforeOn(current: Topology): Eval[TopologyPath] = current.beginsOn - override def endsOn(current: Topology): Eval[List[OnModel]] = current.pathOn + override def endsOn(current: Topology): Eval[TopologyPath] = current.pathOn - override def afterOn(current: Topology): Eval[List[OnModel]] = current.pathOn + override def afterOn(current: Topology): Eval[TopologyPath] = current.pathOn - override def forceExit(current: Topology): Eval[Boolean] = Eval.now(false) + override def forceExit(current: Topology): Eval[ExitStrategy] = Eval.now(ExitStrategy.Empty) } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroup.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroup.scala index 6ddffa40..af217edf 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroup.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroup.scala @@ -1,6 +1,7 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath import aqua.model.OnModel import cats.Eval @@ -8,6 +9,6 @@ import cats.Eval object SeqGroup extends Ends { override def toString: String = "" - override def endsOn(current: Topology): Eval[List[OnModel]] = + override def endsOn(current: Topology): Eval[TopologyPath] = lastChildFinally(current) } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroupBranch.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroupBranch.scala index 76934a51..7ff40b7d 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroupBranch.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqGroupBranch.scala @@ -1,6 +1,7 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath import aqua.model.OnModel import cats.Eval @@ -10,12 +11,12 @@ object SeqGroupBranch extends Before with After { override def toString: String = "/*" // If parent is seq, then before this node we are where previous node, if any, ends - override def beforeOn(current: Topology): Eval[List[OnModel]] = + override def beforeOn(current: Topology): Eval[TopologyPath] = // Where we are after the previous node in the parent current.prevSibling .map(_.finallyOn) getOrElse super.beforeOn(current) - override def afterOn(current: Topology): Eval[List[OnModel]] = + override def afterOn(current: Topology): Eval[TopologyPath] = current.nextSibling.map(_.beginsOn) getOrElse afterParent(current) } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqNext.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqNext.scala index 74052ee2..d1e6d989 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqNext.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/SeqNext.scala @@ -1,6 +1,7 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath import aqua.model.OnModel import cats.Eval @@ -8,6 +9,6 @@ import cats.Eval object SeqNext extends Begins { override def toString: String = "/" - override def beginsOn(current: Topology): Eval[List[OnModel]] = + override def beginsOn(current: Topology): Eval[TopologyPath] = current.parents.find(_.isForModel).map(_.beginsOn) getOrElse super.beginsOn(current) } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala index 54382bbc..b1615b08 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala @@ -1,6 +1,8 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath +import aqua.model.transform.topology.Topology.ExitStrategy import aqua.model.{OnModel, ParGroupModel, SeqGroupModel, ValueModel, XorModel} import cats.Eval @@ -13,7 +15,7 @@ import cats.syntax.option.* object XorBranch extends Before with After { override def toString: String = Console.RED + "/*" + Console.RESET - override def beforeOn(current: Topology): Eval[List[OnModel]] = + override def beforeOn(current: Topology): Eval[TopologyPath] = current.prevSibling.map(_.beginsOn) getOrElse super.beforeOn(current) // Find closest par exit up and return its branch current is in @@ -35,16 +37,20 @@ object XorBranch extends Before with After { private def closestParExit(current: Topology): Option[Topology] = closestParExitChild(current).flatMap(_.parent) - override def forceExit(current: Topology): Eval[Boolean] = + override def forceExit(current: Topology): Eval[ExitStrategy] = closestParExitChild(current).fold( - Eval.later(current.cursor.moveUp.exists(_.hasExecLater)) + Eval.later { + if (current.cursor.moveUp.exists(_.hasExecLater)) ExitStrategy.Full + else ExitStrategy.Empty + } )(_.forceExit) // Force exit if par branch needs it - override def afterOn(current: Topology): Eval[List[OnModel]] = + override def afterOn(current: Topology): Eval[TopologyPath] = current.forceExit.flatMap { - case true => + case ExitStrategy.Empty => super.afterOn(current) + case ExitStrategy.ToRelay => current.relayOn + case ExitStrategy.Full => closestParExit(current).fold(afterParent(current))(_.afterOn) - case false => super.afterOn(current) } // Parent of this branch's parent xor – fixes the case when this xor is in par diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorGroup.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorGroup.scala index 82f502bd..4f8677e9 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorGroup.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorGroup.scala @@ -1,6 +1,7 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology +import aqua.model.transform.topology.TopologyPath import aqua.model.OnModel import cats.Eval @@ -10,7 +11,7 @@ object XorGroup extends Ends { override def toString: String = "" // Xor tag ends where any child ends; can't get first one as it may lead to recursion - override def endsOn(current: Topology): Eval[List[OnModel]] = + override def endsOn(current: Topology): Eval[TopologyPath] = firstChildFinally(current) } diff --git a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala index 2ce79264..371f5727 100644 --- a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala +++ b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala @@ -9,13 +9,14 @@ import aqua.types.{ArrayType, LiteralType, ScalarType} import aqua.types.StreamType import aqua.model.IntoIndexModel import aqua.model.inline.raw.ApplyGateRawInliner +import aqua.model.OnModel +import aqua.model.FailModel +import aqua.res.ResolvedOp import scala.language.implicitConversions import cats.data.Chain import cats.data.Chain.==: -import aqua.model.OnModel -import aqua.model.FailModel -import aqua.res.ResolvedOp +import cats.syntax.option.* object ModelBuilder { implicit def rawToValue(raw: ValueRaw): ValueModel = ValueModel.fromRaw(raw) @@ -131,7 +132,7 @@ object ModelBuilder { def foldPar(item: String, iter: ValueRaw, body: OpModel.Tree*) = { val ops = SeqModel.wrap(body: _*) DetachModel.wrap( - ForModel(item, ValueModel.fromRaw(iter), Some(ForModel.NeverMode)) + ForModel(item, ValueModel.fromRaw(iter), ForModel.Mode.Never.some) .wrap(ParModel.wrap(ops, NextModel(item).leaf)) ) } diff --git a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala index 95481cfa..657d47fb 100644 --- a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala +++ b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala @@ -6,6 +6,11 @@ import aqua.res.* import aqua.raw.ops.Call import aqua.raw.value.{IntoIndexRaw, LiteralRaw, VarRaw} import aqua.types.{LiteralType, ScalarType, StreamType} +import aqua.types.ArrayType +import aqua.raw.ConstantRaw.initPeerId +import aqua.model.ForModel +import aqua.raw.value.ValueRaw + import cats.Eval import cats.data.{Chain, NonEmptyList} import cats.data.Chain.* @@ -14,10 +19,6 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import cats.syntax.show.* import cats.syntax.option.* -import aqua.types.ArrayType -import aqua.raw.ConstantRaw.initPeerId -import aqua.model.ForModel.NullMode -import aqua.raw.value.ValueRaw class TopologySpec extends AnyFlatSpec with Matchers { @@ -439,7 +440,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { through(relay), callRes(0, otherPeer), ParRes.wrap( - FoldRes("i", valueArray, Some(ForModel.NeverMode)) + FoldRes("i", valueArray, ForModel.Mode.Never.some) .wrap(ParRes.wrap(callRes(2, otherPeer2), NextRes("i").leaf)) ), through(relay), @@ -486,7 +487,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected = SeqRes.wrap( through(relay), ParRes.wrap( - FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( + FoldRes("i", valueArray, ForModel.Mode.Never.some).wrap( ParRes.wrap( // better if first relay will be outside `for` SeqRes.wrap( @@ -553,7 +554,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected = SeqRes.wrap( through(relay), ParRes.wrap( - FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( + FoldRes("i", valueArray, ForModel.Mode.Never.some).wrap( ParRes.wrap( // better if first relay will be outside `for` SeqRes.wrap( @@ -735,7 +736,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected = SeqRes.wrap( callRes(1, otherPeer), ParRes.wrap( - FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( + FoldRes("i", valueArray, ForModel.Mode.Never.some).wrap( ParRes.wrap( SeqRes.wrap( // TODO: should be outside of fold @@ -812,7 +813,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected = SeqRes.wrap( ParRes.wrap( - FoldRes("i", ValueModel.fromRaw(valueArray), Some(ForModel.NeverMode)).wrap( + FoldRes("i", ValueModel.fromRaw(valueArray), ForModel.Mode.Never.some).wrap( ParRes.wrap( SeqRes.wrap( through(relay), @@ -861,7 +862,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected = SeqRes.wrap( ParRes.wrap( - FoldRes("i", ValueModel.fromRaw(valueArray), Some(ForModel.NeverMode)).wrap( + FoldRes("i", ValueModel.fromRaw(valueArray), ForModel.Mode.Never.some).wrap( ParRes.wrap( SeqRes.wrap( through(relay), @@ -888,6 +889,81 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } + it should "return to relay for `on` with ReturnStrategy.Relay in `par`" in { + val init = OnModel(initPeer, Chain.one(relay)).wrap( + ParModel.wrap( + OnModel( + otherPeer, + Chain(otherRelay), + OnModel.ReturnStrategy.Relay.some + ).wrap( + callModel(0, CallModel.Export("var", ScalarType.string) :: Nil) + ) + ), + callModel(1, Nil, VarRaw("var", ScalarType.string) :: Nil) + ) + + val proc = Topology.resolve(init).value + + val expected = SeqRes.wrap( + through(relay), + through(otherRelay), + ParRes.wrap( + SeqRes.wrap( + callRes(0, otherPeer, Some(CallModel.Export("var", ScalarType.string))), + through(otherRelay) + // Note missing hops here + ) + ), + callRes(1, initPeer, None, VarModel("var", ScalarType.string) :: Nil) + ) + + proc.equalsOrShowDiff(expected) should be(true) + } + + it should "return to relay for `on` with ReturnStrategy.Relay in `par` in `xor`" in { + val init = OnModel(initPeer, Chain.one(relay)).wrap( + ParModel.wrap( + XorModel.wrap( + OnModel( + otherPeer, + Chain(otherRelay), + OnModel.ReturnStrategy.Relay.some + ).wrap( + callModel(0, CallModel.Export("var", ScalarType.string) :: Nil) + ), + failLastErrorModel + ) + ), + callModel(1, Nil, VarRaw("var", ScalarType.string) :: Nil) + ) + + val proc = Topology.resolve(init).value + + val expected = SeqRes.wrap( + ParRes.wrap( + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay), + callRes(0, otherPeer, Some(CallModel.Export("var", ScalarType.string))), + through(otherRelay) + // Note missing hops here + ), + SeqRes.wrap( + through(otherRelay), + through(relay), + through(initPeer), + failLastErrorRes + ) + ) + ), + callRes(1, initPeer, None, VarModel("var", ScalarType.string) :: Nil) + ) + + proc.equalsOrShowDiff(expected) should be(true) + } + it should "handle empty for correctly [bug LNG-149]" in { val streamName = "array-inline" val iterName = "a-0" @@ -928,7 +1004,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { CallModel.Export(array.name, array.`type`) ).leaf ), - FoldRes(iterName, array, NullMode.some).wrap( + FoldRes(iterName, array, ForModel.Mode.Null.some).wrap( NextRes(iterName).leaf ) ) diff --git a/parser/src/main/scala/aqua/parser/expr/func/ArrowExpr.scala b/parser/src/main/scala/aqua/parser/expr/func/ArrowExpr.scala index 17041b44..b4a635d8 100644 --- a/parser/src/main/scala/aqua/parser/expr/func/ArrowExpr.scala +++ b/parser/src/main/scala/aqua/parser/expr/func/ArrowExpr.scala @@ -30,6 +30,7 @@ object ArrowExpr extends Expr.AndIndented { ElseOtherwiseExpr :: TryExpr :: CatchExpr :: + Expr.defer(ParSeqExpr) :: Expr.defer(ParExpr) :: Expr.defer(CoExpr) :: Expr.defer(JoinExpr) :: diff --git a/parser/src/main/scala/aqua/parser/expr/func/OnExpr.scala b/parser/src/main/scala/aqua/parser/expr/func/OnExpr.scala index 0e2f357f..1569a886 100644 --- a/parser/src/main/scala/aqua/parser/expr/func/OnExpr.scala +++ b/parser/src/main/scala/aqua/parser/expr/func/OnExpr.scala @@ -10,7 +10,8 @@ import cats.{~>, Comonad} import aqua.parser.lift.Span import aqua.parser.lift.Span.{P0ToSpan, PToSpan} -case class OnExpr[F[_]](peerId: ValueToken[F], via: List[ValueToken[F]]) extends Expr[F](OnExpr, peerId) { +case class OnExpr[F[_]](peerId: ValueToken[F], via: List[ValueToken[F]]) + extends Expr[F](OnExpr, peerId) { override def mapK[K[_]: Comonad](fk: F ~> K): OnExpr[K] = copy(peerId.mapK(fk), via.map(_.mapK(fk))) @@ -20,10 +21,9 @@ object OnExpr extends Expr.AndIndented { override def validChildren: List[Expr.Lexem] = ForExpr.validChildren - override def p: P[OnExpr[Span.S]] = { + override def p: P[OnExpr[Span.S]] = (`on` *> ` ` *> ValueToken.`value` ~ (` ` *> `via` *> ` ` *> ValueToken.`value`).rep0).map { - case (peerId, via) => - OnExpr(peerId, via) + case (peerId, via) => OnExpr(peerId, via) } - } + } diff --git a/parser/src/main/scala/aqua/parser/expr/func/ParSeqExpr.scala b/parser/src/main/scala/aqua/parser/expr/func/ParSeqExpr.scala new file mode 100644 index 00000000..7016263f --- /dev/null +++ b/parser/src/main/scala/aqua/parser/expr/func/ParSeqExpr.scala @@ -0,0 +1,39 @@ +package aqua.parser.expr.func + +import aqua.parser.Expr +import aqua.parser.expr.* +import aqua.parser.lexer.Token.{`parseq`, *} +import aqua.parser.lexer.{Name, ValueToken} +import aqua.parser.lift.LiftParser +import aqua.parser.lift.LiftParser.* +import cats.parse.Parser as P +import cats.syntax.comonad.* +import cats.{~>, Comonad} +import aqua.parser.lift.Span +import aqua.parser.lift.Span.{P0ToSpan, PToSpan} + +case class ParSeqExpr[F[_]]( + item: Name[F], + iterable: ValueToken[F], + peerId: ValueToken[F], + via: List[ValueToken[F]] +) extends Expr[F](ParSeqExpr, item) { + + override def mapK[K[_]: Comonad](fk: F ~> K): ParSeqExpr[K] = + copy(item.mapK(fk), iterable.mapK(fk), peerId.mapK(fk), via.map(_.mapK(fk))) +} + +object ParSeqExpr extends Expr.AndIndented { + + override def validChildren: List[Expr.Lexem] = ArrowExpr.funcChildren + + private lazy val parseqPart = (`parseq` *> ` ` *> Name.p <* ` <- `) ~ ValueToken.`value` + + private lazy val onPart = + `on` *> ` ` *> ValueToken.`value` ~ (` ` *> `via` *> ` ` *> ValueToken.`value`).rep0 + + override def p: P[ParSeqExpr[Span.S]] = + ((parseqPart <* ` `) ~ onPart).map { case ((item, iterable), (peerId, via)) => + ParSeqExpr(item, iterable, peerId, via) + } +} diff --git a/parser/src/main/scala/aqua/parser/lexer/Token.scala b/parser/src/main/scala/aqua/parser/lexer/Token.scala index fee1a1cb..9eac6e85 100644 --- a/parser/src/main/scala/aqua/parser/lexer/Token.scala +++ b/parser/src/main/scala/aqua/parser/lexer/Token.scala @@ -65,6 +65,7 @@ object Token { val `try`: P[Unit] = P.string("try") val `catch`: P[Unit] = P.string("catch") val `par`: P[Unit] = P.string("par") + val `parseq`: P[Unit] = P.string("parseq") val `co`: P[Unit] = P.string("co") val `join`: P[Unit] = P.string("join") val `copy`: P[Unit] = P.string("copy") diff --git a/parser/src/test/scala/aqua/AquaSpec.scala b/parser/src/test/scala/aqua/AquaSpec.scala index 4a5145d4..e0c53262 100644 --- a/parser/src/test/scala/aqua/AquaSpec.scala +++ b/parser/src/test/scala/aqua/AquaSpec.scala @@ -124,6 +124,9 @@ trait AquaSpec extends EitherValues { def parseOn(str: String): OnExpr[Id] = OnExpr.p.parseAll(str).value.mapK(spanToId) + def parseParSeq(str: String): ParSeqExpr[Id] = + ParSeqExpr.p.parseAll(str).value.mapK(spanToId) + def parseReturn(str: String): ReturnExpr[Id] = ReturnExpr.p.parseAll(str).value.mapK(spanToId) diff --git a/parser/src/test/scala/aqua/parser/ParSecExprSpec.scala b/parser/src/test/scala/aqua/parser/ParSecExprSpec.scala new file mode 100644 index 00000000..b3dfd9ff --- /dev/null +++ b/parser/src/test/scala/aqua/parser/ParSecExprSpec.scala @@ -0,0 +1,36 @@ +package aqua.parser + +import aqua.AquaSpec +import aqua.parser.expr.func.ParSeqExpr +import cats.Id +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ParSeqExprSpec extends AnyFlatSpec with Matchers with AquaSpec { + + import AquaSpec.* + + "parseq" should "be parsed" in { + parseParSeq("parseq s <- strings on \"peerId\"") should be( + ParSeqExpr[Id](toName("s"), toVar("strings"), toStr("peerId"), Nil) + ) + + parseParSeq("parseq s <- strings on \"peerId\" via \"relay\"") should be( + ParSeqExpr[Id](toName("s"), toVar("strings"), toStr("peerId"), toStr("relay") :: Nil) + ) + + parseParSeq("parseq s <- strings on \"peerId\" via \"relay\" via \"relay2\"") should be( + ParSeqExpr[Id]( + toName("s"), + toVar("strings"), + toStr("peerId"), + toStr("relay") :: toStr("relay2") :: Nil + ) + ) + + parseParSeq("parseq s <- strings on peerId via relay") should be( + ParSeqExpr[Id](toName("s"), toVar("strings"), toVar("peerId"), toVar("relay") :: Nil) + ) + + } +} diff --git a/semantics/src/main/scala/aqua/semantics/ExprSem.scala b/semantics/src/main/scala/aqua/semantics/ExprSem.scala index 84537232..7fd390f8 100644 --- a/semantics/src/main/scala/aqua/semantics/ExprSem.scala +++ b/semantics/src/main/scala/aqua/semantics/ExprSem.scala @@ -47,6 +47,7 @@ object ExprSem { case expr: CatchExpr[S] => new CatchSem(expr).program[G] case expr: ElseOtherwiseExpr[S] => new ElseOtherwiseSem(expr).program[G] case expr: ParExpr[S] => new ParSem(expr).program[G] + case expr: ParSeqExpr[S] => new ParSeqSem(expr).program[G] case expr: CoExpr[S] => new CoSem(expr).program[G] case expr: JoinExpr[S] => new JoinSem(expr).program[G] case expr: ReturnExpr[S] => new ReturnSem(expr).program[G] diff --git a/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala b/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala index a48a8d19..427125ea 100644 --- a/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala +++ b/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala @@ -2,9 +2,10 @@ package aqua.semantics.expr.func import aqua.raw.Raw import aqua.parser.expr.func.ForExpr +import aqua.parser.lexer.{Name, ValueToken} import aqua.raw.value.ValueRaw import aqua.raw.ops.* -import aqua.raw.ops.ForTag.WaitMode +import aqua.raw.ops.ForTag import aqua.semantics.Prog import aqua.semantics.rules.ValuesAlgebra import aqua.semantics.rules.abilities.AbilitiesAlgebra @@ -23,7 +24,7 @@ import cats.syntax.option.* class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal { - def program[F[_]: Monad](implicit + def program[F[_]: Monad](using V: ValuesAlgebra[S, F], N: NamesAlgebra[S, F], T: TypesAlgebra[S, F], @@ -31,17 +32,7 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal { ): Prog[F, Raw] = Prog .around( - V.valueToRaw(expr.iterable).flatMap { - case Some(vm) => - vm.`type` match { - case t: BoxType => - N.define(expr.item, t.element).as(vm.some) - case dt => - T.ensureTypeMatches(expr.iterable, ArrayType(dt), dt).as(none) - } - - case _ => none.pure - }, + ForSem.beforeFor(expr.item, expr.iterable), // Without type of ops specified // scala compiler fails to compile this (iterable, ops: Raw) => @@ -53,7 +44,7 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal { case ForExpr.Mode.TryMode => TryTag } - val mode = expr.mode.collect { case ForExpr.Mode.ParMode => WaitMode } + val mode = expr.mode.collect { case ForExpr.Mode.ParMode => ForTag.Mode.Wait } val forTag = ForTag(expr.item.value, vm, mode).wrap( innerTag.wrap( @@ -72,3 +63,22 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal { .namesScope(expr.token) .abilitiesScope(expr.token) } + +object ForSem { + + def beforeFor[S[_], F[_]: Monad](item: Name[S], iterable: ValueToken[S])(implicit + V: ValuesAlgebra[S, F], + N: NamesAlgebra[S, F], + T: TypesAlgebra[S, F] + ): F[Option[ValueRaw]] = + V.valueToRaw(iterable).flatMap { + case Some(vm) => + vm.`type` match { + case t: BoxType => + N.define(item, t.element).as(vm.some) + case dt => + T.ensureTypeMatches(iterable, ArrayType(dt), dt).as(none) + } + case _ => none.pure + } +} diff --git a/semantics/src/main/scala/aqua/semantics/expr/func/OnSem.scala b/semantics/src/main/scala/aqua/semantics/expr/func/OnSem.scala index f39c8d44..ea9c539d 100644 --- a/semantics/src/main/scala/aqua/semantics/expr/func/OnSem.scala +++ b/semantics/src/main/scala/aqua/semantics/expr/func/OnSem.scala @@ -2,12 +2,12 @@ package aqua.semantics.expr.func import aqua.raw.ops.{FuncOp, OnTag} import aqua.parser.expr.func.OnExpr +import aqua.parser.lexer.ValueToken import aqua.raw.Raw import aqua.raw.value.ValueRaw import aqua.semantics.Prog import aqua.semantics.rules.ValuesAlgebra import aqua.semantics.rules.abilities.AbilitiesAlgebra -import aqua.semantics.rules.topology.TopologyAlgebra import aqua.semantics.rules.types.TypesAlgebra import aqua.types.{BoxType, OptionType, ScalarType} import cats.data.Chain @@ -25,41 +25,48 @@ class OnSem[S[_]](val expr: OnExpr[S]) extends AnyVal { T: TypesAlgebra[S, Alg], A: AbilitiesAlgebra[S, Alg] ): Prog[Alg, Raw] = - Prog.around( - ( - V.ensureIsString(expr.peerId), - expr.via - .traverse(v => - V.valueToRaw(v).flatTap { - case Some(vm) => - vm.`type` match { - case _: BoxType => - T.ensureTypeMatches(v, OptionType(ScalarType.string), vm.`type`) - case _ => - T.ensureTypeMatches(v, ScalarType.string, vm.`type`) - } - case None => false.pure[Alg] - } - ) - .map(_.flatten) - ).mapN { case (_, viaVM) => - viaVM - } - <* A.beginScope(expr.peerId), - (viaVM: List[ValueRaw], ops: Raw) => - A.endScope() >> (ops match { - case FuncOp(op) => - V.valueToRaw(expr.peerId).map { - case Some(om) => - OnTag( - om, - Chain.fromSeq(viaVM) - ).wrap(op).toFuncOp - case _ => - Raw.error("OnSem: Impossible error") - } + Prog + .around( + OnSem.beforeOn(expr.peerId, expr.via), + (viaVM: List[ValueRaw], ops: Raw) => + ops match { + case FuncOp(op) => + V.valueToRaw(expr.peerId).map { + case Some(om) => + OnTag( + om, + Chain.fromSeq(viaVM) + ).wrap(op).toFuncOp + case _ => + Raw.error("OnSem: Impossible error") + } + + case m => Raw.error("On body is not an op, it's " + m).pure[Alg] + } + ) + .abilitiesScope(expr.peerId) +} + +object OnSem { + + def beforeOn[S[_], Alg[_]: Monad](peerId: ValueToken[S], via: List[ValueToken[S]])(implicit + V: ValuesAlgebra[S, Alg], + T: TypesAlgebra[S, Alg], + A: AbilitiesAlgebra[S, Alg] + ): Alg[List[ValueRaw]] = + V.ensureIsString(peerId) *> via + .traverse(v => + V.valueToRaw(v).flatTap { + case Some(vm) => + vm.`type` match { + case _: BoxType => + T.ensureTypeMatches(v, OptionType(ScalarType.string), vm.`type`) + case _ => + T.ensureTypeMatches(v, ScalarType.string, vm.`type`) + } + case None => false.pure[Alg] + } + ) + .map(_.flatten) - case m => Raw.error("On body is not an op, it's " + m).pure[Alg] - }) - ) } diff --git a/semantics/src/main/scala/aqua/semantics/expr/func/ParSeqSem.scala b/semantics/src/main/scala/aqua/semantics/expr/func/ParSeqSem.scala new file mode 100644 index 00000000..e17871f2 --- /dev/null +++ b/semantics/src/main/scala/aqua/semantics/expr/func/ParSeqSem.scala @@ -0,0 +1,77 @@ +package aqua.semantics.expr.func + +import aqua.raw.Raw +import aqua.parser.expr.func.ParSeqExpr +import aqua.raw.value.ValueRaw +import aqua.raw.ops.* +import aqua.raw.ops.ForTag +import aqua.semantics.Prog +import aqua.semantics.rules.ValuesAlgebra +import aqua.semantics.rules.abilities.AbilitiesAlgebra +import aqua.semantics.rules.names.NamesAlgebra +import aqua.semantics.rules.types.TypesAlgebra +import aqua.types.{ArrayType, BoxType, StreamType} + +import cats.Monad +import cats.data.Chain +import cats.syntax.option.* +import cats.syntax.applicative.* +import cats.syntax.apply.* +import cats.syntax.flatMap.* +import cats.syntax.functor.* + +class ParSeqSem[S[_]](val expr: ParSeqExpr[S]) extends AnyVal { + + def program[F[_]: Monad](implicit + V: ValuesAlgebra[S, F], + N: NamesAlgebra[S, F], + T: TypesAlgebra[S, F], + A: AbilitiesAlgebra[S, F] + ): Prog[F, Raw] = + Prog + .around( + ( + ForSem.beforeFor(expr.item, expr.iterable), + OnSem.beforeOn(expr.peerId, expr.via) + ).tupled, + // Without type of ops specified + // scala compiler fails to compile this + (iterableVia, ops: Raw) => { + val (iterableVM, viaVM) = iterableVia + after(iterableVM, viaVM, ops) + } + ) + .namesScope(expr.token) + .abilitiesScope(expr.token) + + private def after[F[_]: Monad]( + iterableVM: Option[ValueRaw], + viaVM: List[ValueRaw], + ops: Raw + )(using + V: ValuesAlgebra[S, F], + N: NamesAlgebra[S, F], + T: TypesAlgebra[S, F], + A: AbilitiesAlgebra[S, F] + ): F[Raw] = + V.valueToRaw(expr.peerId).map((_, iterableVM, ops)).flatMap { + case (Some(peerId), Some(vm), FuncOp(op)) => + for { + restricted <- FuncOpSem.restrictStreamsInScope(op) + onTag = OnTag( + peerId = peerId, + via = Chain.fromSeq(viaVM), + strategy = OnTag.ReturnStrategy.Relay.some + ) + tag = ForTag(expr.item.value, vm).wrap( + ParTag.wrap( + onTag.wrap(restricted), + NextTag(expr.item.value).leaf + ) + ) + } yield tag.toFuncOp + case (None, _, _) => Raw.error("ParSeqSem: could not resolve `peerId`").pure + case (_, None, _) => Raw.error("ParSeqSem: could not resolve `iterable`").pure + case (_, _, _) => Raw.error("ParSeqSem: wrong body of `parseq` block").pure + } +} diff --git a/semantics/src/main/scala/aqua/semantics/rules/abilities/AbilitiesInterpreter.scala b/semantics/src/main/scala/aqua/semantics/rules/abilities/AbilitiesInterpreter.scala index cc14231b..6ff60c63 100644 --- a/semantics/src/main/scala/aqua/semantics/rules/abilities/AbilitiesInterpreter.scala +++ b/semantics/src/main/scala/aqua/semantics/rules/abilities/AbilitiesInterpreter.scala @@ -1,19 +1,16 @@ package aqua.semantics.rules.abilities -import aqua.parser.lexer.{Ability, Name, NamedTypeToken, Token, ValueToken} -import aqua.raw.ServiceRaw -import aqua.raw.RawContext +import aqua.parser.lexer.{Name, NamedTypeToken, Token, ValueToken} import aqua.raw.value.ValueRaw +import aqua.raw.{RawContext, ServiceRaw} import aqua.semantics.Levenshtein -import aqua.semantics.rules.definitions.DefinitionsAlgebra -import aqua.semantics.rules.locations.LocationsAlgebra -import aqua.semantics.rules.{abilities, StackInterpreter} import aqua.semantics.rules.errors.ReportErrors +import aqua.semantics.rules.locations.LocationsAlgebra +import aqua.semantics.rules.{StackInterpreter, abilities} import aqua.types.ArrowType -import cats.data.{NonEmptyList, NonEmptyMap, State} +import cats.data.{NonEmptyMap, State} import cats.syntax.functor.* import cats.syntax.traverse.* -import cats.~> import monocle.Lens import monocle.macros.GenLens @@ -29,7 +26,7 @@ class AbilitiesInterpreter[S[_], X](implicit GenLens[AbilitiesState[S]](_.stack) ) - import stackInt.{getState, mapStackHead, mapStackHeadE, modify, report, setState} + import stackInt.{getState, mapStackHead, modify, report} override def defineService( name: NamedTypeToken[S], diff --git a/semantics/src/main/scala/aqua/semantics/rules/topology/TopologyAlgebra.scala b/semantics/src/main/scala/aqua/semantics/rules/topology/TopologyAlgebra.scala deleted file mode 100644 index e32bb63d..00000000 --- a/semantics/src/main/scala/aqua/semantics/rules/topology/TopologyAlgebra.scala +++ /dev/null @@ -1,11 +0,0 @@ -package aqua.semantics.rules.topology - -import aqua.parser.expr.func.OnExpr -import aqua.parser.lexer.Token - -trait TopologyAlgebra[S[_], Alg[_]] { - - def beginScope(token: OnExpr[S]): Alg[Unit] - - def endScope(): Alg[Unit] -} diff --git a/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala b/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala index 4ca3caa8..391244dd 100644 --- a/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala +++ b/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala @@ -16,7 +16,12 @@ import cats.~> import cats.data.Chain import cats.data.NonEmptyChain import cats.syntax.show.* +import cats.syntax.traverse.* +import cats.syntax.foldable.* import cats.data.Validated +import cats.free.Cofree +import cats.data.State +import cats.Eval class SemanticsSpec extends AnyFlatSpec with Matchers with Inside { @@ -45,6 +50,34 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside { } } + def matchSubtree(tree: RawTag.Tree)( + matcher: PartialFunction[(RawTag, RawTag.Tree), Any] + ): Unit = { + def evalMatch(t: RawTag.Tree): Eval[Option[Unit]] = + if (matcher.isDefinedAt((t.head, t))) + Eval.now( + Some( + matcher((t.head, t)) + ) + ) + else + t.tail.flatMap( + _.collectFirstSomeM(evalMatch) + ) + + evalMatch(tree).value.getOrElse(fail(s"Did not match subtree")) + } + + def matchChildren(tree: RawTag.Tree)( + matchers: PartialFunction[(RawTag, RawTag.Tree), Any]* + ): Unit = { + val children = tree.tail.value + children should have size matchers.length + children.toList.zip(matchers).foreach { case (child, matcher) => + matcher.lift((child.head, child)).getOrElse(fail(s"Unexpected child $child")) + } + } + val testServiceDef = """ |service Test("test"): | testCall() @@ -526,4 +559,31 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside { body.equalsOrShowDiff(expected) should be(true) } } + + it should "generate right model for `parseq`" in { + val script = + testServiceDef + """ + |data Peer: + | peer: string + | relay: string + | + |func test(): + | peers = [Peer(peer="a", relay="b"), Peer(peer="c", relay="d")] + | parseq p <- peers on p.peer via p.relay: + | Test.testCallStr(p.peer) + |""".stripMargin + + insideBody(script) { body => + matchSubtree(body) { case (ForTag("p", _, None), forTag) => + matchChildren(forTag) { case (ParTag, parTag) => + matchChildren(parTag)( + { case (OnTag(_, _, strat), _) => + strat shouldBe Some(OnTag.ReturnStrategy.Relay) + }, + { case (NextTag("p"), _) => } + ) + } + } + } + } }