From c1fe24b04d8a2f711ed7b316e7ae9a4f12732421 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Thu, 6 Jul 2023 11:44:37 +0200 Subject: [PATCH] feat(compiler): Make topology hop with non-FFI snippet [fixes LNG-125] (#764) * Remove MakeRes.canon * Replace noop with hop * Rewrite join * Remove JoinModel, fix tests * Share code between tests * Pass type in RestrictionTag(Model) * Fix MakeRes.hop * Fix wrapping * Rename vars, add comments * Fix XorBranch topology * Fix tests --- .../src/main/scala/aqua/backend/air/Air.scala | 10 +- .../main/scala/aqua/backend/air/AirGen.scala | 30 ++-- build.sbt | 4 +- .../aqua/compiler/AquaCompilerSpec.scala | 87 +++++----- integration-tests/aqua/examples/foldJoin.aqua | 4 +- .../scala/aqua/model/inline/TagInliner.scala | 4 +- .../inline/raw/ApplyGateRawInliner.scala | 158 ++++++++++-------- .../raw/ApplyPropertiesRawInliner.scala | 4 +- .../inline/raw/CollectionRawInliner.scala | 45 ++--- .../aqua/model/inline/ArrowInlinerSpec.scala | 8 +- .../inline/CollectionRawInlinerSpec.scala | 10 +- .../src/main/scala/aqua/raw/ops/RawTag.scala | 4 +- .../res/src/main/scala/aqua/res/MakeRes.scala | 77 +++++---- .../src/main/scala/aqua/res/ResolvedOp.scala | 11 +- .../src/test/scala/aqua/res/ResBuilder.scala | 47 ++++++ model/src/main/scala/aqua/model/OpModel.scala | 14 +- .../model/transform/topology/Topology.scala | 9 +- .../transform/topology/strategy/After.scala | 10 +- .../topology/strategy/XorBranch.scala | 41 +++-- .../aqua/model/transform/ModelBuilder.scala | 35 +++- .../transform/topology/TopologySpec.scala | 118 +++++++------ .../src/main/scala/aqua/tree/TreeNode.scala | 13 +- .../aqua/semantics/expr/func/ArrowSem.scala | 3 +- .../aqua/semantics/expr/func/ForSem.scala | 9 +- types/src/main/scala/aqua/types/Type.scala | 10 ++ 25 files changed, 455 insertions(+), 310 deletions(-) create mode 100644 model/res/src/test/scala/aqua/res/ResBuilder.scala diff --git a/backend/air/src/main/scala/aqua/backend/air/Air.scala b/backend/air/src/main/scala/aqua/backend/air/Air.scala index 69821d16..b2ed57c7 100644 --- a/backend/air/src/main/scala/aqua/backend/air/Air.scala +++ b/backend/air/src/main/scala/aqua/backend/air/Air.scala @@ -46,8 +46,6 @@ object DataView { case class Variable(name: String) extends DataView - case class Stream(name: String) extends DataView - case class VarLens(name: String, lens: String, isField: Boolean = true) extends DataView { def append(sublens: String): VarLens = copy(lens = lens + sublens) } @@ -57,7 +55,6 @@ object DataView { case InitPeerId ⇒ "%init_peer_id%" case LastError ⇒ "%last_error%" case Variable(name) ⇒ name - case Stream(name) ⇒ name case VarLens(name, lens, isField) ⇒ if (isField) name + ".$" + lens else name + lens @@ -90,7 +87,12 @@ object Air { case class Next(label: String) extends Air(Keyword.Next) - case class Fold(iterable: DataView, label: String, instruction: Air, lastNextInstruction: Option[Air]) extends Air(Keyword.Fold) + case class Fold( + iterable: DataView, + label: String, + instruction: Air, + lastNextInstruction: Option[Air] + ) extends Air(Keyword.Fold) case class Match(left: DataView, right: DataView, instruction: Air) extends Air(Keyword.Match) diff --git a/backend/air/src/main/scala/aqua/backend/air/AirGen.scala b/backend/air/src/main/scala/aqua/backend/air/AirGen.scala index 51552cd6..83c065bb 100644 --- a/backend/air/src/main/scala/aqua/backend/air/AirGen.scala +++ b/backend/air/src/main/scala/aqua/backend/air/AirGen.scala @@ -3,7 +3,7 @@ package aqua.backend.air import aqua.model.* import aqua.raw.ops.Call import aqua.res.* -import aqua.types.{ArrayType, CanonStreamType, StreamType} +import aqua.types.{ArrayType, CanonStreamType, StreamType, Type} import cats.Eval import cats.data.Chain import cats.free.Cofree @@ -26,14 +26,17 @@ object AirGen extends Logging { s".[$idx]${propertyToString(tail)}" } + def varNameToString(name: String, `type`: Type): String = + (`type` match { + case _: StreamType => "$" + name + case _: CanonStreamType => "#" + name + case _ => name + }).replace('.', '_') + def valueToData(vm: ValueModel): DataView = vm match { case LiteralModel(value, _) => DataView.StringScalar(value) case VarModel(name, t, property) => - val n = (t match { - case _: StreamType => "$" + name - case _: CanonStreamType => "#" + name - case _ => name - }).replace('.', '_') + val n = varNameToString(name, t) if (property.isEmpty) DataView.Variable(n) else { val functors = property.find { @@ -97,8 +100,8 @@ object AirGen extends Logging { case ForModel.NeverMode => NeverGen } Eval later ForGen(valueToData(iterable), item, opsToSingle(ops), m) - case RestrictionRes(item, isStream) => - Eval later NewGen(item, isStream, opsToSingle(ops)) + case RestrictionRes(item, itemType) => + Eval later NewGen(varNameToString(item, itemType), opsToSingle(ops)) case CallServiceRes(serviceId, funcName, CallRes(args, exportTo), peerId) => Eval.later( ServiceCallGen( @@ -179,14 +182,17 @@ case class MatchMismatchGen( else Air.Mismatch(left, right, body.generate) } -case class ForGen(iterable: DataView, item: String, body: AirGen, mode: Option[AirGen]) extends AirGen { +case class ForGen(iterable: DataView, item: String, body: AirGen, mode: Option[AirGen]) + extends AirGen { override def generate: Air = Air.Fold(iterable, item, body.generate, mode.map(_.generate)) } -case class NewGen(item: String, isStream: Boolean, body: AirGen) extends AirGen { +case class NewGen(name: String, body: AirGen) extends AirGen { - override def generate: Air = - Air.New(if (isStream) DataView.Stream("$" + item) else DataView.Variable(item), body.generate) + override def generate: Air = Air.New( + DataView.Variable(name), + body.generate + ) } case class NextGen(item: String) extends AirGen { diff --git a/build.sbt b/build.sbt index 3b7eb867..29073bbf 100644 --- a/build.sbt +++ b/build.sbt @@ -233,7 +233,7 @@ lazy val transform = crossProject(JVMPlatform, JSPlatform) .crossType(CrossType.Pure) .in(file("model/transform")) .settings(commons: _*) - .dependsOn(model, res, inline) + .dependsOn(model, res, inline, res % "test->test") lazy val semantics = crossProject(JVMPlatform, JSPlatform) .withoutSuffixFor(JVMPlatform) @@ -252,7 +252,7 @@ lazy val compiler = crossProject(JVMPlatform, JSPlatform) .crossType(CrossType.Pure) .in(file("compiler")) .settings(commons: _*) - .dependsOn(semantics, linker, backend, transform % Test) + .dependsOn(semantics, linker, backend, transform % Test, res % "test->test") lazy val backend = crossProject(JVMPlatform, JSPlatform) .withoutSuffixFor(JVMPlatform) diff --git a/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala b/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala index 5ee59561..5380985d 100644 --- a/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala +++ b/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala @@ -1,6 +1,14 @@ package aqua.compiler -import aqua.model.{CallModel, ForModel, FunctorModel, IntoIndexModel, LiteralModel, ValueModel, VarModel} +import aqua.model.{ + CallModel, + ForModel, + FunctorModel, + IntoIndexModel, + LiteralModel, + ValueModel, + VarModel +} import aqua.model.transform.TransformConfig import aqua.model.transform.Transform import aqua.parser.ParserError @@ -10,7 +18,21 @@ import aqua.parser.lift.Span import aqua.parser.lift.Span.S import aqua.raw.ConstantRaw import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} -import aqua.res.{ApRes, CallRes, CallServiceRes, CanonRes, FoldRes, MakeRes, MatchMismatchRes, NextRes, ParRes, RestrictionRes, SeqRes, XorRes} +import aqua.res.{ + ApRes, + CallRes, + CallServiceRes, + CanonRes, + FoldRes, + MakeRes, + MatchMismatchRes, + NextRes, + ParRes, + RestrictionRes, + SeqRes, + XorRes +} +import aqua.res.ResBuilder import aqua.types.{ArrayType, CanonStreamType, LiteralType, ScalarType, StreamType, Type} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -83,8 +105,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { } - def through(peer: ValueModel, log: String = null) = - MakeRes.noop(peer, log) + def through(peer: ValueModel) = + MakeRes.hop(peer) val relay = VarRaw("-relay-", ScalarType.string) @@ -97,42 +119,10 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { ).leaf } - val init = LiteralModel.fromRaw(ValueRaw.InitPeerId) + private val init = LiteralModel.fromRaw(ValueRaw.InitPeerId) - private def join(vm: VarModel, length: ValueModel) = { - val testVM = VarModel(vm.name + "_test", vm.`type`) - val iter = VarModel(vm.name + "_fold_var", ScalarType.string) - val canon = VarModel(vm.name + "_iter_canon", CanonStreamType(ScalarType.string)) - val canonRes = VarModel(vm.name + "_result_canon", CanonStreamType(ScalarType.string)) - val arrayRes = VarModel(vm.name + "_gate", ArrayType(ScalarType.string)) - val idx = VarModel(vm.name + "_incr", ScalarType.u32) - - RestrictionRes(testVM.name, true).wrap( - CallServiceRes( - LiteralModel("\"math\"", ScalarType.string), - "add", - CallRes( - length :: LiteralModel.fromRaw(LiteralRaw.number(1)) :: Nil, - Some(CallModel.Export(idx.name, idx.`type`)) - ), - init - ).leaf, - FoldRes(iter.name, vm, Some(ForModel.NeverMode)).wrap( - ApRes(iter, CallModel.Export(testVM.name, testVM.`type`)).leaf, - CanonRes(testVM, init, CallModel.Export(canon.name, canon.`type`)).leaf, - XorRes.wrap( - MatchMismatchRes( - canon.copy(properties = Chain.one(FunctorModel("length", ScalarType.u32))), - idx, - true - ).leaf, - NextRes(iter.name).leaf - ) - ), - CanonRes(testVM, init, CallModel.Export(canonRes.name, canonRes.`type`)).leaf, - ApRes(canonRes, CallModel.Export(arrayRes.name, arrayRes.`type`)).leaf - ) - } + private def join(vm: VarModel, idx: ValueModel) = + ResBuilder.join(vm, idx, init) "aqua compiler" should "create right topology" in { @@ -177,7 +167,7 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { SeqRes.wrap( getDataSrv("-relay-", ScalarType.string), getDataSrv(peers.name, peers.`type`), - RestrictionRes("results", true).wrap( + RestrictionRes(results.name, resultsType).wrap( SeqRes.wrap( ParRes.wrap( FoldRes(peer.name, peers, Some(ForModel.NeverMode)).wrap( @@ -203,10 +193,10 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { ), join(results, LiteralModel.fromRaw(LiteralRaw.number(2))), CanonRes(results, init, CallModel.Export(canonResult.name, canonResult.`type`)).leaf, - ApRes( - canonResult, - CallModel.Export(flatResult.name, flatResult.`type`) - ).leaf + ApRes( + canonResult, + CallModel.Export(flatResult.name, flatResult.`type`) + ).leaf ) ), CallServiceRes( @@ -283,23 +273,24 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { val Some(funcWrap) = aquaRes.funcs.find(_.funcName == "wrap") val Some(barfoo) = aquaRes.funcs.find(_.funcName == "barfoo") - val resVM = VarModel("res", StreamType(ScalarType.string)) + val resStreamType = StreamType(ScalarType.string) + val resVM = VarModel("res", resStreamType) val resCanonVM = VarModel("-res-fix-0", CanonStreamType(ScalarType.string)) val resFlatVM = VarModel("-res-flat-0", ArrayType(ScalarType.string)) barfoo.body.equalsOrShowDiff( SeqRes.wrap( - RestrictionRes("res", true).wrap( + RestrictionRes(resVM.name, resStreamType).wrap( SeqRes.wrap( // res <- foo() ApRes( LiteralModel.fromRaw(LiteralRaw.quote("I am MyFooBar foo")), - CallModel.Export("res", StreamType(ScalarType.string)) + CallModel.Export(resVM.name, resVM.`type`) ).leaf, // res <- bar() ApRes( LiteralModel.fromRaw(LiteralRaw.quote(" I am MyFooBar bar")), - CallModel.Export("res", StreamType(ScalarType.string)) + CallModel.Export(resVM.name, resVM.`type`) ).leaf, // canonicalization CanonRes( diff --git a/integration-tests/aqua/examples/foldJoin.aqua b/integration-tests/aqua/examples/foldJoin.aqua index 20b77173..3548e058 100644 --- a/integration-tests/aqua/examples/foldJoin.aqua +++ b/integration-tests/aqua/examples/foldJoin.aqua @@ -15,7 +15,5 @@ func getTwoResults(node: string) -> []u64: on n: try: res <- Peer.timestamp_sec() - Op2.identity(res!) - Op2.identity(res!1) - Op2.identity(res!2) + join res!2 <- res \ No newline at end of file diff --git a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala index 0a7d9e2d..20c2bbd5 100644 --- a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala @@ -351,8 +351,8 @@ object TagInliner extends Logging { } } yield model.fold(TagInlined.Empty())(m => TagInlined.Single(model = m)) - case RestrictionTag(name, isStream) => - pure(RestrictionModel(name, isStream)) + case RestrictionTag(name, typ) => + pure(RestrictionModel(name, typ)) case DeclareStreamTag(value) => value match diff --git a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala index 2413ac52..2830ee86 100644 --- a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyGateRawInliner.scala @@ -7,12 +7,92 @@ import aqua.raw.value.{ApplyGateRaw, LiteralRaw, VarRaw} import cats.data.State import cats.data.Chain import aqua.model.inline.RawValueInliner.unfold -import aqua.types.{CanonStreamType, ScalarType, StreamType, ArrayType} +import aqua.types.{ArrayType, CanonStreamType, ScalarType, StreamType} import cats.syntax.monoid.* import scribe.Logging object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging { + /** + * To wait for the element of a stream by the given index, the following model is generated: + * (seq + * (seq + * (seq + * (call ("math" "add") [0 1] stream_incr) + * (fold $stream s + * (seq + * (seq + * (ap s $stream_test) + * (canon $stream_test #stream_iter_canon) + * ) + * (xor + * (match #stream_iter_canon.length stream_incr + * (null) + * ) + * (next s) + * ) + * ) + * (never) + * ) + * ) + * (canon $stream_test #stream_result_canon) + * ) + * (ap #stream_result_canon stream_gate) + * ) + */ + def joinStreamOnIndexModel( + streamName: String, + streamType: StreamType, + idxModel: ValueModel, + idxIncrName: String, + testName: String, + iterName: String, + canonName: String, + iterCanonName: String, + resultName: String + ): OpModel.Tree = { + val varSTest = VarModel(testName, streamType) + val iter = VarModel(iterName, streamType.element) + + val iterCanon = VarModel(iterCanonName, CanonStreamType(streamType.element)) + + val resultCanon = + VarModel(canonName, CanonStreamType(streamType.element)) + + val incrVar = VarModel(idxIncrName, ScalarType.u32) + + RestrictionModel(varSTest.name, streamType).wrap( + increment(idxModel, incrVar), + ForModel(iter.name, VarModel(streamName, streamType), Some(ForModel.NeverMode)).wrap( + PushToStreamModel( + iter, + CallModel.Export(varSTest.name, varSTest.`type`) + ).leaf, + CanonicalizeModel( + varSTest, + CallModel.Export(iterCanon.name, iterCanon.`type`) + ).leaf, + XorModel.wrap( + MatchMismatchModel( + iterCanon + .copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))), + incrVar, + true + ).leaf, + NextModel(iter.name).leaf + ) + ), + CanonicalizeModel( + varSTest, + CallModel.Export(resultCanon.name, CanonStreamType(streamType.element)) + ).leaf, + FlattenModel( + resultCanon, + resultName + ).leaf + ) + } + override def apply[S: Mangler: Exports: Arrows]( afr: ApplyGateRaw, propertyAllowed: Boolean @@ -27,73 +107,19 @@ object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging { idxFolded <- unfold(afr.idx) (idxModel, idxInline) = idxFolded } yield { - val varSTest = VarModel(uniqueTestName, afr.streamType) - val iter = VarModel(uniqueIter, afr.streamType.element) - - val iterCanon = VarModel(uniqueIterCanon, CanonStreamType(afr.streamType.element)) - - val resultCanon = - VarModel(uniqueCanonName, CanonStreamType(afr.streamType.element)) - - val incrVar = VarModel(uniqueIdxIncr, ScalarType.u32) - - // To wait for the element of a stream by the given index, the following model is generated: - // (seq - // (seq - // (seq - // (call %init_peer_id% ("math" "add") [0 1] stream_incr) - // (fold $stream s - // (seq - // (seq - // (ap s $stream_test) - // (canon %init_peer_id% $stream_test #stream_iter_canon) - // ) - // (xor - // (match #stream_iter_canon.length stream_incr - // (null) - // ) - // (next s) - // ) - // ) - // (never) - // ) - // ) - // (canon %init_peer_id% $stream_test #stream_result_canon) - // ) - // (ap #stream_result_canon stream_gate) - // ) - val gate = RestrictionModel(varSTest.name, true).wrap( - increment(idxModel, incrVar), - ForModel(iter.name, VarModel(afr.name, afr.streamType), Some(ForModel.NeverMode)).wrap( - PushToStreamModel( - iter, - CallModel.Export(varSTest.name, varSTest.`type`) - ).leaf, - CanonicalizeModel( - varSTest, - CallModel.Export(iterCanon.name, iterCanon.`type`) - ).leaf, - XorModel.wrap( - MatchMismatchModel( - iterCanon - .copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))), - incrVar, - true - ).leaf, - NextModel(iter.name).leaf - ) - ), - CanonicalizeModel( - varSTest, - CallModel.Export(resultCanon.name, CanonStreamType(afr.streamType.element)) - ).leaf, - FlattenModel( - resultCanon, - uniqueResultName - ).leaf + val gate = joinStreamOnIndexModel( + streamName = afr.name, + streamType = afr.streamType, + idxModel = idxModel, + idxIncrName = uniqueIdxIncr, + testName = uniqueTestName, + iterName = uniqueIter, + canonName = uniqueCanonName, + iterCanonName = uniqueIterCanon, + resultName = uniqueResultName ) - val tree = SeqModel.wrap((idxInline.predo.toList :+ gate):_*) + val tree = SeqModel.wrap(idxInline.predo.toList :+ gate) val treeInline = Inline(idxInline.flattenValues, predo = Chain.one(tree)) diff --git a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyPropertiesRawInliner.scala b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyPropertiesRawInliner.scala index 5e31a0a6..3822d1f6 100644 --- a/model/inline/src/main/scala/aqua/model/inline/raw/ApplyPropertiesRawInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/raw/ApplyPropertiesRawInliner.scala @@ -3,7 +3,6 @@ package aqua.model.inline.raw import aqua.model.{ CallModel, CallServiceModel, - CanonicalizeModel, FlattenModel, ForModel, FunctorModel, @@ -15,7 +14,6 @@ import aqua.model.{ OpModel, PropertyModel, PushToStreamModel, - RestrictionModel, SeqModel, ValueModel, VarModel, @@ -153,7 +151,7 @@ object ApplyPropertiesRawInliner extends RawInliner[ApplyPropertyRaw] with Loggi properties.map { case iir @ IntoIndexRaw(vr, t) => unfold(vr, propertiesAllowed = false).flatMap { - case (vm@VarModel(_, _, _), inline) if vm.properties.nonEmpty => + case (vm @ VarModel(_, _, _), inline) if vm.properties.nonEmpty => removeProperties(vm).map { case (vf, inlf) => PropertyRawWithModel(iir, Option(IntoIndexModel(vf.name, t))) -> Inline( inline.flattenValues ++ inlf.flattenValues, diff --git a/model/inline/src/main/scala/aqua/model/inline/raw/CollectionRawInliner.scala b/model/inline/src/main/scala/aqua/model/inline/raw/CollectionRawInliner.scala index d5aaa047..e13c2b2e 100644 --- a/model/inline/src/main/scala/aqua/model/inline/raw/CollectionRawInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/raw/CollectionRawInliner.scala @@ -24,21 +24,24 @@ object CollectionRawInliner extends RawInliner[CollectionRaw] { raw: CollectionRaw, propertiesAllowed: Boolean ): State[S, (ValueModel, Inline)] = unfoldCollection(raw) - + def unfoldCollection[S: Mangler: Exports: Arrows]( raw: CollectionRaw, assignToName: Option[String] = None ): State[S, (ValueModel, Inline)] = for { - streamName <- - raw.boxType match { - case _: StreamType => assignToName.map(s => State.pure(s)).getOrElse(Mangler[S].findAndForbidName("stream-inline")) - case _: CanonStreamType => Mangler[S].findAndForbidName("canon_stream-inline") - case _: ArrayType => Mangler[S].findAndForbidName("array-inline") - case _: OptionType => Mangler[S].findAndForbidName("option-inline") - } + streamName <- raw.boxType match { + case _: StreamType => + assignToName + .map(s => State.pure(s)) + .getOrElse(Mangler[S].findAndForbidName("stream-inline")) + case _: CanonStreamType => Mangler[S].findAndForbidName("canon_stream-inline") + case _: ArrayType => Mangler[S].findAndForbidName("array-inline") + case _: OptionType => Mangler[S].findAndForbidName("option-inline") + } - stream = VarModel(streamName, StreamType(raw.elementType)) + streamType = StreamType(raw.elementType) + stream = VarModel(streamName, streamType) streamExp = CallModel.Export(stream.name, stream.`type`) valsWithInlines <- raw.values @@ -48,13 +51,13 @@ object CollectionRawInliner extends RawInliner[CollectionRaw] { // push values to the stream, that is gathering the collection vals = valsWithInlines.map { case (v, _) => - PushToStreamModel(v, streamExp).leaf - } + PushToStreamModel(v, streamExp).leaf + } // all inlines will be added before pushing values to the stream inlines = valsWithInlines.flatMap { case (_, t) => - Chain.fromOption(t) - } + Chain.fromOption(t) + } canonName <- if (raw.boxType.isStream) State.pure(streamName) @@ -67,19 +70,19 @@ object CollectionRawInliner extends RawInliner[CollectionRaw] { } yield VarModel(canonName, canon.`type`) -> Inline.tree( raw.boxType match { case ArrayType(_) => - RestrictionModel(streamName, isStream = true).wrap( - SeqModel.wrap((inlines ++ vals :+ CanonicalizeModel(stream, canon).leaf).toList: _*) + RestrictionModel(streamName, streamType).wrap( + SeqModel.wrap(inlines ++ vals :+ CanonicalizeModel(stream, canon).leaf) ) case OptionType(_) => - RestrictionModel(streamName, isStream = true).wrap( + RestrictionModel(streamName, streamType).wrap( SeqModel.wrap( - SeqModel.wrap(inlines.toList:_*), - XorModel.wrap((vals :+ NullModel.leaf).toList: _*), - CanonicalizeModel(stream, canon).leaf + SeqModel.wrap(inlines), + XorModel.wrap(vals :+ NullModel.leaf), + CanonicalizeModel(stream, canon).leaf ) ) - case _ => - SeqModel.wrap((inlines ++ vals).toList: _*) + case _ => + SeqModel.wrap(inlines ++ vals) } ) } diff --git a/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala b/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala index 872e11eb..edacb3e1 100644 --- a/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala +++ b/model/inline/src/test/scala/aqua/model/inline/ArrowInlinerSpec.scala @@ -88,7 +88,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers { .callArrow[InliningState]( FuncArrow( "stream-callback", - RestrictionTag(streamVar.name, true).wrap( + RestrictionTag(streamVar.name, streamType).wrap( SeqTag.wrap( DeclareStreamTag(streamVar).leaf, CallArrowRawTag.func("cb", Call(streamVar :: Nil, Nil)).leaf @@ -115,7 +115,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers { ._2 model.equalsOrShowDiff( - RestrictionModel(streamVar.name, true).wrap( + RestrictionModel(streamVar.name, streamType).wrap( MetaModel .CallArrowModel("cb") .wrap( @@ -191,7 +191,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers { .callArrow[InliningState]( FuncArrow( "stream-callback", - RestrictionTag(streamVar.name, true).wrap( + RestrictionTag(streamVar.name, streamType).wrap( SeqTag.wrap( DeclareStreamTag(streamVar).leaf, CallArrowRawTag.func("cb", Call(streamVarLambda :: Nil, Nil)).leaf @@ -218,7 +218,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers { ._2 model.equalsOrShowDiff( - RestrictionModel(streamVar.name, true).wrap( + RestrictionModel(streamVar.name, streamType).wrap( CallServiceModel( LiteralModel.quote("test-service"), "some-call", diff --git a/model/inline/src/test/scala/aqua/model/inline/CollectionRawInlinerSpec.scala b/model/inline/src/test/scala/aqua/model/inline/CollectionRawInlinerSpec.scala index 2619fa66..07a23e02 100644 --- a/model/inline/src/test/scala/aqua/model/inline/CollectionRawInlinerSpec.scala +++ b/model/inline/src/test/scala/aqua/model/inline/CollectionRawInlinerSpec.scala @@ -7,6 +7,7 @@ import aqua.raw.ops.* import aqua.raw.value.{CollectionRaw, LiteralRaw, MakeStructRaw, VarRaw} import aqua.types.{CanonStreamType, OptionType, ScalarType, StreamType, StructType} import cats.data.{NonEmptyList, NonEmptyMap} +import cats.syntax.show.* import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -25,15 +26,14 @@ class CollectionRawInlinerSpec extends AnyFlatSpec with Matchers { val raw = CollectionRaw(NonEmptyList.of(makeStruct), OptionType(nestedType)) val (v, tree) = - RawValueInliner.valueToModel[InliningState](raw, false).run(InliningState()).value._2 + RawValueInliner.valueToModel[InliningState](raw, false).runA(InliningState()).value val resultValue = VarModel("option-inline-0", CanonStreamType(nestedType)) v shouldBe resultValue - tree.get.equalsOrShowDiff( - // create a stream - RestrictionModel("option-inline", true).wrap( + val expected = + RestrictionModel("option-inline", StreamType(nestedType)).wrap( // create a stream SeqModel.wrap( // create an object CallServiceModel( @@ -61,8 +61,8 @@ class CollectionRawInlinerSpec extends AnyFlatSpec with Matchers { ).leaf ) ) - ) shouldBe true + tree.get.equalsOrShowDiff(expected) shouldBe true } } diff --git a/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala b/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala index 55dd770b..20238e47 100644 --- a/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala +++ b/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala @@ -5,7 +5,7 @@ import aqua.raw.arrow.FuncRaw import aqua.raw.ops.RawTag.Tree import aqua.raw.value.{CallArrowRaw, ValueRaw, VarRaw} import aqua.tree.{TreeNode, TreeNodeCompanion} -import aqua.types.{ArrowType, ProductType} +import aqua.types.{ArrowType, DataType, ProductType} import cats.{Eval, Show} import cats.data.{Chain, NonEmptyList} import cats.free.Cofree @@ -121,7 +121,7 @@ case class NextTag(item: String) extends RawTag { copy(item = map.getOrElse(item, item)) } -case class RestrictionTag(name: String, isStream: Boolean) extends SeqGroupTag { +case class RestrictionTag(name: String, `type`: DataType) extends SeqGroupTag { override def restrictsVarNames: Set[String] = Set(name) diff --git a/model/res/src/main/scala/aqua/res/MakeRes.scala b/model/res/src/main/scala/aqua/res/MakeRes.scala index b32d0cb6..688920ec 100644 --- a/model/res/src/main/scala/aqua/res/MakeRes.scala +++ b/model/res/src/main/scala/aqua/res/MakeRes.scala @@ -6,46 +6,37 @@ import cats.data.{Chain, NonEmptyList} import cats.free.Cofree import aqua.raw.value.{LiteralRaw, ValueRaw} import aqua.model.* +import aqua.types.* -// TODO docs +/** + * Helpers for translating [[OpModel]] to [[ResolvedOp]] + */ object MakeRes { - val op: ValueModel = LiteralModel.fromRaw(LiteralRaw.quote("op")) - def noop(onPeer: ValueModel, log: String = null): ResolvedOp.Tree = - CallServiceRes( - op, - "noop", - CallRes( - Option(log).filter(_ == "").map(LiteralRaw.quote).map(LiteralModel.fromRaw).toList, - None - ), - onPeer - ).leaf + /** + * Make topology hop to peer + * + * @param onPeer peer to make hop to + * @return [[ResolvedOp.Tree]] corresponsing to a hop + */ + def hop(onPeer: ValueModel): ResolvedOp.Tree = { + // Those names can't be produced from compilation + // so they are safe to use + val streamName = "-ephemeral-stream-" + val canonName = "-ephemeral-canon-" + val elementType = BottomType + val streamType = StreamType(elementType) + val canonType = CanonStreamType(elementType) - def canon(onPeer: ValueModel, operand: ValueModel, target: CallModel.Export): ResolvedOp.Tree = - CallServiceRes( - op, - "identity", - CallRes(operand :: Nil, Some(target)), - onPeer - ).leaf - - def join(onPeer: ValueModel, operands: NonEmptyList[ValueModel]): ResolvedOp.Tree = - CallServiceRes( - op, - "noop", - CallRes(operands.toList, None), - onPeer - ).leaf - - private val initPeerId = ValueModel.fromRaw(ValueRaw.InitPeerId) - - private def orInit(currentPeerId: Option[ValueModel]): ValueModel = - currentPeerId.getOrElse(initPeerId) - - private def isNillLiteral(vm: ValueModel): Boolean = vm match { - case LiteralModel(value, t) if value == ValueRaw.Nil.value && t == ValueRaw.Nil.`type` => true - case _ => false + RestrictionRes(streamName, streamType).wrap( + RestrictionRes(canonName, canonType).wrap( + CanonRes( + operand = VarModel(streamName, streamType), + peerId = onPeer, + exportTo = CallModel.Export(canonName, canonType) + ).leaf + ) + ) } def resolve( @@ -56,7 +47,7 @@ object MakeRes { case MatchMismatchModel(a, b, s) => MatchMismatchRes(a, b, s).leaf case ForModel(item, iter, mode) if !isNillLiteral(iter) => FoldRes(item, iter, mode).leaf - case RestrictionModel(item, isStream) => RestrictionRes(item, isStream).leaf + case RestrictionModel(item, itemType) => RestrictionRes(item, itemType).leaf case DetachModel => ParRes.leaf case ParModel => ParRes.leaf case XorModel => XorRes.leaf @@ -85,8 +76,6 @@ object MakeRes { ApRes(operand, CallModel.Export(assignTo, ArrayType(el))).leaf case FlattenModel(operand, assignTo) => ApRes(operand, CallModel.Export(assignTo, operand.`type`)).leaf - case JoinModel(operands) => - join(orInit(currentPeerId), operands) case CallServiceModel(serviceId, funcName, CallModel(args, exportTo)) => CallServiceRes( serviceId, @@ -99,4 +88,14 @@ object MakeRes { NullRes.leaf } + + private val initPeerId = ValueModel.fromRaw(ValueRaw.InitPeerId) + + private def orInit(currentPeerId: Option[ValueModel]): ValueModel = + currentPeerId.getOrElse(initPeerId) + + private def isNillLiteral(vm: ValueModel): Boolean = vm match { + case LiteralModel(value, t) if value == ValueRaw.Nil.value && t == ValueRaw.Nil.`type` => true + case _ => false + } } diff --git a/model/res/src/main/scala/aqua/res/ResolvedOp.scala b/model/res/src/main/scala/aqua/res/ResolvedOp.scala index fa4e09aa..070ffaa5 100644 --- a/model/res/src/main/scala/aqua/res/ResolvedOp.scala +++ b/model/res/src/main/scala/aqua/res/ResolvedOp.scala @@ -3,6 +3,7 @@ package aqua.res import aqua.model.{CallModel, ForModel, ValueModel, VarModel} import aqua.raw.ops.Call import aqua.tree.{TreeNode, TreeNodeCompanion} +import aqua.types.DataType import cats.data.Chain import cats.free.Cofree import cats.Show @@ -31,12 +32,13 @@ case class MatchMismatchRes(left: ValueModel, right: ValueModel, shouldMatch: Bo override def toString: String = s"(${if (shouldMatch) "match" else "mismatch"} $left $right)" } -case class FoldRes(item: String, iterable: ValueModel, mode: Option[ForModel.Mode] = None) extends ResolvedOp { +case class FoldRes(item: String, iterable: ValueModel, mode: Option[ForModel.Mode] = None) + extends ResolvedOp { override def toString: String = s"(fold $iterable $item ${mode.map(_.toString).getOrElse("")}" } -case class RestrictionRes(item: String, isStream: Boolean) extends ResolvedOp { - override def toString: String = s"(new ${if (isStream) "$" else ""}$item " +case class RestrictionRes(item: String, `type`: DataType) extends ResolvedOp { + override def toString: String = s"(new ${`type`.airPrefix}$item " } case class CallServiceRes( @@ -52,7 +54,8 @@ case class ApRes(operand: ValueModel, exportTo: CallModel.Export) extends Resolv override def toString: String = s"(ap $operand $exportTo)" } -case class CanonRes(operand: ValueModel, peerId: ValueModel, exportTo: CallModel.Export) extends ResolvedOp { +case class CanonRes(operand: ValueModel, peerId: ValueModel, exportTo: CallModel.Export) + extends ResolvedOp { override def toString: String = s"(canon $peerId $operand $exportTo)" } diff --git a/model/res/src/test/scala/aqua/res/ResBuilder.scala b/model/res/src/test/scala/aqua/res/ResBuilder.scala new file mode 100644 index 00000000..4bfe108e --- /dev/null +++ b/model/res/src/test/scala/aqua/res/ResBuilder.scala @@ -0,0 +1,47 @@ +package aqua.res + +import aqua.model.* +import aqua.types.* +import aqua.raw.value.* + +import cats.data.Chain + +object ResBuilder { + + def join(stream: VarModel, onIdx: ValueModel, peer: ValueModel) = { + val testVM = VarModel(stream.name + "_test", stream.`type`) + val testStreamType = stream.`type`.asInstanceOf[StreamType] // Unsafe + val iter = VarModel(stream.name + "_fold_var", ScalarType.string) + val canon = VarModel(stream.name + "_iter_canon", CanonStreamType(ScalarType.string)) + val canonRes = VarModel(stream.name + "_result_canon", CanonStreamType(ScalarType.string)) + val arrayRes = VarModel(stream.name + "_gate", ArrayType(ScalarType.string)) + val idx = VarModel(stream.name + "_incr", ScalarType.u32) + + RestrictionRes(testVM.name, testStreamType).wrap( + CallServiceRes( + LiteralModel("\"math\"", ScalarType.string), + "add", + CallRes( + onIdx :: LiteralModel.fromRaw(LiteralRaw.number(1)) :: Nil, + Some(CallModel.Export(idx.name, idx.`type`)) + ), + peer + ).leaf, + FoldRes(iter.name, stream, Some(ForModel.NeverMode)).wrap( + ApRes(iter, CallModel.Export(testVM.name, testVM.`type`)).leaf, + CanonRes(testVM, peer, CallModel.Export(canon.name, canon.`type`)).leaf, + XorRes.wrap( + MatchMismatchRes( + canon.copy(properties = Chain.one(FunctorModel("length", ScalarType.u32))), + idx, + true + ).leaf, + NextRes(iter.name).leaf + ) + ), + CanonRes(testVM, peer, CallModel.Export(canonRes.name, canonRes.`type`)).leaf, + ApRes(canonRes, CallModel.Export(arrayRes.name, arrayRes.`type`)).leaf + ) + } + +} diff --git a/model/src/main/scala/aqua/model/OpModel.scala b/model/src/main/scala/aqua/model/OpModel.scala index 7a2c8911..770ac487 100644 --- a/model/src/main/scala/aqua/model/OpModel.scala +++ b/model/src/main/scala/aqua/model/OpModel.scala @@ -7,7 +7,7 @@ import cats.Show import cats.Eval import cats.data.NonEmptyList import aqua.tree.{TreeNode, TreeNodeCompanion} -import aqua.types.ScalarType +import aqua.types.* import scala.annotation.tailrec @@ -95,7 +95,9 @@ case class NextModel(item: String) extends OpModel { } -case class RestrictionModel(name: String, isStream: Boolean) extends SeqGroupModel { +// TODO: Refactor out `name` and `type` to +// something like VarModel without properties +case class RestrictionModel(name: String, `type`: DataType) extends SeqGroupModel { override def usesVarNames: Set[String] = Set.empty override def restrictsVarNames: Set[String] = Set(name) @@ -187,14 +189,6 @@ case class CanonicalizeModel(operand: ValueModel, exportTo: CallModel.Export) override def usesVarNames: Set[String] = operand.usesVarNames } -case class JoinModel(operands: NonEmptyList[ValueModel]) extends ForceExecModel { - - override def toString: String = s"join ${operands.toList.mkString(", ")}" - - override lazy val usesVarNames: Set[String] = - operands.toList.flatMap(_.usesVarNames).toSet -} - case class CaptureTopologyModel(name: String) extends NoExecModel case class ApplyTopologyModel(name: String) extends SeqGroupModel diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala index 54cf3e51..f297c62a 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala @@ -273,8 +273,8 @@ object Topology extends Logging { val chainZipperEv = resolved.traverse(tree => ( - rc.topology.pathBefore.map(through(_, s"before ${currI}")), - rc.topology.pathAfter.map(through(_, s"after ${currI}", reversed = true)) + rc.topology.pathBefore.map(through(_)), + rc.topology.pathAfter.map(through(_, reversed = true)) ).mapN { case (pathBefore, pathAfter) => ChainZipper(pathBefore, tree, pathAfter) }.flatTap(logResolvedDebugInfo(rc, _, tree)) @@ -299,20 +299,19 @@ object Topology extends Logging { // Walks through peer IDs, doing a noop function on each def through( peerIds: Chain[ValueModel], - log: String = null, reversed: Boolean = false ): Chain[Res] = peerIds.map { v => v.`type` match { case _: BoxType => val itemName = "-via-peer-" val steps = Chain( - MakeRes.noop(VarModel(itemName, ScalarType.string, Chain.empty), log), + MakeRes.hop(VarModel(itemName, ScalarType.string, Chain.empty)), NextRes(itemName).leaf ) FoldRes(itemName, v).wrap(if (reversed) steps.reverse else steps) case _ => - MakeRes.noop(v, log) + MakeRes.hop(v) } } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala index 21e38076..abe044e3 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/After.scala @@ -40,6 +40,9 @@ trait After { Eval.now(Chain.empty) } + // If exit is forced, make a path outside this node + // – from where it ends to where execution is expected to continue, + // explicitly pinging the next node (useful inside par branches) def pathAfterAndPingNext(current: Topology): Eval[Chain[ValueModel]] = current.forceExit.flatMap { case false => Eval.now(Chain.empty) @@ -48,7 +51,12 @@ trait After { case (e, a, _) if e == a => Chain.empty case (e, a, l) if l.contains(e) => // Pingback in case no relays involved - Chain.fromOption(a.headOption.map(_.peerId)) + Chain.fromOption( + a.headOption + // Add nothing if last node is the same + .filterNot(e.headOption.contains) + .map(_.peerId) + ) case (e, a, _) => // We wasn't at e, so need to get through the last peer in case it matches with the relay Topology.findRelayPathEnforcement(a, e) ++ Chain.fromOption( diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala index 0bce2f62..e4fa534f 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala @@ -1,10 +1,13 @@ package aqua.model.transform.topology.strategy import aqua.model.transform.topology.Topology -import aqua.model.{OnModel, ParGroupModel, SeqGroupModel, ValueModel} +import aqua.model.{OnModel, ParGroupModel, SeqGroupModel, ValueModel, XorModel} import cats.Eval import cats.data.Chain +import cats.syntax.functor.* +import cats.instances.lazyList.* +import cats.syntax.option.* // Parent == Xor object XorBranch extends Before with After { @@ -13,24 +16,29 @@ object XorBranch extends Before with After { override def beforeOn(current: Topology): Eval[List[OnModel]] = current.prevSibling.map(_.endsOn) getOrElse super.beforeOn(current) - private def closestParExit(current: Topology): Option[Topology] = + // Find closest par exit up and return its branch current is in + // Returns none if there is no par up + // or current is not at its exit + private def closestParExitChild(current: Topology): Option[Topology] = current.parents - .map(t => t -> t.parent.map(_.cursor.op)) - .takeWhile { - case (t, Some(_: ParGroupModel)) => true - case (t, Some(_: SeqGroupModel)) => t.nextSibling.isEmpty + .fproduct(_.parent.map(_.cursor.op)) + .dropWhile { + case (t, Some(_: SeqGroupModel)) => + t.nextSibling.isEmpty + case (_, Some(XorModel)) => + true case _ => false } - .map(_._1) - .map(t => t -> t.cursor.op) - .collectFirst { case (t, _: ParGroupModel) => - // println(Console.GREEN + s"collect ${t}" + Console.RESET) - t - } + .headOption + .collect { case (t, Some(_: ParGroupModel)) => t } + + private def closestParExit(current: Topology): Option[Topology] = + closestParExitChild(current).flatMap(_.parent) override def forceExit(current: Topology): Eval[Boolean] = - closestParExit(current) - .fold(Eval.later(current.cursor.moveUp.exists(_.hasExecLater)))(_.forceExit) + closestParExitChild(current).fold( + Eval.later(current.cursor.moveUp.exists(_.hasExecLater)) + )(_.forceExit) // Force exit if par branch needs it override def afterOn(current: Topology): Eval[List[OnModel]] = current.forceExit.flatMap { @@ -41,5 +49,8 @@ object XorBranch extends Before with After { // Parent of this branch's parent xor – fixes the case when this xor is in par override def pathAfter(current: Topology): Eval[Chain[ValueModel]] = - closestParExit(current).fold(super.pathAfter(current))(_ => pathAfterAndPingNext(current)) + closestParExit(current).fold(super.pathAfter(current))(_ => + // Ping next if we are exiting from par + super.pathAfterAndPingNext(current) + ) } diff --git a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala index eb39a39b..8cae03c0 100644 --- a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala +++ b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala @@ -21,6 +21,11 @@ import aqua.res.{CallRes, CallServiceRes, MakeRes} import aqua.types.{ArrayType, LiteralType, ScalarType} import scala.language.implicitConversions +import aqua.types.StreamType +import aqua.model.IntoIndexModel +import cats.data.Chain +import cats.data.Chain.==: +import aqua.model.inline.raw.ApplyGateRawInliner object ModelBuilder { implicit def rawToValue(raw: ValueRaw): ValueModel = ValueModel.fromRaw(raw) @@ -125,6 +130,32 @@ object ModelBuilder { ) } - def through(peer: ValueModel, log: String = null) = - MakeRes.noop(peer, log) + def through(peer: ValueModel) = + MakeRes.hop(peer) + + /** + * @param stream stream [[VarModel]] + * @param idx id [[ValueModel]] + * @return [[OpModel.Tree]] of join of `stream[idx]` + */ + def join(stream: VarModel, idx: ValueModel): OpModel.Tree = + stream match { + case VarModel( + streamName, + streamType: StreamType, + Chain.`nil` + ) => + ApplyGateRawInliner.joinStreamOnIndexModel( + streamName = streamName, + streamType = streamType, + idxModel = idx, + idxIncrName = streamName + "_incr", + testName = streamName + "_test", + iterName = streamName + "_fold_var", + canonName = streamName + "_result_canon", + iterCanonName = streamName + "_iter_canon", + resultName = streamName + "_gate" + ) + case _ => ??? + } } diff --git a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala index fbf77ddf..bb355172 100644 --- a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala +++ b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala @@ -8,6 +8,7 @@ import aqua.raw.value.{IntoIndexRaw, LiteralRaw, VarRaw} import aqua.types.{LiteralType, ScalarType, StreamType} import cats.Eval import cats.data.{Chain, NonEmptyList} +import cats.data.Chain.* import cats.free.Cofree import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -16,10 +17,31 @@ import cats.syntax.option.* import aqua.types.ArrayType import aqua.raw.ConstantRaw.initPeerId import aqua.model.ForModel.NullMode +import aqua.raw.value.ValueRaw class TopologySpec extends AnyFlatSpec with Matchers { - import ModelBuilder._ + import ModelBuilder.{join as joinModel, *} + import ResBuilder.join as joinRes + + def joinModelRes(streamEl: ValueRaw | ValueModel): (OpModel.Tree, ResolvedOp.Tree) = + streamEl match { + case vm: ValueModel => vm + case vr: ValueRaw => ValueModel.fromRaw(vr) + } match { + case stream @ VarModel(name, baseType, IntoIndexModel(idx, idxType) ==: Chain.`nil`) => + val idxModel = + if (idx.forall(Character.isDigit)) LiteralModel(idx, idxType) + else VarModel(idx, idxType) + + val streamWithoutIdx = stream.copy(properties = Chain.`nil`) + + ( + joinModel(streamWithoutIdx, idxModel), + joinRes(streamWithoutIdx, idxModel, ValueModel.fromRaw(initPeer)) + ) + case _ => ??? + } "topology resolver" should "do nothing on init peer" in { @@ -435,6 +457,8 @@ class TopologySpec extends AnyFlatSpec with Matchers { val stream = ValueModel.fromRaw(streamRaw) val streamEl = ValueModel.fromRaw(streamRawEl) + val (joinModel, joinRes) = joinModelRes(streamEl) + val init = SeqModel.wrap( DeclareStreamModel(stream).leaf, @@ -451,7 +475,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - JoinModel(NonEmptyList.one(streamEl)).leaf, + joinModel, callModel(3, Nil, streamRaw :: Nil) ) ) @@ -468,27 +492,25 @@ class TopologySpec extends AnyFlatSpec with Matchers { SeqRes.wrap( through(relay), XorRes.wrap( - callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))), + SeqRes.wrap( + callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))), + through(relay), + through(initPeer) + ), SeqRes.wrap( through(relay), callRes(4, initPeer) ) - ), - through(relay), - through(initPeer) + ) ), NextRes("i").leaf ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(streamEl :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, stream :: Nil) ) + proc.equalsOrShowDiff(expected) should be(true) } @@ -502,6 +524,8 @@ class TopologySpec extends AnyFlatSpec with Matchers { val stream = ValueModel.fromRaw(streamRaw) val streamEl = ValueModel.fromRaw(streamRawEl) + val (joinModel, joinRes) = joinModelRes(streamEl) + val init = SeqModel.wrap( DeclareStreamModel(stream).leaf, @@ -520,7 +544,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - JoinModel(NonEmptyList.one(streamEl)).leaf, + joinModel, callModel(3, Nil, streamRaw :: Nil) ) ) @@ -538,26 +562,20 @@ class TopologySpec extends AnyFlatSpec with Matchers { through(relay), XorRes.wrap( XorRes.wrap( - callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))) + SeqRes.wrap( + callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))), + through(relay), + through(initPeer) + ) ), - SeqRes.wrap( - through(relay), - callRes(4, initPeer) - ) - ), - through(relay), - through(initPeer) + callRes(4, initPeer) + ) ), NextRes("i").leaf ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(streamEl :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, stream :: Nil) ) @@ -784,6 +802,8 @@ class TopologySpec extends AnyFlatSpec with Matchers { val usedWithIdx = used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string)) + val (joinModel, joinRes) = joinModelRes(usedWithIdx) + val init = OnModel(initPeer, Chain.one(relay)).wrap( foldPar( @@ -793,7 +813,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { callModel(1, CallModel.Export(used.name, used.`type`) :: Nil) ) ), - JoinModel(NonEmptyList.one(usedWithIdx)).leaf, + joinModel, callModel(3, Nil, used :: Nil) ) @@ -818,12 +838,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(usedWithIdx :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) ) @@ -835,6 +850,9 @@ class TopologySpec extends AnyFlatSpec with Matchers { val used = VarRaw("used", StreamType(ScalarType.string)) val usedWithIdx = used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string)) + + val (joinModel, joinRes) = joinModelRes(usedWithIdx) + val init = OnModel(initPeer, Chain.one(relay)).wrap( foldPar( @@ -846,7 +864,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { ) ) ), - JoinModel(NonEmptyList.one(usedWithIdx)).leaf, + joinModel, callModel(3, Nil, used :: Nil) ) @@ -860,25 +878,22 @@ class TopologySpec extends AnyFlatSpec with Matchers { SeqRes.wrap( through(relay), XorRes.wrap( - callRes( - 1, - ValueModel.fromRaw(i), - Some(CallModel.Export(used.name, used.`type`)) + SeqRes.wrap( + callRes( + 1, + ValueModel.fromRaw(i), + Some(CallModel.Export(used.name, used.`type`)) + ), + through(relay), + through(initPeer) ) - ), - through(relay), - through(initPeer) + ) ), NextRes("i").leaf ) ) ), - CallServiceRes( - LiteralModel(s"\"op\"", LiteralType.string), - s"noop", - CallRes(usedWithIdx :: Nil, None), - initPeer - ).leaf, + joinRes, callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) ) @@ -888,7 +903,8 @@ class TopologySpec extends AnyFlatSpec with Matchers { "topology resolver" should "handle empty for correctly [bug LNG-149]" in { val streamName = "array-inline" val iterName = "a-0" - val stream = VarModel(streamName, StreamType(LiteralType.number)) + val streamType = StreamType(LiteralType.number) + val stream = VarModel(streamName, streamType) val array = VarModel(s"$streamName-0", ArrayType(LiteralType.number)) val literal = (i: String) => LiteralModel(i, LiteralType.number) @@ -901,7 +917,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { val model = OnModel(initPeer, Chain.one(relay)).wrap( SeqModel.wrap( - RestrictionModel(streamName, true).wrap( + RestrictionModel(streamName, streamType).wrap( push("1"), push("2"), CanonicalizeModel(stream, CallModel.Export(array.name, array.`type`)).leaf @@ -915,7 +931,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { val proc = Topology.resolve(model).value val expected = SeqRes.wrap( - RestrictionRes(streamName, true).wrap( + RestrictionRes(streamName, streamType).wrap( ApRes(literal("1"), CallModel.Export(stream.name, stream.`type`)).leaf, ApRes(literal("2"), CallModel.Export(stream.name, stream.`type`)).leaf, CanonRes( diff --git a/model/tree/src/main/scala/aqua/tree/TreeNode.scala b/model/tree/src/main/scala/aqua/tree/TreeNode.scala index 13fcd1b5..9656afab 100644 --- a/model/tree/src/main/scala/aqua/tree/TreeNode.scala +++ b/model/tree/src/main/scala/aqua/tree/TreeNode.scala @@ -17,10 +17,13 @@ trait TreeNode[T <: TreeNode[T]] { def wrap(children: Chain[Tree]): Tree = Cofree(self, Eval.now(children)) - protected def wrapNonEmpty(children: Chain[Tree], empty: Tree): Tree = children match { - case Chain.nil => empty - case x ==: Chain.nil => x - case _ => Cofree(self, Eval.now(children)) - } + protected def wrapNonEmpty(children: Chain[Tree], empty: Tree): Tree = + children match { + case Chain.nil => empty + case x ==: Chain.nil => x + // Do not use `wrap` here as children + // could redefine `wrap` through this method + case _ => Cofree(self, Eval.now(children)) + } } diff --git a/semantics/src/main/scala/aqua/semantics/expr/func/ArrowSem.scala b/semantics/src/main/scala/aqua/semantics/expr/func/ArrowSem.scala index 91976379..6a39ad2b 100644 --- a/semantics/src/main/scala/aqua/semantics/expr/func/ArrowSem.scala +++ b/semantics/src/main/scala/aqua/semantics/expr/func/ArrowSem.scala @@ -159,7 +159,8 @@ class ArrowSem[S[_]](val expr: ArrowExpr[S]) extends AnyVal { // wrap streams with restrictions val bodyWithRestrictions = localStreams.foldLeft(bodyModified) { - case (bm, (streamName, _)) => RestrictionTag(streamName, isStream = true).wrap(bm) + case (bm, (streamName, streamType)) => + RestrictionTag(streamName, streamType).wrap(bm) } ArrowRaw(funcArrow, returnValuesModified.toList, bodyWithRestrictions) diff --git a/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala b/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala index ef770721..34aa24e6 100644 --- a/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala +++ b/semantics/src/main/scala/aqua/semantics/expr/func/ForSem.scala @@ -10,7 +10,7 @@ import aqua.semantics.rules.ValuesAlgebra import aqua.semantics.rules.abilities.AbilitiesAlgebra import aqua.semantics.rules.names.NamesAlgebra import aqua.semantics.rules.types.TypesAlgebra -import aqua.types.{ArrayType, BoxType} +import aqua.types.{ArrayType, BoxType, StreamType} import cats.Monad import cats.data.Chain @@ -42,11 +42,10 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal { }, (stOpt: Option[ValueRaw], ops: Raw) => N.streamsDefinedWithinScope() - .map(_.keySet) .map(streams => (stOpt, ops) match { case (Some(vm), FuncOp(op)) => - val innerTag = expr.mode.fold[RawTag](SeqTag) { + val innerTag = expr.mode.fold(SeqTag) { case ForExpr.Mode.ParMode => ParTag case ForExpr.Mode.TryMode => TryTag } @@ -58,8 +57,8 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal { innerTag .wrap( // Restrict the streams created within this scope - streams.toList.foldLeft(op) { case (b, streamName) => - RestrictionTag(streamName, isStream = true).wrap(b) + streams.toList.foldLeft(op) { case (tree, (streamName, streamType)) => + RestrictionTag(streamName, streamType).wrap(tree) }, NextTag(expr.item.value).leaf ) diff --git a/types/src/main/scala/aqua/types/Type.scala b/types/src/main/scala/aqua/types/Type.scala index 49b056e7..f1d5ad7b 100644 --- a/types/src/main/scala/aqua/types/Type.scala +++ b/types/src/main/scala/aqua/types/Type.scala @@ -26,6 +26,16 @@ sealed trait Type { def uniteBottom(other: Type): Type = UniteTypes.bottom.combine(this, other) def properties: Map[String, Type] = Map.empty + + /** + * Use for printing purposes only + * Ideally should be in sync with [[AirGen.varNameToString]] + */ + def airPrefix: String = this match { + case _: StreamType => "$" + case _: CanonStreamType => "#" + case _ => "" + } } // Product is a list of (optionally labelled) types