mirror of
https://github.com/fluencelabs/aqua.git
synced 2024-12-04 14:40:17 +00:00
feat(compiler): Make on
propagate errors [fixes LNG-203] (#788)
* Add fail model * Make `on` propagate error * Fix unit tests * Fix TryTag inlining * Update XorModel.wrap * Add comments * Remove wrapWithXor parameter * Add unit tests * Add integration tests * Add comments * Fix XorBranch topology
This commit is contained in:
parent
a7dba14c7c
commit
b8b0fafda0
@ -8,14 +8,13 @@ case class AquaAPIConfig(
|
||||
targetType: TargetType = TargetType.AirType,
|
||||
logLevel: String = "info",
|
||||
constants: List[String] = Nil,
|
||||
noXor: Boolean = false,
|
||||
noXor: Boolean = false, // TODO: Remove
|
||||
noRelay: Boolean = false,
|
||||
tracing: Boolean = false
|
||||
) {
|
||||
|
||||
def getTransformConfig: TransformConfig = {
|
||||
val config = TransformConfig(
|
||||
wrapWithXor = !noXor,
|
||||
tracing = Option.when(tracing)(TransformConfig.TracingConfig.default)
|
||||
)
|
||||
|
||||
|
@ -24,6 +24,8 @@ object Keyword {
|
||||
|
||||
case object Ap extends Keyword("ap")
|
||||
|
||||
case object Fail extends Keyword("fail")
|
||||
|
||||
case object Canon extends Keyword("canon")
|
||||
|
||||
case object Seq extends Keyword("seq")
|
||||
@ -110,6 +112,8 @@ object Air {
|
||||
|
||||
case class Ap(op: DataView, result: String) extends Air(Keyword.Ap)
|
||||
|
||||
case class Fail(op: DataView) extends Air(Keyword.Fail)
|
||||
|
||||
case class Canon(op: DataView, peerId: DataView, result: String) extends Air(Keyword.Canon)
|
||||
|
||||
case class Comment(comment: String, air: Air) extends Air(Keyword.NA)
|
||||
@ -143,6 +147,7 @@ object Air {
|
||||
case Air.Call(triplet, args, res) ⇒
|
||||
s" ${triplet.show} [${args.map(_.show).mkString(" ")}]${res.fold("")(" " + _)}"
|
||||
case Air.Ap(operand, result) ⇒ s" ${operand.show} $result"
|
||||
case Air.Fail(operand) => s" ${operand.show}"
|
||||
case Air.Canon(operand, peerId, result) ⇒ s" ${peerId.show} ${operand.show} $result"
|
||||
case Air.Comment(_, _) => ";; Should not be displayed"
|
||||
}) + ")\n"
|
||||
|
@ -118,6 +118,11 @@ object AirGen extends Logging {
|
||||
ApGen(valueToData(operand), exportToString(exportTo))
|
||||
)
|
||||
|
||||
case FailRes(operand) =>
|
||||
Eval.later(
|
||||
FailGen(valueToData(operand))
|
||||
)
|
||||
|
||||
case CanonRes(operand, peerId, exportTo) =>
|
||||
Eval.later(
|
||||
CanonGen(valueToData(operand), valueToData(peerId), exportToString(exportTo))
|
||||
@ -164,6 +169,12 @@ case class ApGen(operand: DataView, result: String) extends AirGen {
|
||||
Air.Ap(operand, result)
|
||||
}
|
||||
|
||||
case class FailGen(operand: DataView) extends AirGen {
|
||||
|
||||
override def generate: Air =
|
||||
Air.Fail(operand)
|
||||
}
|
||||
|
||||
case class CanonGen(operand: DataView, peerId: DataView, result: String) extends AirGen {
|
||||
|
||||
override def generate: Air =
|
||||
|
12
build.sbt
12
build.sbt
@ -74,8 +74,8 @@ lazy val cliJS = cli.js
|
||||
.settings(
|
||||
Compile / fastOptJS / artifactPath := baseDirectory.value / "../../cli-npm" / "aqua.js",
|
||||
Compile / fullOptJS / artifactPath := baseDirectory.value / "../../cli-npm" / "aqua.js",
|
||||
scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.ESModule)),
|
||||
scalaJSUseMainModuleInitializer := true
|
||||
scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.ESModule)),
|
||||
scalaJSUseMainModuleInitializer := true
|
||||
)
|
||||
.dependsOn(`js-exports`, `js-imports`)
|
||||
|
||||
@ -155,9 +155,9 @@ lazy val `aqua-apiJS` = `aqua-api`.js
|
||||
.settings(
|
||||
Compile / fastOptJS / artifactPath := baseDirectory.value / "../../api-npm" / "aqua-api.js",
|
||||
Compile / fullOptJS / artifactPath := baseDirectory.value / "../../api-npm" / "aqua-api.js",
|
||||
scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.CommonJSModule)),
|
||||
scalaJSUseMainModuleInitializer := true,
|
||||
Test / test := {}
|
||||
scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.CommonJSModule)),
|
||||
scalaJSUseMainModuleInitializer := true,
|
||||
Test / test := {}
|
||||
)
|
||||
.enablePlugins(ScalaJSPlugin)
|
||||
.dependsOn(`js-exports`)
|
||||
@ -252,7 +252,7 @@ lazy val compiler = crossProject(JVMPlatform, JSPlatform)
|
||||
.crossType(CrossType.Pure)
|
||||
.in(file("compiler"))
|
||||
.settings(commons: _*)
|
||||
.dependsOn(semantics, linker, backend, transform % Test, res % "test->test")
|
||||
.dependsOn(semantics, linker, backend, transform % "test->test", res % "test->test")
|
||||
|
||||
lazy val backend = crossProject(JVMPlatform, JSPlatform)
|
||||
.withoutSuffixFor(JVMPlatform)
|
||||
|
@ -22,7 +22,7 @@ import cats.syntax.applicative.*
|
||||
import cats.syntax.apply.*
|
||||
import cats.syntax.flatMap.*
|
||||
import cats.syntax.functor.*
|
||||
import cats.{Id, Monad, ~>}
|
||||
import cats.{~>, Id, Monad}
|
||||
import com.monovore.decline.{Command, Opts}
|
||||
import fs2.io.file.{Files, Path}
|
||||
import scribe.Logging
|
||||
@ -45,14 +45,15 @@ object RunOpts extends Logging {
|
||||
): TransformConfig = {
|
||||
val tc = TransformConfig(
|
||||
constants =
|
||||
onPeer.map(s => ConstantRaw(OnPeerConst, LiteralRaw.quote(s), false)).toList ++ constants,
|
||||
wrapWithXor = !noXor
|
||||
onPeer.map(s => ConstantRaw(OnPeerConst, LiteralRaw.quote(s), false)).toList ++ constants
|
||||
)
|
||||
tc.copy(relayVarName = tc.relayVarName.filterNot(_ => noRelay))
|
||||
}
|
||||
|
||||
def runOptsCompose[F[_]: Files: Concurrent]
|
||||
: Opts[F[ValidatedNec[String, (Option[AquaPath], List[Path], FuncWithData, Option[NonEmptyList[JsonService]], List[String])]]] = {
|
||||
def runOptsCompose[F[_]: Files: Concurrent]: Opts[F[ValidatedNec[
|
||||
String,
|
||||
(Option[AquaPath], List[Path], FuncWithData, Option[NonEmptyList[JsonService]], List[String])
|
||||
]]] = {
|
||||
(
|
||||
AppOpts.wrapWithOption(AppOpts.inputOpts[F]),
|
||||
AppOpts.importOpts[F],
|
||||
@ -72,8 +73,9 @@ object RunOpts extends Logging {
|
||||
.getOrElse(validNec[String, Option[NonEmptyList[JsonService]]](None).pure[F])
|
||||
pluginsPathsV <- pluginsOp.getOrElse(validNec[String, List[String]](Nil).pure[F])
|
||||
} yield {
|
||||
(inputV, importV, funcWithArgsV, jsonServiceV, pluginsPathsV).mapN { case (i, im, f, j, p) =>
|
||||
(i, im, f, j, p)
|
||||
(inputV, importV, funcWithArgsV, jsonServiceV, pluginsPathsV).mapN {
|
||||
case (i, im, f, j, p) =>
|
||||
(i, im, f, j, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,15 +17,7 @@ import aqua.raw.ops.{Call, CallArrowRawTag}
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.res.{AquaRes, FuncRes}
|
||||
import aqua.run.RunOpts.logger
|
||||
import aqua.run.{
|
||||
CliFunc,
|
||||
FuncCompiler,
|
||||
GeneralOptions,
|
||||
GeneralOpts,
|
||||
RunCommand,
|
||||
RunConfig,
|
||||
RunOpts
|
||||
}
|
||||
import aqua.run.{CliFunc, FuncCompiler, GeneralOptions, GeneralOpts, RunCommand, RunConfig, RunOpts}
|
||||
import aqua.types.{ArrowType, LiteralType, NilType, ScalarType}
|
||||
import cats.data.*
|
||||
import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel}
|
||||
@ -126,7 +118,7 @@ object ScriptOpts extends Logging {
|
||||
imports: List[Path],
|
||||
funcWithArgs: FuncWithLiteralArgs
|
||||
): F[ValidatedNec[String, String]] = {
|
||||
val tConfig = TransformConfig(relayVarName = None, wrapWithXor = false)
|
||||
val tConfig = TransformConfig(relayVarName = None)
|
||||
val funcCompiler =
|
||||
new FuncCompiler[F](
|
||||
Option(RelativePath(input)),
|
||||
|
@ -27,7 +27,7 @@ object Test extends IOApp.Simple {
|
||||
List(Path("./aqua")),
|
||||
Option(Path("./target")),
|
||||
TypeScriptBackend(false, "IFluenceClient$$"),
|
||||
TransformConfig(wrapWithXor = false),
|
||||
TransformConfig(),
|
||||
false
|
||||
)
|
||||
.map {
|
||||
|
@ -94,7 +94,7 @@ object AquaCli extends IOApp with Logging {
|
||||
compileToAir,
|
||||
compileToJs,
|
||||
noRelay,
|
||||
noXorWrapper,
|
||||
noXorWrapper, // TODO: Remove
|
||||
tracing,
|
||||
isOldFluenceJs,
|
||||
wrapWithOption(helpOpt),
|
||||
@ -137,7 +137,6 @@ object AquaCli extends IOApp with Logging {
|
||||
else TypescriptTarget
|
||||
val bc = {
|
||||
val bc = TransformConfig(
|
||||
wrapWithXor = !noXor,
|
||||
constants = constants,
|
||||
tracing = Option.when(tracingEnabled)(TransformConfig.TracingConfig.default)
|
||||
)
|
||||
|
@ -9,6 +9,7 @@ import aqua.model.{
|
||||
ValueModel,
|
||||
VarModel
|
||||
}
|
||||
import aqua.model.transform.ModelBuilder
|
||||
import aqua.model.transform.TransformConfig
|
||||
import aqua.model.transform.Transform
|
||||
import aqua.parser.ParserError
|
||||
@ -42,6 +43,7 @@ import cats.instances.string.*
|
||||
import cats.syntax.show.*
|
||||
|
||||
class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
import ModelBuilder.*
|
||||
|
||||
private def aquaSource(src: Map[String, String], imports: Map[String, String]) = {
|
||||
new AquaSources[Id, String, String] {
|
||||
@ -150,8 +152,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
ctxs.length should be(1)
|
||||
val ctx = ctxs.headOption.get
|
||||
|
||||
val aquaRes =
|
||||
Transform.contextRes(ctx, TransformConfig(wrapWithXor = false))
|
||||
val transformCfg = TransformConfig()
|
||||
val aquaRes = Transform.contextRes(ctx, transformCfg)
|
||||
|
||||
val Some(exec) = aquaRes.funcs.find(_.funcName == "exec")
|
||||
|
||||
@ -167,47 +169,49 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
SeqRes.wrap(
|
||||
getDataSrv("-relay-", ScalarType.string),
|
||||
getDataSrv(peers.name, peers.`type`),
|
||||
RestrictionRes(results.name, resultsType).wrap(
|
||||
SeqRes.wrap(
|
||||
ParRes.wrap(
|
||||
FoldRes(peer.name, peers, Some(ForModel.NeverMode)).wrap(
|
||||
ParRes.wrap(
|
||||
// better if first relay will be outside `for`
|
||||
SeqRes.wrap(
|
||||
through(ValueModel.fromRaw(relay)),
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("op")),
|
||||
"identity",
|
||||
CallRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("hahahahah")) :: Nil,
|
||||
Some(CallModel.Export(results.name, results.`type`))
|
||||
XorRes.wrap(
|
||||
RestrictionRes(results.name, resultsType).wrap(
|
||||
SeqRes.wrap(
|
||||
ParRes.wrap(
|
||||
FoldRes(peer.name, peers, Some(ForModel.NeverMode)).wrap(
|
||||
ParRes.wrap(
|
||||
XorRes.wrap(
|
||||
// better if first relay will be outside `for`
|
||||
SeqRes.wrap(
|
||||
through(ValueModel.fromRaw(relay)),
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("op")),
|
||||
"identity",
|
||||
CallRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("hahahahah")) :: Nil,
|
||||
Some(CallModel.Export(results.name, results.`type`))
|
||||
),
|
||||
peer
|
||||
).leaf,
|
||||
through(ValueModel.fromRaw(relay)),
|
||||
through(initPeer)
|
||||
),
|
||||
peer
|
||||
).leaf,
|
||||
through(ValueModel.fromRaw(relay)),
|
||||
through(initPeer)
|
||||
),
|
||||
NextRes(peer.name).leaf
|
||||
SeqRes.wrap(
|
||||
through(ValueModel.fromRaw(relay)),
|
||||
through(initPeer),
|
||||
failLastErrorRes
|
||||
)
|
||||
),
|
||||
NextRes(peer.name).leaf
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
join(results, LiteralModel.fromRaw(LiteralRaw.number(2))),
|
||||
CanonRes(results, init, CallModel.Export(canonResult.name, canonResult.`type`)).leaf,
|
||||
ApRes(
|
||||
canonResult,
|
||||
CallModel.Export(flatResult.name, flatResult.`type`)
|
||||
).leaf
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("callbackSrv")),
|
||||
"response",
|
||||
CallRes(
|
||||
flatResult :: Nil,
|
||||
None
|
||||
),
|
||||
join(results, LiteralModel.fromRaw(LiteralRaw.number(2))),
|
||||
CanonRes(results, init, CallModel.Export(canonResult.name, canonResult.`type`)).leaf,
|
||||
ApRes(
|
||||
canonResult,
|
||||
CallModel.Export(flatResult.name, flatResult.`type`)
|
||||
).leaf
|
||||
)
|
||||
),
|
||||
initPeer
|
||||
).leaf
|
||||
errorCall(transformCfg, 0, initPeer)
|
||||
),
|
||||
respCall(transformCfg, flatResult, initPeer)
|
||||
)
|
||||
|
||||
exec.body.equalsOrShowDiff(expected) shouldBe (true)
|
||||
@ -267,8 +271,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
ctxs.length should be(1)
|
||||
val ctx = ctxs.headOption.get
|
||||
|
||||
val aquaRes =
|
||||
Transform.contextRes(ctx, TransformConfig(wrapWithXor = false, relayVarName = None))
|
||||
val transformCfg = TransformConfig(relayVarName = None)
|
||||
val aquaRes = Transform.contextRes(ctx, transformCfg)
|
||||
|
||||
val Some(funcWrap) = aquaRes.funcs.find(_.funcName == "wrap")
|
||||
val Some(barfoo) = aquaRes.funcs.find(_.funcName == "barfoo")
|
||||
@ -278,8 +282,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
val resCanonVM = VarModel("-res-fix-0", CanonStreamType(ScalarType.string))
|
||||
val resFlatVM = VarModel("-res-flat-0", ArrayType(ScalarType.string))
|
||||
|
||||
barfoo.body.equalsOrShowDiff(
|
||||
SeqRes.wrap(
|
||||
val expected = SeqRes.wrap(
|
||||
XorRes.wrap(
|
||||
RestrictionRes(resVM.name, resStreamType).wrap(
|
||||
SeqRes.wrap(
|
||||
// res <- foo()
|
||||
@ -305,14 +309,12 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
).leaf
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("callbackSrv")),
|
||||
"response",
|
||||
CallRes(resFlatVM :: Nil, None),
|
||||
LiteralModel.fromRaw(ValueRaw.InitPeerId)
|
||||
).leaf
|
||||
)
|
||||
) should be(true)
|
||||
errorCall(transformCfg, 0, initPeer)
|
||||
),
|
||||
respCall(transformCfg, resFlatVM, initPeer)
|
||||
)
|
||||
|
||||
barfoo.body.equalsOrShowDiff(expected) should be(true)
|
||||
|
||||
}
|
||||
}
|
||||
|
39
integration-tests/aqua/examples/onErrorPropagation.aqua
Normal file
39
integration-tests/aqua/examples/onErrorPropagation.aqua
Normal file
@ -0,0 +1,39 @@
|
||||
service Test("test-service"):
|
||||
fail(err: string)
|
||||
|
||||
func onPropagate(peer: string, relay: string) -> u16:
|
||||
res: *u16
|
||||
on peer via relay:
|
||||
res <<- 0 + 1
|
||||
Test.fail("propagated error")
|
||||
res <<- 0 + 2
|
||||
|
||||
join res[3] -- Unreachable
|
||||
|
||||
<- res[3]
|
||||
|
||||
func nestedOnPropagate(peer: string, relay: string, iPeer: string, iRelay: string, friend: string) -> u16:
|
||||
res: *u16
|
||||
on iPeer via iRelay:
|
||||
res <<- 40 + 2
|
||||
on friend:
|
||||
res <<- 2 + 40
|
||||
on peer via relay:
|
||||
Test.fail("propagated error")
|
||||
res <<- 30 + 7
|
||||
|
||||
join res[3] -- Unreachable
|
||||
|
||||
<- res[3]
|
||||
|
||||
func seqOnPropagate(peer: string, relay: string, iPeer: string, iRelay: string) -> u16:
|
||||
res: *u16
|
||||
on iPeer via iRelay:
|
||||
res <<- 40 + 2
|
||||
on peer via relay:
|
||||
Test.fail("propagated error")
|
||||
res <<- 30 + 7
|
||||
|
||||
join res[2] -- Unreachable
|
||||
|
||||
<- res[2]
|
@ -4,6 +4,7 @@ import {getObjAssignCall, getObjCall, getObjRelayCall} from "../examples/objectC
|
||||
import {callArrowCall, reproArgsBug426Call} from '../examples/callArrowCall.js';
|
||||
import {dataAliasCall} from '../examples/dataAliasCall.js';
|
||||
import {onCall} from '../examples/onCall.js';
|
||||
import {onPropagateCall, nestedOnPropagateCall, seqOnPropagateCall} from '../examples/onErrorPropagation.js';
|
||||
import {funcCall} from '../examples/funcCall.js';
|
||||
import {registerPrintln} from '../compiled/examples/println.js';
|
||||
import {helloWorldCall} from '../examples/helloWorldCall.js';
|
||||
@ -467,6 +468,27 @@ describe('Testing examples', () => {
|
||||
expect(onCallResult).toEqual(config.externalAddressesRelay1);
|
||||
});
|
||||
|
||||
it('onErrorPropagate.aqua', async () => {
|
||||
let call = onPropagateCall(peer2, relay2.peerId);
|
||||
expect(call).rejects.toMatchObject({
|
||||
message: expect.stringContaining("propagated error")
|
||||
})
|
||||
});
|
||||
|
||||
it('onErrorPropagate.aqua nested', async () => {
|
||||
let call = nestedOnPropagateCall(peer2, relay2.peerId, config.relays[3].peerId, config.relays[4].peerId, config.relays[5].peerId);
|
||||
expect(call).rejects.toMatchObject({
|
||||
message: expect.stringContaining("propagated error")
|
||||
})
|
||||
});
|
||||
|
||||
it('onErrorPropagate.aqua sequential', async () => {
|
||||
let call = seqOnPropagateCall(peer2, relay2.peerId, config.relays[3].peerId, config.relays[4].peerId);
|
||||
expect(call).rejects.toMatchObject({
|
||||
message: expect.stringContaining("propagated error")
|
||||
})
|
||||
});
|
||||
|
||||
it('complex.aqua', async () => {
|
||||
let complexCallResult = await complexCall(selfPeerId, relayPeerId1);
|
||||
expect(complexCallResult).toEqual(['some str', '3', '1', '4', '1', '1', '3', '2', '4', '2', '2', selfPeerId]);
|
||||
|
43
integration-tests/src/examples/onErrorPropagation.ts
Normal file
43
integration-tests/src/examples/onErrorPropagation.ts
Normal file
@ -0,0 +1,43 @@
|
||||
import {IFluenceClient} from '@fluencelabs/js-client.api';
|
||||
import {registerTest, onPropagate, nestedOnPropagate, seqOnPropagate} from "../compiled/examples/onErrorPropagation.js"
|
||||
|
||||
export async function onPropagateCall(peer2: IFluenceClient, relay2: string): Promise<number> {
|
||||
registerTest(peer2, {
|
||||
fail(err, callParams) {
|
||||
return Promise.reject(err);
|
||||
},
|
||||
})
|
||||
|
||||
return onPropagate(peer2.getPeerId(), relay2)
|
||||
}
|
||||
|
||||
export async function nestedOnPropagateCall(
|
||||
peer2: IFluenceClient,
|
||||
relay2: string,
|
||||
iPeer: string,
|
||||
iRelay: string,
|
||||
friend: string
|
||||
): Promise<number> {
|
||||
registerTest(peer2, {
|
||||
fail(err, callParams) {
|
||||
return Promise.reject(err);
|
||||
},
|
||||
})
|
||||
|
||||
return nestedOnPropagate(peer2.getPeerId(), relay2, iPeer, iRelay, friend)
|
||||
}
|
||||
|
||||
export async function seqOnPropagateCall(
|
||||
peer2: IFluenceClient,
|
||||
relay2: string,
|
||||
iPeer: string,
|
||||
iRelay: string
|
||||
): Promise<number> {
|
||||
registerTest(peer2, {
|
||||
fail(err, callParams) {
|
||||
return Promise.reject(err);
|
||||
},
|
||||
})
|
||||
|
||||
return seqOnPropagate(peer2.getPeerId(), relay2, iPeer, iRelay)
|
||||
}
|
@ -17,6 +17,7 @@ import cats.syntax.option.*
|
||||
import cats.instances.list.*
|
||||
import cats.data.{Chain, State, StateT}
|
||||
import cats.syntax.show.*
|
||||
import cats.syntax.bifunctor.*
|
||||
import scribe.{log, Logging}
|
||||
import aqua.model.inline.Inline.parDesugarPrefixOpt
|
||||
|
||||
@ -186,11 +187,19 @@ object TagInliner extends Logging {
|
||||
flat(vm, tree, true)
|
||||
}
|
||||
(pid, pif) = peerIdDe
|
||||
viaD = Chain.fromSeq(viaDeFlattened.map(_._1))
|
||||
viaF = viaDeFlattened.flatMap(_._2)
|
||||
|
||||
} yield TagInlined.Single(
|
||||
model = OnModel(pid, viaD),
|
||||
(viaD, viaF) = viaDeFlattened.unzip
|
||||
.bimap(Chain.fromSeq, _.flatten)
|
||||
toModel = (children: Chain[OpModel.Tree]) =>
|
||||
XorModel.wrap(
|
||||
OnModel(pid, viaD).wrap(
|
||||
children
|
||||
),
|
||||
// This will return to previous topology
|
||||
// and propagate error up
|
||||
FailModel(ValueModel.lastError).leaf
|
||||
)
|
||||
} yield TagInlined.Mapping(
|
||||
toModel = toModel,
|
||||
prefix = parDesugarPrefix(viaF.prependedAll(pif))
|
||||
)
|
||||
|
||||
|
@ -42,20 +42,21 @@ object ConstantRaw {
|
||||
val lastError: ConstantRaw =
|
||||
ConstantRaw(
|
||||
"LAST_ERROR",
|
||||
ValueRaw.LastError,
|
||||
ValueRaw.lastError,
|
||||
false
|
||||
)
|
||||
|
||||
|
||||
// Host peer id holds %init_peer_id% in case Aqua is not compiled to be executed behind a relay,
|
||||
// or relay's variable otherwise
|
||||
def hostPeerId(relayVarName: Option[String]): ConstantRaw =
|
||||
ConstantRaw(
|
||||
"HOST_PEER_ID",
|
||||
relayVarName.fold[ValueRaw](ValueRaw.InitPeerId)(r => VarRaw(r, ScalarType.string)),
|
||||
false
|
||||
)
|
||||
|
||||
ConstantRaw(
|
||||
"HOST_PEER_ID",
|
||||
relayVarName.fold[ValueRaw](ValueRaw.InitPeerId)(r => VarRaw(r, ScalarType.string)),
|
||||
false
|
||||
)
|
||||
|
||||
def defaultConstants(relayVarName: Option[String]): List[ConstantRaw] =
|
||||
hostPeerId(relayVarName) :: initPeerId :: particleTtl :: particleTimestamp :: nil :: lastError :: Nil
|
||||
}
|
||||
hostPeerId(
|
||||
relayVarName
|
||||
) :: initPeerId :: particleTtl :: particleTimestamp :: nil :: lastError :: Nil
|
||||
}
|
||||
|
@ -27,21 +27,22 @@ object ValueRaw {
|
||||
|
||||
val Nil: LiteralRaw = LiteralRaw("[]", StreamType(BottomType))
|
||||
|
||||
val LastError: VarRaw = VarRaw(
|
||||
"%last_error%",
|
||||
StructType(
|
||||
"LastError",
|
||||
NonEmptyMap.of(
|
||||
// These two fields are mandatory for all errors
|
||||
"message" -> ScalarType.string,
|
||||
"error_code" -> ScalarType.i64,
|
||||
// These fields are specific to AquaVM's errors only
|
||||
"instruction" -> ScalarType.string,
|
||||
"peer_id" -> ScalarType.string
|
||||
)
|
||||
val lastErrorType = StructType(
|
||||
"LastError",
|
||||
NonEmptyMap.of(
|
||||
// These two fields are mandatory for all errors
|
||||
"message" -> ScalarType.string,
|
||||
"error_code" -> ScalarType.i64,
|
||||
// These fields are specific to AquaVM's errors only
|
||||
"instruction" -> ScalarType.string,
|
||||
"peer_id" -> ScalarType.string
|
||||
)
|
||||
)
|
||||
|
||||
val lastError: VarRaw = VarRaw(
|
||||
"%last_error%",
|
||||
lastErrorType
|
||||
)
|
||||
}
|
||||
|
||||
case class ApplyPropertyRaw(value: ValueRaw, property: PropertyRaw) extends ValueRaw {
|
||||
|
@ -76,6 +76,8 @@ object MakeRes {
|
||||
ApRes(operand, CallModel.Export(assignTo, ArrayType(el))).leaf
|
||||
case FlattenModel(operand, assignTo) =>
|
||||
ApRes(operand, CallModel.Export(assignTo, operand.`type`)).leaf
|
||||
case FailModel(value) =>
|
||||
FailRes(value).leaf
|
||||
case CallServiceModel(serviceId, funcName, CallModel(args, exportTo)) =>
|
||||
CallServiceRes(
|
||||
serviceId,
|
||||
|
@ -54,6 +54,10 @@ case class ApRes(operand: ValueModel, exportTo: CallModel.Export) extends Resolv
|
||||
override def toString: String = s"(ap $operand $exportTo)"
|
||||
}
|
||||
|
||||
case class FailRes(operand: ValueModel) extends ResolvedOp {
|
||||
override def toString: String = s"(fail $operand)"
|
||||
}
|
||||
|
||||
case class CanonRes(operand: ValueModel, peerId: ValueModel, exportTo: CallModel.Export)
|
||||
extends ResolvedOp {
|
||||
override def toString: String = s"(canon $peerId $operand $exportTo)"
|
||||
|
@ -1,13 +1,15 @@
|
||||
package aqua.model
|
||||
|
||||
import aqua.model.OpModel.Tree
|
||||
import aqua.tree.{TreeNode, TreeNodeCompanion}
|
||||
import aqua.types.*
|
||||
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
import cats.Show
|
||||
import cats.Eval
|
||||
import cats.data.NonEmptyList
|
||||
import aqua.tree.{TreeNode, TreeNodeCompanion}
|
||||
import aqua.types.*
|
||||
import cats.syntax.functor.*
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
@ -79,7 +81,15 @@ case object ParModel extends ParGroupModel
|
||||
|
||||
case object DetachModel extends ParGroupModel
|
||||
|
||||
case object XorModel extends GroupOpModel
|
||||
case object XorModel extends GroupOpModel {
|
||||
|
||||
// If left branch is empty, return empty
|
||||
override def wrap(children: Chain[Tree]): Tree =
|
||||
children.headOption
|
||||
.filterNot(_.head == EmptyModel)
|
||||
.as(super.wrap(children))
|
||||
.getOrElse(EmptyModel.leaf)
|
||||
}
|
||||
|
||||
case class OnModel(peerId: ValueModel, via: Chain[ValueModel]) extends SeqGroupModel {
|
||||
|
||||
@ -146,6 +156,12 @@ case class FlattenModel(value: ValueModel, assignTo: String) extends OpModel {
|
||||
override def exportsVarNames: Set[String] = Set(assignTo)
|
||||
}
|
||||
|
||||
case class FailModel(value: ValueModel) extends OpModel {
|
||||
override def usesVarNames: Set[String] = value.usesVarNames
|
||||
|
||||
override def exportsVarNames: Set[String] = Set.empty
|
||||
}
|
||||
|
||||
case class PushToStreamModel(value: ValueModel, exportTo: CallModel.Export) extends OpModel {
|
||||
|
||||
override def usesVarNames: Set[String] = value.usesVarNames
|
||||
|
@ -2,8 +2,10 @@ package aqua.model
|
||||
|
||||
import aqua.raw.value.*
|
||||
import aqua.types.*
|
||||
|
||||
import cats.Eq
|
||||
import cats.data.{Chain, NonEmptyMap}
|
||||
import cats.syntax.option.*
|
||||
import scribe.Logging
|
||||
|
||||
sealed trait ValueModel {
|
||||
@ -18,6 +20,19 @@ sealed trait ValueModel {
|
||||
|
||||
object ValueModel {
|
||||
|
||||
def errorCode(error: VarModel): Option[VarModel] =
|
||||
error.intoField("error_code")
|
||||
|
||||
val lastError = VarModel(
|
||||
name = ValueRaw.lastError.name,
|
||||
baseType = ValueRaw.lastError.baseType
|
||||
)
|
||||
|
||||
val lastErrorType = ValueRaw.lastErrorType
|
||||
|
||||
// NOTE: It should be safe as %last_error% should have `error_code` field
|
||||
val lastErrorCode = errorCode(lastError).get
|
||||
|
||||
implicit object ValueModelEq extends Eq[ValueModel] {
|
||||
override def eqv(x: ValueModel, y: ValueModel): Boolean = x == y
|
||||
}
|
||||
@ -46,9 +61,16 @@ case class LiteralModel(value: String, `type`: Type) extends ValueModel {
|
||||
}
|
||||
|
||||
object LiteralModel {
|
||||
|
||||
// AquaVM will return empty string for
|
||||
// %last_error%.$.error_code if there is no %last_error%
|
||||
val emptyErrorCode = quote("")
|
||||
|
||||
def fromRaw(raw: LiteralRaw): LiteralModel = LiteralModel(raw.value, raw.baseType)
|
||||
|
||||
def quote(str: String): LiteralModel = LiteralModel(s"\"$str\"", LiteralType.string)
|
||||
|
||||
def number(n: Int): LiteralModel = LiteralModel(n.toString, LiteralType.number)
|
||||
}
|
||||
|
||||
sealed trait PropertyModel {
|
||||
@ -117,6 +139,17 @@ case class VarModel(name: String, baseType: Type, properties: Chain[PropertyMode
|
||||
private def deriveFrom(vm: VarModel): VarModel =
|
||||
vm.copy(properties = vm.properties ++ properties)
|
||||
|
||||
def intoField(field: String): Option[VarModel] = `type` match {
|
||||
case StructType(_, fields) =>
|
||||
fields(field)
|
||||
.map(fieldType =>
|
||||
copy(
|
||||
properties = properties :+ IntoFieldModel(field, fieldType)
|
||||
)
|
||||
)
|
||||
case _ => none
|
||||
}
|
||||
|
||||
override def resolveWith(vals: Map[String, ValueModel]): ValueModel =
|
||||
vals.get(name) match {
|
||||
case Some(vv: VarModel) =>
|
||||
|
@ -19,6 +19,7 @@ import cats.free.Cofree
|
||||
import cats.syntax.option.*
|
||||
import scribe.Logging
|
||||
import aqua.model.transform.TransformConfig.TracingConfig
|
||||
import aqua.model.transform.pre.{CallbackErrorHandler, ErrorHandler}
|
||||
|
||||
// API for transforming RawTag to Res
|
||||
object Transform extends Logging {
|
||||
@ -82,11 +83,30 @@ object Transform extends Logging {
|
||||
goThrough = Chain.fromOption(relayVar)
|
||||
)
|
||||
|
||||
val errorsCatcher = ErrorsCatcher(
|
||||
enabled = conf.wrapWithXor,
|
||||
serviceId = conf.errorHandlingCallback,
|
||||
funcName = conf.errorFuncName,
|
||||
callable = initCallable
|
||||
val argsProvider: ArgsProvider = ArgsFromService(
|
||||
dataServiceId = conf.dataSrvId
|
||||
)
|
||||
|
||||
val resultsHandler: ResultsHandler = CallbackResultsHandler(
|
||||
callbackSrvId = conf.callbackSrvId,
|
||||
funcName = conf.respFuncName
|
||||
)
|
||||
|
||||
val errorHandler: ErrorHandler = CallbackErrorHandler(
|
||||
serviceId = conf.errorHandlingSrvId,
|
||||
funcName = conf.errorFuncName
|
||||
)
|
||||
|
||||
// Callback on the init peer id, either done via relay or not
|
||||
val callback = initCallable.service(conf.callbackSrvId)
|
||||
|
||||
// preTransformer is applied before function is inlined
|
||||
val preTransformer = FuncPreTransformer(
|
||||
argsProvider,
|
||||
resultsHandler,
|
||||
errorHandler,
|
||||
callback,
|
||||
conf.relayVarName
|
||||
)
|
||||
|
||||
val tracing = Tracing(
|
||||
@ -94,31 +114,16 @@ object Transform extends Logging {
|
||||
initCallable = initCallable
|
||||
)
|
||||
|
||||
val argsProvider: ArgsProvider = ArgsFromService(
|
||||
dataServiceId = conf.dataSrvId,
|
||||
names = relayVar.toList ::: func.arrowType.domain.labelledData
|
||||
)
|
||||
|
||||
// Transform the body of the function: wrap it with initCallable, provide function arguments via service calls
|
||||
val transform: RawTag.Tree => RawTag.Tree =
|
||||
argsProvider.transform andThen initCallable.transform
|
||||
|
||||
// Callback on the init peer id, either done via relay or not
|
||||
val callback = initCallable.service(conf.callbackSrvId)
|
||||
|
||||
// preTransformer is applied before function is inlined
|
||||
val preTransformer = FuncPreTransformer(
|
||||
transform,
|
||||
callback,
|
||||
conf.respFuncName
|
||||
)
|
||||
|
||||
for {
|
||||
// Pre transform and inline the function
|
||||
model <- funcToModelTree(func, preTransformer)
|
||||
// Post transform the function
|
||||
errorsModel = errorsCatcher.transform(model)
|
||||
tracingModel <- tracing(errorsModel)
|
||||
// Post transform the function.
|
||||
// We should wrap `model` with `onInitPeer` here
|
||||
// so that TagInliner would not wrap it with `xor`.
|
||||
// Topology module needs this `on`
|
||||
// as a starting point.
|
||||
initModel = initCallable.onInitPeer.wrap(model)
|
||||
tracingModel <- tracing(initModel)
|
||||
// Resolve topology
|
||||
resolved <- Topology.resolve(tracingModel)
|
||||
// Clear the tree
|
||||
|
@ -15,17 +15,14 @@ case class TransformConfig(
|
||||
errorFuncName: String = "error",
|
||||
respFuncName: String = "response",
|
||||
relayVarName: Option[String] = Some("-relay-"),
|
||||
wrapWithXor: Boolean = true,
|
||||
tracing: Option[TransformConfig.TracingConfig] = None,
|
||||
constants: List[ConstantRaw] = Nil
|
||||
) {
|
||||
|
||||
import LiteralRaw.quote
|
||||
|
||||
val errorId: ValueRaw = quote(errorFuncName)
|
||||
val errorHandlingCallback: ValueModel = LiteralModel fromRaw quote(errorHandlingService)
|
||||
val callbackSrvId: ValueRaw = quote(callbackService)
|
||||
val dataSrvId: ValueRaw = quote(getDataService)
|
||||
val errorId: ValueRaw = LiteralRaw.quote(errorFuncName)
|
||||
val errorHandlingSrvId: ValueRaw = LiteralRaw.quote(errorHandlingService)
|
||||
val callbackSrvId: ValueRaw = LiteralRaw.quote(callbackService)
|
||||
val dataSrvId: ValueRaw = LiteralRaw.quote(getDataService)
|
||||
|
||||
val constantsList: List[ConstantRaw] =
|
||||
ConstantRaw.defaultConstants(relayVarName) ::: constants
|
||||
|
@ -1,77 +0,0 @@
|
||||
package aqua.model.transform.funcop
|
||||
|
||||
import aqua.model.transform.pre.InitPeerCallable
|
||||
import aqua.model.{
|
||||
CallModel,
|
||||
CallServiceModel,
|
||||
ForceExecModel,
|
||||
LiteralModel,
|
||||
MatchMismatchModel,
|
||||
NoExecModel,
|
||||
OnModel,
|
||||
OpModel,
|
||||
SeqModel,
|
||||
ValueModel,
|
||||
VarModel,
|
||||
XorModel
|
||||
}
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.types.LiteralType
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
|
||||
case class ErrorsCatcher(
|
||||
enabled: Boolean,
|
||||
serviceId: ValueModel,
|
||||
funcName: String,
|
||||
callable: InitPeerCallable
|
||||
) {
|
||||
|
||||
private def hasExec(children: Chain[OpModel.Tree]): Boolean =
|
||||
children.exists {
|
||||
case Cofree(head: ForceExecModel, _) =>
|
||||
true
|
||||
case Cofree(_, tail) =>
|
||||
hasExec(tail.value)
|
||||
}
|
||||
|
||||
def transform(op: OpModel.Tree): OpModel.Tree =
|
||||
if (enabled) {
|
||||
var i = 0
|
||||
Cofree
|
||||
.cata[Chain, OpModel, OpModel.Tree](op) {
|
||||
case (ot @ (OnModel(_, _) | MatchMismatchModel(_, _, _)), children)
|
||||
if hasExec(children) =>
|
||||
i = i + 1
|
||||
Eval now ot.wrap(
|
||||
XorModel.wrap(
|
||||
SeqModel.wrap(children.toList: _*),
|
||||
callable.onInitPeer.wrap(
|
||||
CallServiceModel(
|
||||
serviceId,
|
||||
funcName,
|
||||
ErrorsCatcher.lastErrorCall(i)
|
||||
).leaf
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
case (tag, children) =>
|
||||
Eval.now(Cofree(tag, Eval.now(children)))
|
||||
}
|
||||
.value
|
||||
} else op
|
||||
|
||||
}
|
||||
|
||||
object ErrorsCatcher {
|
||||
|
||||
val lastErrorArg: ValueModel =
|
||||
VarModel(ValueRaw.LastError.name, ValueRaw.LastError.baseType, Chain.empty)
|
||||
|
||||
def lastErrorCall(i: Int): CallModel = CallModel(
|
||||
lastErrorArg :: LiteralModel.fromRaw(LiteralRaw.number(i)) :: Nil,
|
||||
Nil
|
||||
)
|
||||
}
|
@ -5,10 +5,11 @@ import aqua.raw.value.{ValueRaw, VarRaw}
|
||||
import aqua.types.{ArrayType, DataType, StreamType}
|
||||
import cats.data.Chain
|
||||
|
||||
trait ArgsProvider extends PreTransform
|
||||
trait ArgsProvider {
|
||||
def provideArgs(args: List[(String, DataType)]): List[RawTag.Tree]
|
||||
}
|
||||
|
||||
case class ArgsFromService(dataServiceId: ValueRaw, names: List[(String, DataType)])
|
||||
extends ArgsProvider {
|
||||
case class ArgsFromService(dataServiceId: ValueRaw) extends ArgsProvider {
|
||||
|
||||
private def getStreamDataOp(name: String, t: StreamType): RawTag.Tree = {
|
||||
val iter = s"$name-iter"
|
||||
@ -44,9 +45,7 @@ case class ArgsFromService(dataServiceId: ValueRaw, names: List[(String, DataTyp
|
||||
.leaf
|
||||
}
|
||||
|
||||
def transform(op: RawTag.Tree): RawTag.Tree =
|
||||
SeqTag.wrap(
|
||||
names.map((getDataOp _).tupled) :+ op: _*
|
||||
)
|
||||
override def provideArgs(args: List[(String, DataType)]): List[RawTag.Tree] =
|
||||
args.map(getDataOp.tupled)
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,28 @@
|
||||
package aqua.model.transform.pre
|
||||
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.raw.ops.{Call, CallArrowRawTag, RawTag}
|
||||
import aqua.types.LiteralType
|
||||
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
|
||||
trait ErrorHandler {
|
||||
def handleLastError: RawTag.Tree
|
||||
}
|
||||
|
||||
case class CallbackErrorHandler(
|
||||
serviceId: ValueRaw,
|
||||
funcName: String
|
||||
) extends ErrorHandler {
|
||||
|
||||
override def handleLastError: RawTag.Tree = {
|
||||
val call = Call(
|
||||
args = ValueRaw.lastError :: LiteralRaw.number(0) :: Nil,
|
||||
exportTo = Nil
|
||||
)
|
||||
|
||||
CallArrowRawTag.service(serviceId, funcName, call).leaf
|
||||
}
|
||||
}
|
@ -2,36 +2,27 @@ package aqua.model.transform.pre
|
||||
|
||||
import aqua.model.FuncArrow
|
||||
import aqua.model.ArgsCall
|
||||
import aqua.raw.ops.{Call, CallArrowRawTag, RawTag, SeqTag}
|
||||
import aqua.raw.ops.{Call, CallArrowRawTag, RawTag, SeqTag, TryTag}
|
||||
import aqua.raw.value.{ValueRaw, VarRaw}
|
||||
import aqua.types.*
|
||||
|
||||
import cats.syntax.show.*
|
||||
import cats.syntax.option.*
|
||||
|
||||
// TODO: doc
|
||||
case class FuncPreTransformer(
|
||||
transform: RawTag.Tree => RawTag.Tree,
|
||||
argsProvider: ArgsProvider,
|
||||
resultsHandler: ResultsHandler,
|
||||
errorHandler: ErrorHandler,
|
||||
callback: (String, Call) => RawTag.Tree,
|
||||
respFuncName: String,
|
||||
relayVarName: Option[String],
|
||||
wrapCallableName: String = "funcAround",
|
||||
arrowCallbackPrefix: String = "init_peer_callable_"
|
||||
) {
|
||||
|
||||
private val returnVar: String = "-return-"
|
||||
|
||||
/**
|
||||
* Wraps return values of a function to a call on itin peer's side
|
||||
*
|
||||
* @param retModel List of returned values
|
||||
* @return AST that consumes return values, passing them to the client
|
||||
*/
|
||||
private def returnCallback(retModel: List[ValueRaw]): RawTag.Tree =
|
||||
callback(
|
||||
respFuncName,
|
||||
Call(
|
||||
retModel,
|
||||
Nil
|
||||
)
|
||||
)
|
||||
private val relayVar = relayVarName.map(_ -> ScalarType.string)
|
||||
|
||||
/**
|
||||
* Convert an arrow-type argument to init user's callback
|
||||
@ -68,23 +59,35 @@ case class FuncPreTransformer(
|
||||
case t => t
|
||||
}).toLabelledList(returnVar)
|
||||
|
||||
val retModel = returnType.map { case (l, t) => VarRaw(l, t) }
|
||||
|
||||
val funcCall = Call(
|
||||
func.arrowType.domain.toLabelledList().map(ad => VarRaw(ad._1, ad._2)),
|
||||
returnType.map { case (l, t) => Call.Export(l, t) }
|
||||
)
|
||||
|
||||
val provideArgs = argsProvider.provideArgs(
|
||||
relayVar.toList ::: func.arrowType.domain.labelledData
|
||||
)
|
||||
|
||||
val handleResults = resultsHandler.handleResults(
|
||||
returnType
|
||||
)
|
||||
|
||||
val handleError = errorHandler.handleLastError
|
||||
|
||||
val call = CallArrowRawTag.func(func.funcName, funcCall).leaf
|
||||
|
||||
val body = SeqTag.wrap(
|
||||
provideArgs ++ List(
|
||||
TryTag.wrap(
|
||||
call,
|
||||
handleError
|
||||
)
|
||||
) ++ handleResults
|
||||
)
|
||||
|
||||
FuncArrow(
|
||||
wrapCallableName,
|
||||
transform(
|
||||
SeqTag.wrap(
|
||||
CallArrowRawTag.func(func.funcName, funcCall).leaf ::
|
||||
returnType.headOption
|
||||
.map(_ => returnCallback(retModel))
|
||||
.toList: _*
|
||||
)
|
||||
),
|
||||
body,
|
||||
ArrowType(ConsType.cons(func.funcName, func.arrowType, NilType), NilType),
|
||||
Nil,
|
||||
func.arrowType.domain
|
||||
|
@ -0,0 +1,27 @@
|
||||
package aqua.model.transform.pre
|
||||
|
||||
import aqua.types.Type
|
||||
import aqua.raw.ops.{Call, CallArrowRawTag, RawTag}
|
||||
import aqua.raw.value.{ValueRaw, VarRaw}
|
||||
|
||||
import cats.syntax.option.*
|
||||
|
||||
trait ResultsHandler {
|
||||
def handleResults(results: List[(String, Type)]): Option[RawTag.Tree]
|
||||
}
|
||||
|
||||
case class CallbackResultsHandler(callbackSrvId: ValueRaw, funcName: String)
|
||||
extends ResultsHandler {
|
||||
|
||||
override def handleResults(results: List[(String, Type)]): Option[RawTag.Tree] =
|
||||
if (results.isEmpty) none
|
||||
else {
|
||||
val resultVars = results.map(VarRaw.apply.tupled)
|
||||
val call = Call(
|
||||
args = resultVars,
|
||||
exportTo = Nil
|
||||
)
|
||||
|
||||
CallArrowRawTag.service(callbackSrvId, funcName, call).leaf.some
|
||||
}
|
||||
}
|
@ -187,43 +187,58 @@ object Topology extends Logging {
|
||||
.filterNot(lastPeerId => before.headOption.exists(_.peerId == lastPeerId))
|
||||
)
|
||||
|
||||
// Return strategy for calculating `beforeOn` for
|
||||
// node pointed on by `cursor`
|
||||
private def decideBefore(cursor: OpModelTreeCursor): Before =
|
||||
cursor.parentOp match {
|
||||
case Some(XorModel) => XorBranch
|
||||
case Some(_: SeqGroupModel) => SeqGroupBranch
|
||||
case None => Root
|
||||
case _ => Default
|
||||
}
|
||||
|
||||
// Return strategy for calculating `beginsOn` for
|
||||
// node pointed on by `cursor`
|
||||
private def decideBegins(cursor: OpModelTreeCursor): Begins =
|
||||
(cursor.parentOp, cursor.op) match {
|
||||
case (_, _: FailModel) => Fail
|
||||
case (Some(_: SeqGroupModel), _: NextModel) => SeqNext
|
||||
case (_, _: ForModel) => For
|
||||
// No begin optimization for detach
|
||||
case (_, ParModel) => ParGroup
|
||||
case _ => Default
|
||||
}
|
||||
|
||||
// Return strategy for calculating `endsOn` for
|
||||
// node pointed on by `cursor`
|
||||
private def decideEnds(cursor: OpModelTreeCursor): Ends =
|
||||
cursor.op match {
|
||||
case _: SeqGroupModel => SeqGroup
|
||||
case XorModel => XorGroup
|
||||
case _: ParGroupModel => ParGroup
|
||||
case _ if cursor.parentOp.isEmpty => Root
|
||||
case _ => Default
|
||||
}
|
||||
|
||||
// Return strategy for calculating `afterOn` for
|
||||
// node pointed on by `cursor`
|
||||
private def decideAfter(cursor: OpModelTreeCursor): After =
|
||||
(cursor.parentOp, cursor.op) match {
|
||||
case (_, _: FailModel) => Fail
|
||||
case (Some(_: ParGroupModel), _) => ParGroupBranch
|
||||
case (Some(XorModel), _) => XorBranch
|
||||
case (Some(_: SeqGroupModel), _) => SeqGroupBranch
|
||||
case (None, _) => Root
|
||||
case _ => Default
|
||||
}
|
||||
|
||||
def make(cursor: OpModelTreeCursor): Topology =
|
||||
Topology(
|
||||
cursor,
|
||||
// Before
|
||||
cursor.parentOp match {
|
||||
case Some(XorModel) => XorBranch
|
||||
case Some(_: SeqGroupModel) => SeqGroupBranch
|
||||
case None => Root
|
||||
case _ => Default
|
||||
},
|
||||
// Begin
|
||||
(cursor.parentOp, cursor.op) match {
|
||||
case (Some(_: SeqGroupModel), _: NextModel) =>
|
||||
SeqNext
|
||||
case (_, _: ForModel) =>
|
||||
For
|
||||
case (_, ParModel) => // No begin optimization for detach
|
||||
ParGroup
|
||||
case _ =>
|
||||
Default
|
||||
},
|
||||
// End
|
||||
cursor.op match {
|
||||
case _: SeqGroupModel => SeqGroup
|
||||
case XorModel => XorGroup
|
||||
case _: ParGroupModel => ParGroup
|
||||
case _ if cursor.parentOp.isEmpty => Root
|
||||
case _ => Default
|
||||
},
|
||||
// After
|
||||
cursor.parentOp match {
|
||||
case Some(_: ParGroupModel) => ParGroupBranch
|
||||
case Some(XorModel) => XorBranch
|
||||
case Some(_: SeqGroupModel) => SeqGroupBranch
|
||||
case None => Root
|
||||
case _ => Default
|
||||
}
|
||||
cursor = cursor,
|
||||
before = decideBefore(cursor),
|
||||
begins = decideBegins(cursor),
|
||||
ends = decideEnds(cursor),
|
||||
after = decideAfter(cursor)
|
||||
)
|
||||
|
||||
def resolve(op: OpModel.Tree, debug: Boolean = false): Eval[Res] =
|
||||
|
@ -6,21 +6,24 @@ import aqua.model.{OnModel, ValueModel}
|
||||
import cats.Eval
|
||||
import cats.data.Chain
|
||||
import cats.syntax.apply.*
|
||||
import cats.syntax.functor.*
|
||||
import cats.syntax.monad.*
|
||||
import cats.instances.tuple.*
|
||||
|
||||
trait Begins {
|
||||
|
||||
def beginsOn(current: Topology): Eval[List[OnModel]] = current.pathOn
|
||||
|
||||
def pathBefore(current: Topology): Eval[Chain[ValueModel]] =
|
||||
(current.beforeOn, current.beginsOn).mapN { case (bef, beg) =>
|
||||
(PathFinder.findPath(bef, beg), bef, beg)
|
||||
}.flatMap { case (pb, bef, beg) =>
|
||||
// Handle the case when we need to go through the relay, but miss the hop as it's the first
|
||||
// peer where we go, but there's no service calls there
|
||||
current.firstExecutesOn.map {
|
||||
case Some(where) if where != beg =>
|
||||
pb ++ Topology.findRelayPathEnforcement(bef, beg)
|
||||
case _ => pb
|
||||
(current.beforeOn, current.beginsOn).tupled
|
||||
.fproduct(PathFinder.findPath.tupled)
|
||||
.flatMap { case ((bef, beg), path) =>
|
||||
// Handle the case when we need to go through the relay, but miss the hop as it's the first
|
||||
// peer where we go, but there's no service calls there
|
||||
current.firstExecutesOn.map {
|
||||
case Some(where) if where != beg =>
|
||||
path ++ Topology.findRelayPathEnforcement(bef, beg)
|
||||
case _ => path
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,33 @@
|
||||
package aqua.model.transform.topology.strategy
|
||||
|
||||
import aqua.model.transform.topology.Topology
|
||||
import aqua.model.ValueModel
|
||||
|
||||
import aqua.model.{OnModel, XorModel}
|
||||
|
||||
import cats.data.Chain
|
||||
import cats.Eval
|
||||
import cats.syntax.apply.*
|
||||
import cats.syntax.functor.*
|
||||
import cats.syntax.traverse.*
|
||||
import cats.syntax.option.*
|
||||
import cats.syntax.applicative.*
|
||||
|
||||
object Fail extends Begins with After {
|
||||
|
||||
// override just to be explicit
|
||||
override def forceExit(current: Topology): Eval[Boolean] =
|
||||
Eval.now(false) // There is no need to insert hops after `fail`
|
||||
|
||||
override def pathBefore(current: Topology): Eval[Chain[ValueModel]] =
|
||||
for {
|
||||
path <- super.pathBefore(current)
|
||||
begins <- current.beginsOn
|
||||
// Get last hop to final peer
|
||||
// if it is not in the path
|
||||
// TODO: Add option to enforce last hop to [[PathFinder]]
|
||||
hop = begins.headOption
|
||||
.map(_.peerId)
|
||||
.filterNot(peer => path.lastOption.contains(peer) || path.isEmpty)
|
||||
} yield path ++ Chain.fromOption(hop)
|
||||
}
|
@ -14,7 +14,7 @@ object XorBranch extends Before with After {
|
||||
override def toString: String = Console.RED + "<xor>/*" + Console.RESET
|
||||
|
||||
override def beforeOn(current: Topology): Eval[List[OnModel]] =
|
||||
current.prevSibling.map(_.endsOn) getOrElse super.beforeOn(current)
|
||||
current.prevSibling.map(_.beginsOn) getOrElse super.beforeOn(current)
|
||||
|
||||
// Find closest par exit up and return its branch current is in
|
||||
// Returns none if there is no par up
|
||||
|
@ -1,31 +1,21 @@
|
||||
package aqua.model.transform
|
||||
|
||||
import aqua.model.{
|
||||
CallModel,
|
||||
CallServiceModel,
|
||||
DetachModel,
|
||||
ForModel,
|
||||
LiteralModel,
|
||||
NextModel,
|
||||
OpModel,
|
||||
ParModel,
|
||||
SeqModel,
|
||||
ValueModel,
|
||||
VarModel
|
||||
}
|
||||
import aqua.model.transform.funcop.ErrorsCatcher
|
||||
import aqua.model.*
|
||||
import aqua.raw.ops.Call
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.{model, res}
|
||||
import aqua.res.{CallRes, CallServiceRes, MakeRes}
|
||||
import aqua.types.{ArrayType, LiteralType, ScalarType}
|
||||
|
||||
import scala.language.implicitConversions
|
||||
import aqua.types.StreamType
|
||||
import aqua.model.IntoIndexModel
|
||||
import aqua.model.inline.raw.ApplyGateRawInliner
|
||||
|
||||
import scala.language.implicitConversions
|
||||
import cats.data.Chain
|
||||
import cats.data.Chain.==:
|
||||
import aqua.model.inline.raw.ApplyGateRawInliner
|
||||
import aqua.model.OnModel
|
||||
import aqua.model.FailModel
|
||||
import aqua.res.ResolvedOp
|
||||
|
||||
object ModelBuilder {
|
||||
implicit def rawToValue(raw: ValueRaw): ValueModel = ValueModel.fromRaw(raw)
|
||||
@ -40,10 +30,13 @@ object ModelBuilder {
|
||||
|
||||
val otherPeer = VarRaw("other-peer", ScalarType.string)
|
||||
|
||||
val otherPeerL = LiteralRaw("\"other-peer\"", LiteralType.string)
|
||||
val otherRelay = LiteralRaw("other-relay", ScalarType.string)
|
||||
val otherPeer2 = LiteralRaw("other-peer-2", ScalarType.string)
|
||||
val otherRelay2 = LiteralRaw("other-relay-2", ScalarType.string)
|
||||
def otherPeerN(n: Int) = LiteralRaw.quote(s"other-peer-$n")
|
||||
def otherRelayN(n: Int) = LiteralRaw.quote(s"other-relay-$n")
|
||||
|
||||
val otherPeerL = LiteralRaw.quote("other-peer")
|
||||
val otherRelay = LiteralRaw.quote("other-relay")
|
||||
val otherPeer2 = otherPeerN(2)
|
||||
val otherRelay2 = otherRelayN(2)
|
||||
val iRelay = VarRaw("i", ScalarType.string)
|
||||
val varNode = VarRaw("node-id", ScalarType.string)
|
||||
val viaList = VarRaw("other-relay-2", ArrayType(ScalarType.string))
|
||||
@ -84,10 +77,10 @@ object ModelBuilder {
|
||||
def errorCall(bc: TransformConfig, i: Int, on: ValueModel = initPeer) =
|
||||
res
|
||||
.CallServiceRes(
|
||||
bc.errorHandlingCallback,
|
||||
ValueModel.fromRaw(bc.errorHandlingSrvId),
|
||||
bc.errorFuncName,
|
||||
CallRes(
|
||||
ErrorsCatcher.lastErrorArg :: LiteralModel(
|
||||
ValueModel.lastError :: LiteralModel(
|
||||
i.toString,
|
||||
LiteralType.number
|
||||
) :: Nil,
|
||||
@ -117,6 +110,22 @@ object ModelBuilder {
|
||||
)
|
||||
.leaf
|
||||
|
||||
val failLastErrorModel = FailModel(ValueModel.lastError).leaf
|
||||
|
||||
val failLastErrorRes = res.FailRes(ValueModel.lastError).leaf
|
||||
|
||||
def onRethrowModel(
|
||||
peer: ValueModel,
|
||||
via: ValueModel*
|
||||
): OpModel.Tree => OpModel.Tree =
|
||||
child =>
|
||||
XorModel.wrap(
|
||||
OnModel(peer, Chain.fromSeq(via)).wrap(
|
||||
child
|
||||
),
|
||||
failLastErrorModel
|
||||
)
|
||||
|
||||
def fold(item: String, iter: ValueRaw, mode: Option[ForModel.Mode], body: OpModel.Tree*) = {
|
||||
val ops = SeqModel.wrap(body: _*)
|
||||
ForModel(item, ValueModel.fromRaw(iter), mode).wrap(ops, NextModel(item).leaf)
|
||||
@ -130,7 +139,7 @@ object ModelBuilder {
|
||||
)
|
||||
}
|
||||
|
||||
def through(peer: ValueModel) =
|
||||
def through(peer: ValueModel): ResolvedOp.Tree =
|
||||
MakeRes.hop(peer)
|
||||
|
||||
/**
|
||||
|
@ -6,11 +6,13 @@ import aqua.model.{CallModel, FuncArrow, LiteralModel, VarModel}
|
||||
import aqua.raw.ops.{Call, CallArrowRawTag, FuncOp, OnTag, RawTag, SeqTag}
|
||||
import aqua.raw.value.{LiteralRaw, VarRaw}
|
||||
import aqua.types.{ArrowType, NilType, ProductType, ScalarType}
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.res.{CallRes, CallServiceRes, MakeRes, SeqRes, XorRes}
|
||||
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import cats.data.Chain
|
||||
import cats.syntax.show.*
|
||||
|
||||
class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
@ -47,13 +49,13 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
val procFC = fc.value.body
|
||||
|
||||
val expectedFC =
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
dataCall(bc, "-relay-", initPeer),
|
||||
through(relayV),
|
||||
through(otherRelay),
|
||||
SeqRes.wrap(
|
||||
dataCall(bc, "-relay-", initPeer),
|
||||
XorRes.wrap(
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
through(relayV),
|
||||
through(otherRelay),
|
||||
callRes(1, otherPeer),
|
||||
through(otherRelay),
|
||||
through(relayV)
|
||||
@ -61,15 +63,13 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
SeqRes.wrap(
|
||||
through(otherRelay),
|
||||
through(relayV),
|
||||
errorCall(bc, 1, initPeer)
|
||||
through(initPeer),
|
||||
failLastErrorRes
|
||||
)
|
||||
),
|
||||
XorRes.wrap(
|
||||
respCall(bc, ret, initPeer),
|
||||
errorCall(bc, 2, initPeer)
|
||||
)
|
||||
errorCall(bc, 0, initPeer)
|
||||
),
|
||||
errorCall(bc, 3, initPeer)
|
||||
respCall(bc, ret, initPeer)
|
||||
)
|
||||
|
||||
procFC.equalsOrShowDiff(expectedFC) should be(true)
|
||||
@ -90,7 +90,7 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
None
|
||||
)
|
||||
|
||||
val bc = TransformConfig(wrapWithXor = false)
|
||||
val bc = TransformConfig()
|
||||
|
||||
val fc = Transform.funcRes(func, bc)
|
||||
|
||||
@ -99,10 +99,24 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
val expectedFC =
|
||||
SeqRes.wrap(
|
||||
dataCall(bc, "-relay-", initPeer),
|
||||
callRes(0, initPeer),
|
||||
through(relayV),
|
||||
callRes(1, otherPeer),
|
||||
through(relayV),
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
callRes(0, initPeer),
|
||||
XorRes.wrap(
|
||||
SeqRes.wrap(
|
||||
through(relayV),
|
||||
callRes(1, otherPeer),
|
||||
through(relayV)
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(relayV),
|
||||
through(initPeer),
|
||||
failLastErrorRes
|
||||
)
|
||||
)
|
||||
),
|
||||
errorCall(bc, 0, initPeer)
|
||||
),
|
||||
respCall(bc, ret, initPeer)
|
||||
)
|
||||
|
||||
@ -124,13 +138,7 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
val f1: FuncArrow =
|
||||
FuncArrow(
|
||||
"f1",
|
||||
CallArrowRawTag
|
||||
.service(
|
||||
LiteralRaw.quote("srv1"),
|
||||
"foo",
|
||||
Call(Nil, Call.Export("v", ScalarType.string) :: Nil)
|
||||
)
|
||||
.leaf,
|
||||
callOp(1).leaf,
|
||||
stringArrow,
|
||||
VarRaw("v", ScalarType.string) :: Nil,
|
||||
Map.empty,
|
||||
@ -151,22 +159,20 @@ class TransformSpec extends AnyFlatSpec with Matchers {
|
||||
None
|
||||
)
|
||||
|
||||
val bc = TransformConfig(wrapWithXor = false)
|
||||
val bc = TransformConfig()
|
||||
|
||||
val res = Transform.funcRes(f2, bc).value.body
|
||||
val procFC = Transform.funcRes(f2, bc).value.body
|
||||
|
||||
res.equalsOrShowDiff(
|
||||
SeqRes.wrap(
|
||||
dataCall(bc, "-relay-", initPeer),
|
||||
CallServiceRes(
|
||||
LiteralRaw.quote("srv1"),
|
||||
"foo",
|
||||
CallRes(Nil, Some(CallModel.Export("v", ScalarType.string))),
|
||||
initPeer
|
||||
).leaf,
|
||||
respCall(bc, VarRaw("v", ScalarType.string), initPeer)
|
||||
)
|
||||
) should be(true)
|
||||
val expectedFC = SeqRes.wrap(
|
||||
dataCall(bc, "-relay-", initPeer),
|
||||
XorRes.wrap(
|
||||
callRes(1, initPeer),
|
||||
errorCall(bc, 0, initPeer)
|
||||
),
|
||||
respCall(bc, VarRaw("v", ScalarType.string), initPeer)
|
||||
)
|
||||
|
||||
procFC.equalsOrShowDiff(expectedFC) should be(true)
|
||||
}
|
||||
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -24,7 +24,7 @@ class CatchSem[S[_]](val expr: CatchExpr[S]) extends AnyVal {
|
||||
.around(
|
||||
N.beginScope(expr.name) >>
|
||||
L.beginScope() >>
|
||||
N.define(expr.name, ValueRaw.LastError.baseType),
|
||||
N.define(expr.name, ValueRaw.lastError.baseType),
|
||||
(_, g: Raw) =>
|
||||
N.endScope() >> L.endScope() as (
|
||||
g match {
|
||||
@ -32,7 +32,7 @@ class CatchSem[S[_]](val expr: CatchExpr[S]) extends AnyVal {
|
||||
TryTag.Catch
|
||||
.wrap(
|
||||
SeqTag.wrap(
|
||||
AssignmentTag(ValueRaw.LastError, expr.name.value).leaf,
|
||||
AssignmentTag(ValueRaw.lastError, expr.name.value).leaf,
|
||||
op
|
||||
)
|
||||
)
|
||||
|
@ -133,7 +133,7 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside {
|
||||
TryTag.wrap(
|
||||
testServiceCallStr("try"),
|
||||
SeqTag.wrap(
|
||||
AssignmentTag(ValueRaw.LastError, "e").leaf,
|
||||
AssignmentTag(ValueRaw.lastError, "e").leaf,
|
||||
testServiceCallStr("catch")
|
||||
)
|
||||
)
|
||||
@ -159,11 +159,11 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside {
|
||||
TryTag.wrap(
|
||||
testServiceCallStr("try"),
|
||||
SeqTag.wrap(
|
||||
AssignmentTag(ValueRaw.LastError, "e").leaf,
|
||||
AssignmentTag(ValueRaw.lastError, "e").leaf,
|
||||
testServiceCallStr("catch1")
|
||||
),
|
||||
SeqTag.wrap(
|
||||
AssignmentTag(ValueRaw.LastError, "e").leaf,
|
||||
AssignmentTag(ValueRaw.lastError, "e").leaf,
|
||||
testServiceCallStr("catch2")
|
||||
)
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user