mirror of
https://github.com/fluencelabs/aqua.git
synced 2024-12-04 14:40:17 +00:00
feat(compiler): Make topology hop with non-FFI snippet [fixes LNG-125] (#764)
* Remove MakeRes.canon * Replace noop with hop * Rewrite join * Remove JoinModel, fix tests * Share code between tests * Pass type in RestrictionTag(Model) * Fix MakeRes.hop * Fix wrapping * Rename vars, add comments * Fix XorBranch topology * Fix tests
This commit is contained in:
parent
22f380a491
commit
c1fe24b04d
@ -46,8 +46,6 @@ object DataView {
|
||||
|
||||
case class Variable(name: String) extends DataView
|
||||
|
||||
case class Stream(name: String) extends DataView
|
||||
|
||||
case class VarLens(name: String, lens: String, isField: Boolean = true) extends DataView {
|
||||
def append(sublens: String): VarLens = copy(lens = lens + sublens)
|
||||
}
|
||||
@ -57,7 +55,6 @@ object DataView {
|
||||
case InitPeerId ⇒ "%init_peer_id%"
|
||||
case LastError ⇒ "%last_error%"
|
||||
case Variable(name) ⇒ name
|
||||
case Stream(name) ⇒ name
|
||||
case VarLens(name, lens, isField) ⇒
|
||||
if (isField) name + ".$" + lens
|
||||
else name + lens
|
||||
@ -90,7 +87,12 @@ object Air {
|
||||
|
||||
case class Next(label: String) extends Air(Keyword.Next)
|
||||
|
||||
case class Fold(iterable: DataView, label: String, instruction: Air, lastNextInstruction: Option[Air]) extends Air(Keyword.Fold)
|
||||
case class Fold(
|
||||
iterable: DataView,
|
||||
label: String,
|
||||
instruction: Air,
|
||||
lastNextInstruction: Option[Air]
|
||||
) extends Air(Keyword.Fold)
|
||||
|
||||
case class Match(left: DataView, right: DataView, instruction: Air) extends Air(Keyword.Match)
|
||||
|
||||
|
@ -3,7 +3,7 @@ package aqua.backend.air
|
||||
import aqua.model.*
|
||||
import aqua.raw.ops.Call
|
||||
import aqua.res.*
|
||||
import aqua.types.{ArrayType, CanonStreamType, StreamType}
|
||||
import aqua.types.{ArrayType, CanonStreamType, StreamType, Type}
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
@ -26,14 +26,17 @@ object AirGen extends Logging {
|
||||
s".[$idx]${propertyToString(tail)}"
|
||||
}
|
||||
|
||||
def varNameToString(name: String, `type`: Type): String =
|
||||
(`type` match {
|
||||
case _: StreamType => "$" + name
|
||||
case _: CanonStreamType => "#" + name
|
||||
case _ => name
|
||||
}).replace('.', '_')
|
||||
|
||||
def valueToData(vm: ValueModel): DataView = vm match {
|
||||
case LiteralModel(value, _) => DataView.StringScalar(value)
|
||||
case VarModel(name, t, property) =>
|
||||
val n = (t match {
|
||||
case _: StreamType => "$" + name
|
||||
case _: CanonStreamType => "#" + name
|
||||
case _ => name
|
||||
}).replace('.', '_')
|
||||
val n = varNameToString(name, t)
|
||||
if (property.isEmpty) DataView.Variable(n)
|
||||
else {
|
||||
val functors = property.find {
|
||||
@ -97,8 +100,8 @@ object AirGen extends Logging {
|
||||
case ForModel.NeverMode => NeverGen
|
||||
}
|
||||
Eval later ForGen(valueToData(iterable), item, opsToSingle(ops), m)
|
||||
case RestrictionRes(item, isStream) =>
|
||||
Eval later NewGen(item, isStream, opsToSingle(ops))
|
||||
case RestrictionRes(item, itemType) =>
|
||||
Eval later NewGen(varNameToString(item, itemType), opsToSingle(ops))
|
||||
case CallServiceRes(serviceId, funcName, CallRes(args, exportTo), peerId) =>
|
||||
Eval.later(
|
||||
ServiceCallGen(
|
||||
@ -179,14 +182,17 @@ case class MatchMismatchGen(
|
||||
else Air.Mismatch(left, right, body.generate)
|
||||
}
|
||||
|
||||
case class ForGen(iterable: DataView, item: String, body: AirGen, mode: Option[AirGen]) extends AirGen {
|
||||
case class ForGen(iterable: DataView, item: String, body: AirGen, mode: Option[AirGen])
|
||||
extends AirGen {
|
||||
override def generate: Air = Air.Fold(iterable, item, body.generate, mode.map(_.generate))
|
||||
}
|
||||
|
||||
case class NewGen(item: String, isStream: Boolean, body: AirGen) extends AirGen {
|
||||
case class NewGen(name: String, body: AirGen) extends AirGen {
|
||||
|
||||
override def generate: Air =
|
||||
Air.New(if (isStream) DataView.Stream("$" + item) else DataView.Variable(item), body.generate)
|
||||
override def generate: Air = Air.New(
|
||||
DataView.Variable(name),
|
||||
body.generate
|
||||
)
|
||||
}
|
||||
|
||||
case class NextGen(item: String) extends AirGen {
|
||||
|
@ -233,7 +233,7 @@ lazy val transform = crossProject(JVMPlatform, JSPlatform)
|
||||
.crossType(CrossType.Pure)
|
||||
.in(file("model/transform"))
|
||||
.settings(commons: _*)
|
||||
.dependsOn(model, res, inline)
|
||||
.dependsOn(model, res, inline, res % "test->test")
|
||||
|
||||
lazy val semantics = crossProject(JVMPlatform, JSPlatform)
|
||||
.withoutSuffixFor(JVMPlatform)
|
||||
@ -252,7 +252,7 @@ lazy val compiler = crossProject(JVMPlatform, JSPlatform)
|
||||
.crossType(CrossType.Pure)
|
||||
.in(file("compiler"))
|
||||
.settings(commons: _*)
|
||||
.dependsOn(semantics, linker, backend, transform % Test)
|
||||
.dependsOn(semantics, linker, backend, transform % Test, res % "test->test")
|
||||
|
||||
lazy val backend = crossProject(JVMPlatform, JSPlatform)
|
||||
.withoutSuffixFor(JVMPlatform)
|
||||
|
@ -1,6 +1,14 @@
|
||||
package aqua.compiler
|
||||
|
||||
import aqua.model.{CallModel, ForModel, FunctorModel, IntoIndexModel, LiteralModel, ValueModel, VarModel}
|
||||
import aqua.model.{
|
||||
CallModel,
|
||||
ForModel,
|
||||
FunctorModel,
|
||||
IntoIndexModel,
|
||||
LiteralModel,
|
||||
ValueModel,
|
||||
VarModel
|
||||
}
|
||||
import aqua.model.transform.TransformConfig
|
||||
import aqua.model.transform.Transform
|
||||
import aqua.parser.ParserError
|
||||
@ -10,7 +18,21 @@ import aqua.parser.lift.Span
|
||||
import aqua.parser.lift.Span.S
|
||||
import aqua.raw.ConstantRaw
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.res.{ApRes, CallRes, CallServiceRes, CanonRes, FoldRes, MakeRes, MatchMismatchRes, NextRes, ParRes, RestrictionRes, SeqRes, XorRes}
|
||||
import aqua.res.{
|
||||
ApRes,
|
||||
CallRes,
|
||||
CallServiceRes,
|
||||
CanonRes,
|
||||
FoldRes,
|
||||
MakeRes,
|
||||
MatchMismatchRes,
|
||||
NextRes,
|
||||
ParRes,
|
||||
RestrictionRes,
|
||||
SeqRes,
|
||||
XorRes
|
||||
}
|
||||
import aqua.res.ResBuilder
|
||||
import aqua.types.{ArrayType, CanonStreamType, LiteralType, ScalarType, StreamType, Type}
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
@ -83,8 +105,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
}
|
||||
|
||||
def through(peer: ValueModel, log: String = null) =
|
||||
MakeRes.noop(peer, log)
|
||||
def through(peer: ValueModel) =
|
||||
MakeRes.hop(peer)
|
||||
|
||||
val relay = VarRaw("-relay-", ScalarType.string)
|
||||
|
||||
@ -97,42 +119,10 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
).leaf
|
||||
}
|
||||
|
||||
val init = LiteralModel.fromRaw(ValueRaw.InitPeerId)
|
||||
private val init = LiteralModel.fromRaw(ValueRaw.InitPeerId)
|
||||
|
||||
private def join(vm: VarModel, length: ValueModel) = {
|
||||
val testVM = VarModel(vm.name + "_test", vm.`type`)
|
||||
val iter = VarModel(vm.name + "_fold_var", ScalarType.string)
|
||||
val canon = VarModel(vm.name + "_iter_canon", CanonStreamType(ScalarType.string))
|
||||
val canonRes = VarModel(vm.name + "_result_canon", CanonStreamType(ScalarType.string))
|
||||
val arrayRes = VarModel(vm.name + "_gate", ArrayType(ScalarType.string))
|
||||
val idx = VarModel(vm.name + "_incr", ScalarType.u32)
|
||||
|
||||
RestrictionRes(testVM.name, true).wrap(
|
||||
CallServiceRes(
|
||||
LiteralModel("\"math\"", ScalarType.string),
|
||||
"add",
|
||||
CallRes(
|
||||
length :: LiteralModel.fromRaw(LiteralRaw.number(1)) :: Nil,
|
||||
Some(CallModel.Export(idx.name, idx.`type`))
|
||||
),
|
||||
init
|
||||
).leaf,
|
||||
FoldRes(iter.name, vm, Some(ForModel.NeverMode)).wrap(
|
||||
ApRes(iter, CallModel.Export(testVM.name, testVM.`type`)).leaf,
|
||||
CanonRes(testVM, init, CallModel.Export(canon.name, canon.`type`)).leaf,
|
||||
XorRes.wrap(
|
||||
MatchMismatchRes(
|
||||
canon.copy(properties = Chain.one(FunctorModel("length", ScalarType.u32))),
|
||||
idx,
|
||||
true
|
||||
).leaf,
|
||||
NextRes(iter.name).leaf
|
||||
)
|
||||
),
|
||||
CanonRes(testVM, init, CallModel.Export(canonRes.name, canonRes.`type`)).leaf,
|
||||
ApRes(canonRes, CallModel.Export(arrayRes.name, arrayRes.`type`)).leaf
|
||||
)
|
||||
}
|
||||
private def join(vm: VarModel, idx: ValueModel) =
|
||||
ResBuilder.join(vm, idx, init)
|
||||
|
||||
"aqua compiler" should "create right topology" in {
|
||||
|
||||
@ -177,7 +167,7 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
SeqRes.wrap(
|
||||
getDataSrv("-relay-", ScalarType.string),
|
||||
getDataSrv(peers.name, peers.`type`),
|
||||
RestrictionRes("results", true).wrap(
|
||||
RestrictionRes(results.name, resultsType).wrap(
|
||||
SeqRes.wrap(
|
||||
ParRes.wrap(
|
||||
FoldRes(peer.name, peers, Some(ForModel.NeverMode)).wrap(
|
||||
@ -203,10 +193,10 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
),
|
||||
join(results, LiteralModel.fromRaw(LiteralRaw.number(2))),
|
||||
CanonRes(results, init, CallModel.Export(canonResult.name, canonResult.`type`)).leaf,
|
||||
ApRes(
|
||||
canonResult,
|
||||
CallModel.Export(flatResult.name, flatResult.`type`)
|
||||
).leaf
|
||||
ApRes(
|
||||
canonResult,
|
||||
CallModel.Export(flatResult.name, flatResult.`type`)
|
||||
).leaf
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
@ -283,23 +273,24 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
val Some(funcWrap) = aquaRes.funcs.find(_.funcName == "wrap")
|
||||
val Some(barfoo) = aquaRes.funcs.find(_.funcName == "barfoo")
|
||||
|
||||
val resVM = VarModel("res", StreamType(ScalarType.string))
|
||||
val resStreamType = StreamType(ScalarType.string)
|
||||
val resVM = VarModel("res", resStreamType)
|
||||
val resCanonVM = VarModel("-res-fix-0", CanonStreamType(ScalarType.string))
|
||||
val resFlatVM = VarModel("-res-flat-0", ArrayType(ScalarType.string))
|
||||
|
||||
barfoo.body.equalsOrShowDiff(
|
||||
SeqRes.wrap(
|
||||
RestrictionRes("res", true).wrap(
|
||||
RestrictionRes(resVM.name, resStreamType).wrap(
|
||||
SeqRes.wrap(
|
||||
// res <- foo()
|
||||
ApRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("I am MyFooBar foo")),
|
||||
CallModel.Export("res", StreamType(ScalarType.string))
|
||||
CallModel.Export(resVM.name, resVM.`type`)
|
||||
).leaf,
|
||||
// res <- bar()
|
||||
ApRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote(" I am MyFooBar bar")),
|
||||
CallModel.Export("res", StreamType(ScalarType.string))
|
||||
CallModel.Export(resVM.name, resVM.`type`)
|
||||
).leaf,
|
||||
// canonicalization
|
||||
CanonRes(
|
||||
|
@ -15,7 +15,5 @@ func getTwoResults(node: string) -> []u64:
|
||||
on n:
|
||||
try:
|
||||
res <- Peer.timestamp_sec()
|
||||
Op2.identity(res!)
|
||||
Op2.identity(res!1)
|
||||
Op2.identity(res!2)
|
||||
join res!2
|
||||
<- res
|
@ -351,8 +351,8 @@ object TagInliner extends Logging {
|
||||
}
|
||||
} yield model.fold(TagInlined.Empty())(m => TagInlined.Single(model = m))
|
||||
|
||||
case RestrictionTag(name, isStream) =>
|
||||
pure(RestrictionModel(name, isStream))
|
||||
case RestrictionTag(name, typ) =>
|
||||
pure(RestrictionModel(name, typ))
|
||||
|
||||
case DeclareStreamTag(value) =>
|
||||
value match
|
||||
|
@ -7,12 +7,92 @@ import aqua.raw.value.{ApplyGateRaw, LiteralRaw, VarRaw}
|
||||
import cats.data.State
|
||||
import cats.data.Chain
|
||||
import aqua.model.inline.RawValueInliner.unfold
|
||||
import aqua.types.{CanonStreamType, ScalarType, StreamType, ArrayType}
|
||||
import aqua.types.{ArrayType, CanonStreamType, ScalarType, StreamType}
|
||||
import cats.syntax.monoid.*
|
||||
import scribe.Logging
|
||||
|
||||
object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging {
|
||||
|
||||
/**
|
||||
* To wait for the element of a stream by the given index, the following model is generated:
|
||||
* (seq
|
||||
* (seq
|
||||
* (seq
|
||||
* (call <peer> ("math" "add") [0 1] stream_incr)
|
||||
* (fold $stream s
|
||||
* (seq
|
||||
* (seq
|
||||
* (ap s $stream_test)
|
||||
* (canon <peer> $stream_test #stream_iter_canon)
|
||||
* )
|
||||
* (xor
|
||||
* (match #stream_iter_canon.length stream_incr
|
||||
* (null)
|
||||
* )
|
||||
* (next s)
|
||||
* )
|
||||
* )
|
||||
* (never)
|
||||
* )
|
||||
* )
|
||||
* (canon <peer> $stream_test #stream_result_canon)
|
||||
* )
|
||||
* (ap #stream_result_canon stream_gate)
|
||||
* )
|
||||
*/
|
||||
def joinStreamOnIndexModel(
|
||||
streamName: String,
|
||||
streamType: StreamType,
|
||||
idxModel: ValueModel,
|
||||
idxIncrName: String,
|
||||
testName: String,
|
||||
iterName: String,
|
||||
canonName: String,
|
||||
iterCanonName: String,
|
||||
resultName: String
|
||||
): OpModel.Tree = {
|
||||
val varSTest = VarModel(testName, streamType)
|
||||
val iter = VarModel(iterName, streamType.element)
|
||||
|
||||
val iterCanon = VarModel(iterCanonName, CanonStreamType(streamType.element))
|
||||
|
||||
val resultCanon =
|
||||
VarModel(canonName, CanonStreamType(streamType.element))
|
||||
|
||||
val incrVar = VarModel(idxIncrName, ScalarType.u32)
|
||||
|
||||
RestrictionModel(varSTest.name, streamType).wrap(
|
||||
increment(idxModel, incrVar),
|
||||
ForModel(iter.name, VarModel(streamName, streamType), Some(ForModel.NeverMode)).wrap(
|
||||
PushToStreamModel(
|
||||
iter,
|
||||
CallModel.Export(varSTest.name, varSTest.`type`)
|
||||
).leaf,
|
||||
CanonicalizeModel(
|
||||
varSTest,
|
||||
CallModel.Export(iterCanon.name, iterCanon.`type`)
|
||||
).leaf,
|
||||
XorModel.wrap(
|
||||
MatchMismatchModel(
|
||||
iterCanon
|
||||
.copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))),
|
||||
incrVar,
|
||||
true
|
||||
).leaf,
|
||||
NextModel(iter.name).leaf
|
||||
)
|
||||
),
|
||||
CanonicalizeModel(
|
||||
varSTest,
|
||||
CallModel.Export(resultCanon.name, CanonStreamType(streamType.element))
|
||||
).leaf,
|
||||
FlattenModel(
|
||||
resultCanon,
|
||||
resultName
|
||||
).leaf
|
||||
)
|
||||
}
|
||||
|
||||
override def apply[S: Mangler: Exports: Arrows](
|
||||
afr: ApplyGateRaw,
|
||||
propertyAllowed: Boolean
|
||||
@ -27,73 +107,19 @@ object ApplyGateRawInliner extends RawInliner[ApplyGateRaw] with Logging {
|
||||
idxFolded <- unfold(afr.idx)
|
||||
(idxModel, idxInline) = idxFolded
|
||||
} yield {
|
||||
val varSTest = VarModel(uniqueTestName, afr.streamType)
|
||||
val iter = VarModel(uniqueIter, afr.streamType.element)
|
||||
|
||||
val iterCanon = VarModel(uniqueIterCanon, CanonStreamType(afr.streamType.element))
|
||||
|
||||
val resultCanon =
|
||||
VarModel(uniqueCanonName, CanonStreamType(afr.streamType.element))
|
||||
|
||||
val incrVar = VarModel(uniqueIdxIncr, ScalarType.u32)
|
||||
|
||||
// To wait for the element of a stream by the given index, the following model is generated:
|
||||
// (seq
|
||||
// (seq
|
||||
// (seq
|
||||
// (call %init_peer_id% ("math" "add") [0 1] stream_incr)
|
||||
// (fold $stream s
|
||||
// (seq
|
||||
// (seq
|
||||
// (ap s $stream_test)
|
||||
// (canon %init_peer_id% $stream_test #stream_iter_canon)
|
||||
// )
|
||||
// (xor
|
||||
// (match #stream_iter_canon.length stream_incr
|
||||
// (null)
|
||||
// )
|
||||
// (next s)
|
||||
// )
|
||||
// )
|
||||
// (never)
|
||||
// )
|
||||
// )
|
||||
// (canon %init_peer_id% $stream_test #stream_result_canon)
|
||||
// )
|
||||
// (ap #stream_result_canon stream_gate)
|
||||
// )
|
||||
val gate = RestrictionModel(varSTest.name, true).wrap(
|
||||
increment(idxModel, incrVar),
|
||||
ForModel(iter.name, VarModel(afr.name, afr.streamType), Some(ForModel.NeverMode)).wrap(
|
||||
PushToStreamModel(
|
||||
iter,
|
||||
CallModel.Export(varSTest.name, varSTest.`type`)
|
||||
).leaf,
|
||||
CanonicalizeModel(
|
||||
varSTest,
|
||||
CallModel.Export(iterCanon.name, iterCanon.`type`)
|
||||
).leaf,
|
||||
XorModel.wrap(
|
||||
MatchMismatchModel(
|
||||
iterCanon
|
||||
.copy(properties = Chain.one(FunctorModel("length", ScalarType.`u32`))),
|
||||
incrVar,
|
||||
true
|
||||
).leaf,
|
||||
NextModel(iter.name).leaf
|
||||
)
|
||||
),
|
||||
CanonicalizeModel(
|
||||
varSTest,
|
||||
CallModel.Export(resultCanon.name, CanonStreamType(afr.streamType.element))
|
||||
).leaf,
|
||||
FlattenModel(
|
||||
resultCanon,
|
||||
uniqueResultName
|
||||
).leaf
|
||||
val gate = joinStreamOnIndexModel(
|
||||
streamName = afr.name,
|
||||
streamType = afr.streamType,
|
||||
idxModel = idxModel,
|
||||
idxIncrName = uniqueIdxIncr,
|
||||
testName = uniqueTestName,
|
||||
iterName = uniqueIter,
|
||||
canonName = uniqueCanonName,
|
||||
iterCanonName = uniqueIterCanon,
|
||||
resultName = uniqueResultName
|
||||
)
|
||||
|
||||
val tree = SeqModel.wrap((idxInline.predo.toList :+ gate):_*)
|
||||
val tree = SeqModel.wrap(idxInline.predo.toList :+ gate)
|
||||
|
||||
val treeInline =
|
||||
Inline(idxInline.flattenValues, predo = Chain.one(tree))
|
||||
|
@ -3,7 +3,6 @@ package aqua.model.inline.raw
|
||||
import aqua.model.{
|
||||
CallModel,
|
||||
CallServiceModel,
|
||||
CanonicalizeModel,
|
||||
FlattenModel,
|
||||
ForModel,
|
||||
FunctorModel,
|
||||
@ -15,7 +14,6 @@ import aqua.model.{
|
||||
OpModel,
|
||||
PropertyModel,
|
||||
PushToStreamModel,
|
||||
RestrictionModel,
|
||||
SeqModel,
|
||||
ValueModel,
|
||||
VarModel,
|
||||
@ -153,7 +151,7 @@ object ApplyPropertiesRawInliner extends RawInliner[ApplyPropertyRaw] with Loggi
|
||||
properties.map {
|
||||
case iir @ IntoIndexRaw(vr, t) =>
|
||||
unfold(vr, propertiesAllowed = false).flatMap {
|
||||
case (vm@VarModel(_, _, _), inline) if vm.properties.nonEmpty =>
|
||||
case (vm @ VarModel(_, _, _), inline) if vm.properties.nonEmpty =>
|
||||
removeProperties(vm).map { case (vf, inlf) =>
|
||||
PropertyRawWithModel(iir, Option(IntoIndexModel(vf.name, t))) -> Inline(
|
||||
inline.flattenValues ++ inlf.flattenValues,
|
||||
|
@ -24,21 +24,24 @@ object CollectionRawInliner extends RawInliner[CollectionRaw] {
|
||||
raw: CollectionRaw,
|
||||
propertiesAllowed: Boolean
|
||||
): State[S, (ValueModel, Inline)] = unfoldCollection(raw)
|
||||
|
||||
|
||||
def unfoldCollection[S: Mangler: Exports: Arrows](
|
||||
raw: CollectionRaw,
|
||||
assignToName: Option[String] = None
|
||||
): State[S, (ValueModel, Inline)] =
|
||||
for {
|
||||
streamName <-
|
||||
raw.boxType match {
|
||||
case _: StreamType => assignToName.map(s => State.pure(s)).getOrElse(Mangler[S].findAndForbidName("stream-inline"))
|
||||
case _: CanonStreamType => Mangler[S].findAndForbidName("canon_stream-inline")
|
||||
case _: ArrayType => Mangler[S].findAndForbidName("array-inline")
|
||||
case _: OptionType => Mangler[S].findAndForbidName("option-inline")
|
||||
}
|
||||
streamName <- raw.boxType match {
|
||||
case _: StreamType =>
|
||||
assignToName
|
||||
.map(s => State.pure(s))
|
||||
.getOrElse(Mangler[S].findAndForbidName("stream-inline"))
|
||||
case _: CanonStreamType => Mangler[S].findAndForbidName("canon_stream-inline")
|
||||
case _: ArrayType => Mangler[S].findAndForbidName("array-inline")
|
||||
case _: OptionType => Mangler[S].findAndForbidName("option-inline")
|
||||
}
|
||||
|
||||
stream = VarModel(streamName, StreamType(raw.elementType))
|
||||
streamType = StreamType(raw.elementType)
|
||||
stream = VarModel(streamName, streamType)
|
||||
streamExp = CallModel.Export(stream.name, stream.`type`)
|
||||
|
||||
valsWithInlines <- raw.values
|
||||
@ -48,13 +51,13 @@ object CollectionRawInliner extends RawInliner[CollectionRaw] {
|
||||
|
||||
// push values to the stream, that is gathering the collection
|
||||
vals = valsWithInlines.map { case (v, _) =>
|
||||
PushToStreamModel(v, streamExp).leaf
|
||||
}
|
||||
PushToStreamModel(v, streamExp).leaf
|
||||
}
|
||||
|
||||
// all inlines will be added before pushing values to the stream
|
||||
inlines = valsWithInlines.flatMap { case (_, t) =>
|
||||
Chain.fromOption(t)
|
||||
}
|
||||
Chain.fromOption(t)
|
||||
}
|
||||
|
||||
canonName <-
|
||||
if (raw.boxType.isStream) State.pure(streamName)
|
||||
@ -67,19 +70,19 @@ object CollectionRawInliner extends RawInliner[CollectionRaw] {
|
||||
} yield VarModel(canonName, canon.`type`) -> Inline.tree(
|
||||
raw.boxType match {
|
||||
case ArrayType(_) =>
|
||||
RestrictionModel(streamName, isStream = true).wrap(
|
||||
SeqModel.wrap((inlines ++ vals :+ CanonicalizeModel(stream, canon).leaf).toList: _*)
|
||||
RestrictionModel(streamName, streamType).wrap(
|
||||
SeqModel.wrap(inlines ++ vals :+ CanonicalizeModel(stream, canon).leaf)
|
||||
)
|
||||
case OptionType(_) =>
|
||||
RestrictionModel(streamName, isStream = true).wrap(
|
||||
RestrictionModel(streamName, streamType).wrap(
|
||||
SeqModel.wrap(
|
||||
SeqModel.wrap(inlines.toList:_*),
|
||||
XorModel.wrap((vals :+ NullModel.leaf).toList: _*),
|
||||
CanonicalizeModel(stream, canon).leaf
|
||||
SeqModel.wrap(inlines),
|
||||
XorModel.wrap(vals :+ NullModel.leaf),
|
||||
CanonicalizeModel(stream, canon).leaf
|
||||
)
|
||||
)
|
||||
case _ =>
|
||||
SeqModel.wrap((inlines ++ vals).toList: _*)
|
||||
case _ =>
|
||||
SeqModel.wrap(inlines ++ vals)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers {
|
||||
.callArrow[InliningState](
|
||||
FuncArrow(
|
||||
"stream-callback",
|
||||
RestrictionTag(streamVar.name, true).wrap(
|
||||
RestrictionTag(streamVar.name, streamType).wrap(
|
||||
SeqTag.wrap(
|
||||
DeclareStreamTag(streamVar).leaf,
|
||||
CallArrowRawTag.func("cb", Call(streamVar :: Nil, Nil)).leaf
|
||||
@ -115,7 +115,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers {
|
||||
._2
|
||||
|
||||
model.equalsOrShowDiff(
|
||||
RestrictionModel(streamVar.name, true).wrap(
|
||||
RestrictionModel(streamVar.name, streamType).wrap(
|
||||
MetaModel
|
||||
.CallArrowModel("cb")
|
||||
.wrap(
|
||||
@ -191,7 +191,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers {
|
||||
.callArrow[InliningState](
|
||||
FuncArrow(
|
||||
"stream-callback",
|
||||
RestrictionTag(streamVar.name, true).wrap(
|
||||
RestrictionTag(streamVar.name, streamType).wrap(
|
||||
SeqTag.wrap(
|
||||
DeclareStreamTag(streamVar).leaf,
|
||||
CallArrowRawTag.func("cb", Call(streamVarLambda :: Nil, Nil)).leaf
|
||||
@ -218,7 +218,7 @@ class ArrowInlinerSpec extends AnyFlatSpec with Matchers {
|
||||
._2
|
||||
|
||||
model.equalsOrShowDiff(
|
||||
RestrictionModel(streamVar.name, true).wrap(
|
||||
RestrictionModel(streamVar.name, streamType).wrap(
|
||||
CallServiceModel(
|
||||
LiteralModel.quote("test-service"),
|
||||
"some-call",
|
||||
|
@ -7,6 +7,7 @@ import aqua.raw.ops.*
|
||||
import aqua.raw.value.{CollectionRaw, LiteralRaw, MakeStructRaw, VarRaw}
|
||||
import aqua.types.{CanonStreamType, OptionType, ScalarType, StreamType, StructType}
|
||||
import cats.data.{NonEmptyList, NonEmptyMap}
|
||||
import cats.syntax.show.*
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
||||
@ -25,15 +26,14 @@ class CollectionRawInlinerSpec extends AnyFlatSpec with Matchers {
|
||||
val raw = CollectionRaw(NonEmptyList.of(makeStruct), OptionType(nestedType))
|
||||
|
||||
val (v, tree) =
|
||||
RawValueInliner.valueToModel[InliningState](raw, false).run(InliningState()).value._2
|
||||
RawValueInliner.valueToModel[InliningState](raw, false).runA(InliningState()).value
|
||||
|
||||
val resultValue = VarModel("option-inline-0", CanonStreamType(nestedType))
|
||||
|
||||
v shouldBe resultValue
|
||||
|
||||
tree.get.equalsOrShowDiff(
|
||||
// create a stream
|
||||
RestrictionModel("option-inline", true).wrap(
|
||||
val expected =
|
||||
RestrictionModel("option-inline", StreamType(nestedType)).wrap( // create a stream
|
||||
SeqModel.wrap(
|
||||
// create an object
|
||||
CallServiceModel(
|
||||
@ -61,8 +61,8 @@ class CollectionRawInlinerSpec extends AnyFlatSpec with Matchers {
|
||||
).leaf
|
||||
)
|
||||
)
|
||||
) shouldBe true
|
||||
|
||||
tree.get.equalsOrShowDiff(expected) shouldBe true
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ import aqua.raw.arrow.FuncRaw
|
||||
import aqua.raw.ops.RawTag.Tree
|
||||
import aqua.raw.value.{CallArrowRaw, ValueRaw, VarRaw}
|
||||
import aqua.tree.{TreeNode, TreeNodeCompanion}
|
||||
import aqua.types.{ArrowType, ProductType}
|
||||
import aqua.types.{ArrowType, DataType, ProductType}
|
||||
import cats.{Eval, Show}
|
||||
import cats.data.{Chain, NonEmptyList}
|
||||
import cats.free.Cofree
|
||||
@ -121,7 +121,7 @@ case class NextTag(item: String) extends RawTag {
|
||||
copy(item = map.getOrElse(item, item))
|
||||
}
|
||||
|
||||
case class RestrictionTag(name: String, isStream: Boolean) extends SeqGroupTag {
|
||||
case class RestrictionTag(name: String, `type`: DataType) extends SeqGroupTag {
|
||||
|
||||
override def restrictsVarNames: Set[String] = Set(name)
|
||||
|
||||
|
@ -6,46 +6,37 @@ import cats.data.{Chain, NonEmptyList}
|
||||
import cats.free.Cofree
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw}
|
||||
import aqua.model.*
|
||||
import aqua.types.*
|
||||
|
||||
// TODO docs
|
||||
/**
|
||||
* Helpers for translating [[OpModel]] to [[ResolvedOp]]
|
||||
*/
|
||||
object MakeRes {
|
||||
val op: ValueModel = LiteralModel.fromRaw(LiteralRaw.quote("op"))
|
||||
|
||||
def noop(onPeer: ValueModel, log: String = null): ResolvedOp.Tree =
|
||||
CallServiceRes(
|
||||
op,
|
||||
"noop",
|
||||
CallRes(
|
||||
Option(log).filter(_ == "").map(LiteralRaw.quote).map(LiteralModel.fromRaw).toList,
|
||||
None
|
||||
),
|
||||
onPeer
|
||||
).leaf
|
||||
/**
|
||||
* Make topology hop to peer
|
||||
*
|
||||
* @param onPeer peer to make hop to
|
||||
* @return [[ResolvedOp.Tree]] corresponsing to a hop
|
||||
*/
|
||||
def hop(onPeer: ValueModel): ResolvedOp.Tree = {
|
||||
// Those names can't be produced from compilation
|
||||
// so they are safe to use
|
||||
val streamName = "-ephemeral-stream-"
|
||||
val canonName = "-ephemeral-canon-"
|
||||
val elementType = BottomType
|
||||
val streamType = StreamType(elementType)
|
||||
val canonType = CanonStreamType(elementType)
|
||||
|
||||
def canon(onPeer: ValueModel, operand: ValueModel, target: CallModel.Export): ResolvedOp.Tree =
|
||||
CallServiceRes(
|
||||
op,
|
||||
"identity",
|
||||
CallRes(operand :: Nil, Some(target)),
|
||||
onPeer
|
||||
).leaf
|
||||
|
||||
def join(onPeer: ValueModel, operands: NonEmptyList[ValueModel]): ResolvedOp.Tree =
|
||||
CallServiceRes(
|
||||
op,
|
||||
"noop",
|
||||
CallRes(operands.toList, None),
|
||||
onPeer
|
||||
).leaf
|
||||
|
||||
private val initPeerId = ValueModel.fromRaw(ValueRaw.InitPeerId)
|
||||
|
||||
private def orInit(currentPeerId: Option[ValueModel]): ValueModel =
|
||||
currentPeerId.getOrElse(initPeerId)
|
||||
|
||||
private def isNillLiteral(vm: ValueModel): Boolean = vm match {
|
||||
case LiteralModel(value, t) if value == ValueRaw.Nil.value && t == ValueRaw.Nil.`type` => true
|
||||
case _ => false
|
||||
RestrictionRes(streamName, streamType).wrap(
|
||||
RestrictionRes(canonName, canonType).wrap(
|
||||
CanonRes(
|
||||
operand = VarModel(streamName, streamType),
|
||||
peerId = onPeer,
|
||||
exportTo = CallModel.Export(canonName, canonType)
|
||||
).leaf
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def resolve(
|
||||
@ -56,7 +47,7 @@ object MakeRes {
|
||||
case MatchMismatchModel(a, b, s) =>
|
||||
MatchMismatchRes(a, b, s).leaf
|
||||
case ForModel(item, iter, mode) if !isNillLiteral(iter) => FoldRes(item, iter, mode).leaf
|
||||
case RestrictionModel(item, isStream) => RestrictionRes(item, isStream).leaf
|
||||
case RestrictionModel(item, itemType) => RestrictionRes(item, itemType).leaf
|
||||
case DetachModel => ParRes.leaf
|
||||
case ParModel => ParRes.leaf
|
||||
case XorModel => XorRes.leaf
|
||||
@ -85,8 +76,6 @@ object MakeRes {
|
||||
ApRes(operand, CallModel.Export(assignTo, ArrayType(el))).leaf
|
||||
case FlattenModel(operand, assignTo) =>
|
||||
ApRes(operand, CallModel.Export(assignTo, operand.`type`)).leaf
|
||||
case JoinModel(operands) =>
|
||||
join(orInit(currentPeerId), operands)
|
||||
case CallServiceModel(serviceId, funcName, CallModel(args, exportTo)) =>
|
||||
CallServiceRes(
|
||||
serviceId,
|
||||
@ -99,4 +88,14 @@ object MakeRes {
|
||||
NullRes.leaf
|
||||
|
||||
}
|
||||
|
||||
private val initPeerId = ValueModel.fromRaw(ValueRaw.InitPeerId)
|
||||
|
||||
private def orInit(currentPeerId: Option[ValueModel]): ValueModel =
|
||||
currentPeerId.getOrElse(initPeerId)
|
||||
|
||||
private def isNillLiteral(vm: ValueModel): Boolean = vm match {
|
||||
case LiteralModel(value, t) if value == ValueRaw.Nil.value && t == ValueRaw.Nil.`type` => true
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package aqua.res
|
||||
import aqua.model.{CallModel, ForModel, ValueModel, VarModel}
|
||||
import aqua.raw.ops.Call
|
||||
import aqua.tree.{TreeNode, TreeNodeCompanion}
|
||||
import aqua.types.DataType
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
import cats.Show
|
||||
@ -31,12 +32,13 @@ case class MatchMismatchRes(left: ValueModel, right: ValueModel, shouldMatch: Bo
|
||||
override def toString: String = s"(${if (shouldMatch) "match" else "mismatch"} $left $right)"
|
||||
}
|
||||
|
||||
case class FoldRes(item: String, iterable: ValueModel, mode: Option[ForModel.Mode] = None) extends ResolvedOp {
|
||||
case class FoldRes(item: String, iterable: ValueModel, mode: Option[ForModel.Mode] = None)
|
||||
extends ResolvedOp {
|
||||
override def toString: String = s"(fold $iterable $item ${mode.map(_.toString).getOrElse("")}"
|
||||
}
|
||||
|
||||
case class RestrictionRes(item: String, isStream: Boolean) extends ResolvedOp {
|
||||
override def toString: String = s"(new ${if (isStream) "$" else ""}$item "
|
||||
case class RestrictionRes(item: String, `type`: DataType) extends ResolvedOp {
|
||||
override def toString: String = s"(new ${`type`.airPrefix}$item "
|
||||
}
|
||||
|
||||
case class CallServiceRes(
|
||||
@ -52,7 +54,8 @@ case class ApRes(operand: ValueModel, exportTo: CallModel.Export) extends Resolv
|
||||
override def toString: String = s"(ap $operand $exportTo)"
|
||||
}
|
||||
|
||||
case class CanonRes(operand: ValueModel, peerId: ValueModel, exportTo: CallModel.Export) extends ResolvedOp {
|
||||
case class CanonRes(operand: ValueModel, peerId: ValueModel, exportTo: CallModel.Export)
|
||||
extends ResolvedOp {
|
||||
override def toString: String = s"(canon $peerId $operand $exportTo)"
|
||||
}
|
||||
|
||||
|
47
model/res/src/test/scala/aqua/res/ResBuilder.scala
Normal file
47
model/res/src/test/scala/aqua/res/ResBuilder.scala
Normal file
@ -0,0 +1,47 @@
|
||||
package aqua.res
|
||||
|
||||
import aqua.model.*
|
||||
import aqua.types.*
|
||||
import aqua.raw.value.*
|
||||
|
||||
import cats.data.Chain
|
||||
|
||||
object ResBuilder {
|
||||
|
||||
def join(stream: VarModel, onIdx: ValueModel, peer: ValueModel) = {
|
||||
val testVM = VarModel(stream.name + "_test", stream.`type`)
|
||||
val testStreamType = stream.`type`.asInstanceOf[StreamType] // Unsafe
|
||||
val iter = VarModel(stream.name + "_fold_var", ScalarType.string)
|
||||
val canon = VarModel(stream.name + "_iter_canon", CanonStreamType(ScalarType.string))
|
||||
val canonRes = VarModel(stream.name + "_result_canon", CanonStreamType(ScalarType.string))
|
||||
val arrayRes = VarModel(stream.name + "_gate", ArrayType(ScalarType.string))
|
||||
val idx = VarModel(stream.name + "_incr", ScalarType.u32)
|
||||
|
||||
RestrictionRes(testVM.name, testStreamType).wrap(
|
||||
CallServiceRes(
|
||||
LiteralModel("\"math\"", ScalarType.string),
|
||||
"add",
|
||||
CallRes(
|
||||
onIdx :: LiteralModel.fromRaw(LiteralRaw.number(1)) :: Nil,
|
||||
Some(CallModel.Export(idx.name, idx.`type`))
|
||||
),
|
||||
peer
|
||||
).leaf,
|
||||
FoldRes(iter.name, stream, Some(ForModel.NeverMode)).wrap(
|
||||
ApRes(iter, CallModel.Export(testVM.name, testVM.`type`)).leaf,
|
||||
CanonRes(testVM, peer, CallModel.Export(canon.name, canon.`type`)).leaf,
|
||||
XorRes.wrap(
|
||||
MatchMismatchRes(
|
||||
canon.copy(properties = Chain.one(FunctorModel("length", ScalarType.u32))),
|
||||
idx,
|
||||
true
|
||||
).leaf,
|
||||
NextRes(iter.name).leaf
|
||||
)
|
||||
),
|
||||
CanonRes(testVM, peer, CallModel.Export(canonRes.name, canonRes.`type`)).leaf,
|
||||
ApRes(canonRes, CallModel.Export(arrayRes.name, arrayRes.`type`)).leaf
|
||||
)
|
||||
}
|
||||
|
||||
}
|
@ -7,7 +7,7 @@ import cats.Show
|
||||
import cats.Eval
|
||||
import cats.data.NonEmptyList
|
||||
import aqua.tree.{TreeNode, TreeNodeCompanion}
|
||||
import aqua.types.ScalarType
|
||||
import aqua.types.*
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
@ -95,7 +95,9 @@ case class NextModel(item: String) extends OpModel {
|
||||
|
||||
}
|
||||
|
||||
case class RestrictionModel(name: String, isStream: Boolean) extends SeqGroupModel {
|
||||
// TODO: Refactor out `name` and `type` to
|
||||
// something like VarModel without properties
|
||||
case class RestrictionModel(name: String, `type`: DataType) extends SeqGroupModel {
|
||||
override def usesVarNames: Set[String] = Set.empty
|
||||
|
||||
override def restrictsVarNames: Set[String] = Set(name)
|
||||
@ -187,14 +189,6 @@ case class CanonicalizeModel(operand: ValueModel, exportTo: CallModel.Export)
|
||||
override def usesVarNames: Set[String] = operand.usesVarNames
|
||||
}
|
||||
|
||||
case class JoinModel(operands: NonEmptyList[ValueModel]) extends ForceExecModel {
|
||||
|
||||
override def toString: String = s"join ${operands.toList.mkString(", ")}"
|
||||
|
||||
override lazy val usesVarNames: Set[String] =
|
||||
operands.toList.flatMap(_.usesVarNames).toSet
|
||||
}
|
||||
|
||||
case class CaptureTopologyModel(name: String) extends NoExecModel
|
||||
case class ApplyTopologyModel(name: String) extends SeqGroupModel
|
||||
|
||||
|
@ -273,8 +273,8 @@ object Topology extends Logging {
|
||||
|
||||
val chainZipperEv = resolved.traverse(tree =>
|
||||
(
|
||||
rc.topology.pathBefore.map(through(_, s"before ${currI}")),
|
||||
rc.topology.pathAfter.map(through(_, s"after ${currI}", reversed = true))
|
||||
rc.topology.pathBefore.map(through(_)),
|
||||
rc.topology.pathAfter.map(through(_, reversed = true))
|
||||
).mapN { case (pathBefore, pathAfter) =>
|
||||
ChainZipper(pathBefore, tree, pathAfter)
|
||||
}.flatTap(logResolvedDebugInfo(rc, _, tree))
|
||||
@ -299,20 +299,19 @@ object Topology extends Logging {
|
||||
// Walks through peer IDs, doing a noop function on each
|
||||
def through(
|
||||
peerIds: Chain[ValueModel],
|
||||
log: String = null,
|
||||
reversed: Boolean = false
|
||||
): Chain[Res] = peerIds.map { v =>
|
||||
v.`type` match {
|
||||
case _: BoxType =>
|
||||
val itemName = "-via-peer-"
|
||||
val steps = Chain(
|
||||
MakeRes.noop(VarModel(itemName, ScalarType.string, Chain.empty), log),
|
||||
MakeRes.hop(VarModel(itemName, ScalarType.string, Chain.empty)),
|
||||
NextRes(itemName).leaf
|
||||
)
|
||||
|
||||
FoldRes(itemName, v).wrap(if (reversed) steps.reverse else steps)
|
||||
case _ =>
|
||||
MakeRes.noop(v, log)
|
||||
MakeRes.hop(v)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,9 @@ trait After {
|
||||
Eval.now(Chain.empty)
|
||||
}
|
||||
|
||||
// If exit is forced, make a path outside this node
|
||||
// – from where it ends to where execution is expected to continue,
|
||||
// explicitly pinging the next node (useful inside par branches)
|
||||
def pathAfterAndPingNext(current: Topology): Eval[Chain[ValueModel]] =
|
||||
current.forceExit.flatMap {
|
||||
case false => Eval.now(Chain.empty)
|
||||
@ -48,7 +51,12 @@ trait After {
|
||||
case (e, a, _) if e == a => Chain.empty
|
||||
case (e, a, l) if l.contains(e) =>
|
||||
// Pingback in case no relays involved
|
||||
Chain.fromOption(a.headOption.map(_.peerId))
|
||||
Chain.fromOption(
|
||||
a.headOption
|
||||
// Add nothing if last node is the same
|
||||
.filterNot(e.headOption.contains)
|
||||
.map(_.peerId)
|
||||
)
|
||||
case (e, a, _) =>
|
||||
// We wasn't at e, so need to get through the last peer in case it matches with the relay
|
||||
Topology.findRelayPathEnforcement(a, e) ++ Chain.fromOption(
|
||||
|
@ -1,10 +1,13 @@
|
||||
package aqua.model.transform.topology.strategy
|
||||
|
||||
import aqua.model.transform.topology.Topology
|
||||
import aqua.model.{OnModel, ParGroupModel, SeqGroupModel, ValueModel}
|
||||
import aqua.model.{OnModel, ParGroupModel, SeqGroupModel, ValueModel, XorModel}
|
||||
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.syntax.functor.*
|
||||
import cats.instances.lazyList.*
|
||||
import cats.syntax.option.*
|
||||
|
||||
// Parent == Xor
|
||||
object XorBranch extends Before with After {
|
||||
@ -13,24 +16,29 @@ object XorBranch extends Before with After {
|
||||
override def beforeOn(current: Topology): Eval[List[OnModel]] =
|
||||
current.prevSibling.map(_.endsOn) getOrElse super.beforeOn(current)
|
||||
|
||||
private def closestParExit(current: Topology): Option[Topology] =
|
||||
// Find closest par exit up and return its branch current is in
|
||||
// Returns none if there is no par up
|
||||
// or current is not at its exit
|
||||
private def closestParExitChild(current: Topology): Option[Topology] =
|
||||
current.parents
|
||||
.map(t => t -> t.parent.map(_.cursor.op))
|
||||
.takeWhile {
|
||||
case (t, Some(_: ParGroupModel)) => true
|
||||
case (t, Some(_: SeqGroupModel)) => t.nextSibling.isEmpty
|
||||
.fproduct(_.parent.map(_.cursor.op))
|
||||
.dropWhile {
|
||||
case (t, Some(_: SeqGroupModel)) =>
|
||||
t.nextSibling.isEmpty
|
||||
case (_, Some(XorModel)) =>
|
||||
true
|
||||
case _ => false
|
||||
}
|
||||
.map(_._1)
|
||||
.map(t => t -> t.cursor.op)
|
||||
.collectFirst { case (t, _: ParGroupModel) =>
|
||||
// println(Console.GREEN + s"collect ${t}" + Console.RESET)
|
||||
t
|
||||
}
|
||||
.headOption
|
||||
.collect { case (t, Some(_: ParGroupModel)) => t }
|
||||
|
||||
private def closestParExit(current: Topology): Option[Topology] =
|
||||
closestParExitChild(current).flatMap(_.parent)
|
||||
|
||||
override def forceExit(current: Topology): Eval[Boolean] =
|
||||
closestParExit(current)
|
||||
.fold(Eval.later(current.cursor.moveUp.exists(_.hasExecLater)))(_.forceExit)
|
||||
closestParExitChild(current).fold(
|
||||
Eval.later(current.cursor.moveUp.exists(_.hasExecLater))
|
||||
)(_.forceExit) // Force exit if par branch needs it
|
||||
|
||||
override def afterOn(current: Topology): Eval[List[OnModel]] =
|
||||
current.forceExit.flatMap {
|
||||
@ -41,5 +49,8 @@ object XorBranch extends Before with After {
|
||||
|
||||
// Parent of this branch's parent xor – fixes the case when this xor is in par
|
||||
override def pathAfter(current: Topology): Eval[Chain[ValueModel]] =
|
||||
closestParExit(current).fold(super.pathAfter(current))(_ => pathAfterAndPingNext(current))
|
||||
closestParExit(current).fold(super.pathAfter(current))(_ =>
|
||||
// Ping next if we are exiting from par
|
||||
super.pathAfterAndPingNext(current)
|
||||
)
|
||||
}
|
||||
|
@ -21,6 +21,11 @@ import aqua.res.{CallRes, CallServiceRes, MakeRes}
|
||||
import aqua.types.{ArrayType, LiteralType, ScalarType}
|
||||
|
||||
import scala.language.implicitConversions
|
||||
import aqua.types.StreamType
|
||||
import aqua.model.IntoIndexModel
|
||||
import cats.data.Chain
|
||||
import cats.data.Chain.==:
|
||||
import aqua.model.inline.raw.ApplyGateRawInliner
|
||||
|
||||
object ModelBuilder {
|
||||
implicit def rawToValue(raw: ValueRaw): ValueModel = ValueModel.fromRaw(raw)
|
||||
@ -125,6 +130,32 @@ object ModelBuilder {
|
||||
)
|
||||
}
|
||||
|
||||
def through(peer: ValueModel, log: String = null) =
|
||||
MakeRes.noop(peer, log)
|
||||
def through(peer: ValueModel) =
|
||||
MakeRes.hop(peer)
|
||||
|
||||
/**
|
||||
* @param stream stream [[VarModel]]
|
||||
* @param idx id [[ValueModel]]
|
||||
* @return [[OpModel.Tree]] of join of `stream[idx]`
|
||||
*/
|
||||
def join(stream: VarModel, idx: ValueModel): OpModel.Tree =
|
||||
stream match {
|
||||
case VarModel(
|
||||
streamName,
|
||||
streamType: StreamType,
|
||||
Chain.`nil`
|
||||
) =>
|
||||
ApplyGateRawInliner.joinStreamOnIndexModel(
|
||||
streamName = streamName,
|
||||
streamType = streamType,
|
||||
idxModel = idx,
|
||||
idxIncrName = streamName + "_incr",
|
||||
testName = streamName + "_test",
|
||||
iterName = streamName + "_fold_var",
|
||||
canonName = streamName + "_result_canon",
|
||||
iterCanonName = streamName + "_iter_canon",
|
||||
resultName = streamName + "_gate"
|
||||
)
|
||||
case _ => ???
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import aqua.raw.value.{IntoIndexRaw, LiteralRaw, VarRaw}
|
||||
import aqua.types.{LiteralType, ScalarType, StreamType}
|
||||
import cats.Eval
|
||||
import cats.data.{Chain, NonEmptyList}
|
||||
import cats.data.Chain.*
|
||||
import cats.free.Cofree
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
@ -16,10 +17,31 @@ import cats.syntax.option.*
|
||||
import aqua.types.ArrayType
|
||||
import aqua.raw.ConstantRaw.initPeerId
|
||||
import aqua.model.ForModel.NullMode
|
||||
import aqua.raw.value.ValueRaw
|
||||
|
||||
class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
import ModelBuilder._
|
||||
import ModelBuilder.{join as joinModel, *}
|
||||
import ResBuilder.join as joinRes
|
||||
|
||||
def joinModelRes(streamEl: ValueRaw | ValueModel): (OpModel.Tree, ResolvedOp.Tree) =
|
||||
streamEl match {
|
||||
case vm: ValueModel => vm
|
||||
case vr: ValueRaw => ValueModel.fromRaw(vr)
|
||||
} match {
|
||||
case stream @ VarModel(name, baseType, IntoIndexModel(idx, idxType) ==: Chain.`nil`) =>
|
||||
val idxModel =
|
||||
if (idx.forall(Character.isDigit)) LiteralModel(idx, idxType)
|
||||
else VarModel(idx, idxType)
|
||||
|
||||
val streamWithoutIdx = stream.copy(properties = Chain.`nil`)
|
||||
|
||||
(
|
||||
joinModel(streamWithoutIdx, idxModel),
|
||||
joinRes(streamWithoutIdx, idxModel, ValueModel.fromRaw(initPeer))
|
||||
)
|
||||
case _ => ???
|
||||
}
|
||||
|
||||
"topology resolver" should "do nothing on init peer" in {
|
||||
|
||||
@ -435,6 +457,8 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
val stream = ValueModel.fromRaw(streamRaw)
|
||||
val streamEl = ValueModel.fromRaw(streamRawEl)
|
||||
|
||||
val (joinModel, joinRes) = joinModelRes(streamEl)
|
||||
|
||||
val init =
|
||||
SeqModel.wrap(
|
||||
DeclareStreamModel(stream).leaf,
|
||||
@ -451,7 +475,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
)
|
||||
),
|
||||
JoinModel(NonEmptyList.one(streamEl)).leaf,
|
||||
joinModel,
|
||||
callModel(3, Nil, streamRaw :: Nil)
|
||||
)
|
||||
)
|
||||
@ -468,27 +492,25 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
XorRes.wrap(
|
||||
callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))),
|
||||
SeqRes.wrap(
|
||||
callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
callRes(4, initPeer)
|
||||
)
|
||||
),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
)
|
||||
),
|
||||
NextRes("i").leaf
|
||||
)
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel(s"\"op\"", LiteralType.string),
|
||||
s"noop",
|
||||
CallRes(streamEl :: Nil, None),
|
||||
initPeer
|
||||
).leaf,
|
||||
joinRes,
|
||||
callRes(3, initPeer, None, stream :: Nil)
|
||||
)
|
||||
|
||||
proc.equalsOrShowDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
@ -502,6 +524,8 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
val stream = ValueModel.fromRaw(streamRaw)
|
||||
val streamEl = ValueModel.fromRaw(streamRawEl)
|
||||
|
||||
val (joinModel, joinRes) = joinModelRes(streamEl)
|
||||
|
||||
val init =
|
||||
SeqModel.wrap(
|
||||
DeclareStreamModel(stream).leaf,
|
||||
@ -520,7 +544,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
)
|
||||
),
|
||||
JoinModel(NonEmptyList.one(streamEl)).leaf,
|
||||
joinModel,
|
||||
callModel(3, Nil, streamRaw :: Nil)
|
||||
)
|
||||
)
|
||||
@ -538,26 +562,20 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
through(relay),
|
||||
XorRes.wrap(
|
||||
XorRes.wrap(
|
||||
callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`)))
|
||||
SeqRes.wrap(
|
||||
callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
)
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
callRes(4, initPeer)
|
||||
)
|
||||
),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
callRes(4, initPeer)
|
||||
)
|
||||
),
|
||||
NextRes("i").leaf
|
||||
)
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel(s"\"op\"", LiteralType.string),
|
||||
s"noop",
|
||||
CallRes(streamEl :: Nil, None),
|
||||
initPeer
|
||||
).leaf,
|
||||
joinRes,
|
||||
callRes(3, initPeer, None, stream :: Nil)
|
||||
)
|
||||
|
||||
@ -784,6 +802,8 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
val usedWithIdx =
|
||||
used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string))
|
||||
|
||||
val (joinModel, joinRes) = joinModelRes(usedWithIdx)
|
||||
|
||||
val init =
|
||||
OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
foldPar(
|
||||
@ -793,7 +813,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
callModel(1, CallModel.Export(used.name, used.`type`) :: Nil)
|
||||
)
|
||||
),
|
||||
JoinModel(NonEmptyList.one(usedWithIdx)).leaf,
|
||||
joinModel,
|
||||
callModel(3, Nil, used :: Nil)
|
||||
)
|
||||
|
||||
@ -818,12 +838,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel(s"\"op\"", LiteralType.string),
|
||||
s"noop",
|
||||
CallRes(usedWithIdx :: Nil, None),
|
||||
initPeer
|
||||
).leaf,
|
||||
joinRes,
|
||||
callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil)
|
||||
)
|
||||
|
||||
@ -835,6 +850,9 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
val used = VarRaw("used", StreamType(ScalarType.string))
|
||||
val usedWithIdx =
|
||||
used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string))
|
||||
|
||||
val (joinModel, joinRes) = joinModelRes(usedWithIdx)
|
||||
|
||||
val init =
|
||||
OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
foldPar(
|
||||
@ -846,7 +864,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
)
|
||||
)
|
||||
),
|
||||
JoinModel(NonEmptyList.one(usedWithIdx)).leaf,
|
||||
joinModel,
|
||||
callModel(3, Nil, used :: Nil)
|
||||
)
|
||||
|
||||
@ -860,25 +878,22 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
XorRes.wrap(
|
||||
callRes(
|
||||
1,
|
||||
ValueModel.fromRaw(i),
|
||||
Some(CallModel.Export(used.name, used.`type`))
|
||||
SeqRes.wrap(
|
||||
callRes(
|
||||
1,
|
||||
ValueModel.fromRaw(i),
|
||||
Some(CallModel.Export(used.name, used.`type`))
|
||||
),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
)
|
||||
),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
)
|
||||
),
|
||||
NextRes("i").leaf
|
||||
)
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel(s"\"op\"", LiteralType.string),
|
||||
s"noop",
|
||||
CallRes(usedWithIdx :: Nil, None),
|
||||
initPeer
|
||||
).leaf,
|
||||
joinRes,
|
||||
callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil)
|
||||
)
|
||||
|
||||
@ -888,7 +903,8 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
"topology resolver" should "handle empty for correctly [bug LNG-149]" in {
|
||||
val streamName = "array-inline"
|
||||
val iterName = "a-0"
|
||||
val stream = VarModel(streamName, StreamType(LiteralType.number))
|
||||
val streamType = StreamType(LiteralType.number)
|
||||
val stream = VarModel(streamName, streamType)
|
||||
val array = VarModel(s"$streamName-0", ArrayType(LiteralType.number))
|
||||
|
||||
val literal = (i: String) => LiteralModel(i, LiteralType.number)
|
||||
@ -901,7 +917,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
val model = OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
SeqModel.wrap(
|
||||
RestrictionModel(streamName, true).wrap(
|
||||
RestrictionModel(streamName, streamType).wrap(
|
||||
push("1"),
|
||||
push("2"),
|
||||
CanonicalizeModel(stream, CallModel.Export(array.name, array.`type`)).leaf
|
||||
@ -915,7 +931,7 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
val proc = Topology.resolve(model).value
|
||||
|
||||
val expected = SeqRes.wrap(
|
||||
RestrictionRes(streamName, true).wrap(
|
||||
RestrictionRes(streamName, streamType).wrap(
|
||||
ApRes(literal("1"), CallModel.Export(stream.name, stream.`type`)).leaf,
|
||||
ApRes(literal("2"), CallModel.Export(stream.name, stream.`type`)).leaf,
|
||||
CanonRes(
|
||||
|
@ -17,10 +17,13 @@ trait TreeNode[T <: TreeNode[T]] {
|
||||
|
||||
def wrap(children: Chain[Tree]): Tree = Cofree(self, Eval.now(children))
|
||||
|
||||
protected def wrapNonEmpty(children: Chain[Tree], empty: Tree): Tree = children match {
|
||||
case Chain.nil => empty
|
||||
case x ==: Chain.nil => x
|
||||
case _ => Cofree(self, Eval.now(children))
|
||||
}
|
||||
protected def wrapNonEmpty(children: Chain[Tree], empty: Tree): Tree =
|
||||
children match {
|
||||
case Chain.nil => empty
|
||||
case x ==: Chain.nil => x
|
||||
// Do not use `wrap` here as children
|
||||
// could redefine `wrap` through this method
|
||||
case _ => Cofree(self, Eval.now(children))
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -159,7 +159,8 @@ class ArrowSem[S[_]](val expr: ArrowExpr[S]) extends AnyVal {
|
||||
|
||||
// wrap streams with restrictions
|
||||
val bodyWithRestrictions = localStreams.foldLeft(bodyModified) {
|
||||
case (bm, (streamName, _)) => RestrictionTag(streamName, isStream = true).wrap(bm)
|
||||
case (bm, (streamName, streamType)) =>
|
||||
RestrictionTag(streamName, streamType).wrap(bm)
|
||||
}
|
||||
|
||||
ArrowRaw(funcArrow, returnValuesModified.toList, bodyWithRestrictions)
|
||||
|
@ -10,7 +10,7 @@ import aqua.semantics.rules.ValuesAlgebra
|
||||
import aqua.semantics.rules.abilities.AbilitiesAlgebra
|
||||
import aqua.semantics.rules.names.NamesAlgebra
|
||||
import aqua.semantics.rules.types.TypesAlgebra
|
||||
import aqua.types.{ArrayType, BoxType}
|
||||
import aqua.types.{ArrayType, BoxType, StreamType}
|
||||
|
||||
import cats.Monad
|
||||
import cats.data.Chain
|
||||
@ -42,11 +42,10 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal {
|
||||
},
|
||||
(stOpt: Option[ValueRaw], ops: Raw) =>
|
||||
N.streamsDefinedWithinScope()
|
||||
.map(_.keySet)
|
||||
.map(streams =>
|
||||
(stOpt, ops) match {
|
||||
case (Some(vm), FuncOp(op)) =>
|
||||
val innerTag = expr.mode.fold[RawTag](SeqTag) {
|
||||
val innerTag = expr.mode.fold(SeqTag) {
|
||||
case ForExpr.Mode.ParMode => ParTag
|
||||
case ForExpr.Mode.TryMode => TryTag
|
||||
}
|
||||
@ -58,8 +57,8 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal {
|
||||
innerTag
|
||||
.wrap(
|
||||
// Restrict the streams created within this scope
|
||||
streams.toList.foldLeft(op) { case (b, streamName) =>
|
||||
RestrictionTag(streamName, isStream = true).wrap(b)
|
||||
streams.toList.foldLeft(op) { case (tree, (streamName, streamType)) =>
|
||||
RestrictionTag(streamName, streamType).wrap(tree)
|
||||
},
|
||||
NextTag(expr.item.value).leaf
|
||||
)
|
||||
|
@ -26,6 +26,16 @@ sealed trait Type {
|
||||
def uniteBottom(other: Type): Type = UniteTypes.bottom.combine(this, other)
|
||||
|
||||
def properties: Map[String, Type] = Map.empty
|
||||
|
||||
/**
|
||||
* Use for printing purposes only
|
||||
* Ideally should be in sync with [[AirGen.varNameToString]]
|
||||
*/
|
||||
def airPrefix: String = this match {
|
||||
case _: StreamType => "$"
|
||||
case _: CanonStreamType => "#"
|
||||
case _ => ""
|
||||
}
|
||||
}
|
||||
|
||||
// Product is a list of (optionally labelled) types
|
||||
|
Loading…
Reference in New Issue
Block a user