diff --git a/api/api/src/main/scala/aqua/api/AquaAPIConfig.scala b/api/api/src/main/scala/aqua/api/AquaAPIConfig.scala index 86abed3b..18c949d2 100644 --- a/api/api/src/main/scala/aqua/api/AquaAPIConfig.scala +++ b/api/api/src/main/scala/aqua/api/AquaAPIConfig.scala @@ -8,14 +8,13 @@ case class AquaAPIConfig( targetType: TargetType = TargetType.AirType, logLevel: String = "info", constants: List[String] = Nil, - noXor: Boolean = false, + noXor: Boolean = false, // TODO: Remove noRelay: Boolean = false, tracing: Boolean = false ) { def getTransformConfig: TransformConfig = { val config = TransformConfig( - wrapWithXor = !noXor, tracing = Option.when(tracing)(TransformConfig.TracingConfig.default) ) diff --git a/backend/air/src/main/scala/aqua/backend/air/Air.scala b/backend/air/src/main/scala/aqua/backend/air/Air.scala index b2ed57c7..84052e68 100644 --- a/backend/air/src/main/scala/aqua/backend/air/Air.scala +++ b/backend/air/src/main/scala/aqua/backend/air/Air.scala @@ -24,6 +24,8 @@ object Keyword { case object Ap extends Keyword("ap") + case object Fail extends Keyword("fail") + case object Canon extends Keyword("canon") case object Seq extends Keyword("seq") @@ -110,6 +112,8 @@ object Air { case class Ap(op: DataView, result: String) extends Air(Keyword.Ap) + case class Fail(op: DataView) extends Air(Keyword.Fail) + case class Canon(op: DataView, peerId: DataView, result: String) extends Air(Keyword.Canon) case class Comment(comment: String, air: Air) extends Air(Keyword.NA) @@ -143,6 +147,7 @@ object Air { case Air.Call(triplet, args, res) ⇒ s" ${triplet.show} [${args.map(_.show).mkString(" ")}]${res.fold("")(" " + _)}" case Air.Ap(operand, result) ⇒ s" ${operand.show} $result" + case Air.Fail(operand) => s" ${operand.show}" case Air.Canon(operand, peerId, result) ⇒ s" ${peerId.show} ${operand.show} $result" case Air.Comment(_, _) => ";; Should not be displayed" }) + ")\n" diff --git a/backend/air/src/main/scala/aqua/backend/air/AirGen.scala b/backend/air/src/main/scala/aqua/backend/air/AirGen.scala index 83c065bb..eaca0d74 100644 --- a/backend/air/src/main/scala/aqua/backend/air/AirGen.scala +++ b/backend/air/src/main/scala/aqua/backend/air/AirGen.scala @@ -118,6 +118,11 @@ object AirGen extends Logging { ApGen(valueToData(operand), exportToString(exportTo)) ) + case FailRes(operand) => + Eval.later( + FailGen(valueToData(operand)) + ) + case CanonRes(operand, peerId, exportTo) => Eval.later( CanonGen(valueToData(operand), valueToData(peerId), exportToString(exportTo)) @@ -164,6 +169,12 @@ case class ApGen(operand: DataView, result: String) extends AirGen { Air.Ap(operand, result) } +case class FailGen(operand: DataView) extends AirGen { + + override def generate: Air = + Air.Fail(operand) +} + case class CanonGen(operand: DataView, peerId: DataView, result: String) extends AirGen { override def generate: Air = diff --git a/build.sbt b/build.sbt index 29073bbf..4832615c 100644 --- a/build.sbt +++ b/build.sbt @@ -74,8 +74,8 @@ lazy val cliJS = cli.js .settings( Compile / fastOptJS / artifactPath := baseDirectory.value / "../../cli-npm" / "aqua.js", Compile / fullOptJS / artifactPath := baseDirectory.value / "../../cli-npm" / "aqua.js", - scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.ESModule)), - scalaJSUseMainModuleInitializer := true + scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.ESModule)), + scalaJSUseMainModuleInitializer := true ) .dependsOn(`js-exports`, `js-imports`) @@ -155,9 +155,9 @@ lazy val `aqua-apiJS` = `aqua-api`.js .settings( Compile / fastOptJS / artifactPath := baseDirectory.value / "../../api-npm" / "aqua-api.js", Compile / fullOptJS / artifactPath := baseDirectory.value / "../../api-npm" / "aqua-api.js", - scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.CommonJSModule)), - scalaJSUseMainModuleInitializer := true, - Test / test := {} + scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.CommonJSModule)), + scalaJSUseMainModuleInitializer := true, + Test / test := {} ) .enablePlugins(ScalaJSPlugin) .dependsOn(`js-exports`) @@ -252,7 +252,7 @@ lazy val compiler = crossProject(JVMPlatform, JSPlatform) .crossType(CrossType.Pure) .in(file("compiler")) .settings(commons: _*) - .dependsOn(semantics, linker, backend, transform % Test, res % "test->test") + .dependsOn(semantics, linker, backend, transform % "test->test", res % "test->test") lazy val backend = crossProject(JVMPlatform, JSPlatform) .withoutSuffixFor(JVMPlatform) diff --git a/cli/cli/.js/src/main/scala/aqua/run/RunOpts.scala b/cli/cli/.js/src/main/scala/aqua/run/RunOpts.scala index f255625f..100df48e 100644 --- a/cli/cli/.js/src/main/scala/aqua/run/RunOpts.scala +++ b/cli/cli/.js/src/main/scala/aqua/run/RunOpts.scala @@ -22,7 +22,7 @@ import cats.syntax.applicative.* import cats.syntax.apply.* import cats.syntax.flatMap.* import cats.syntax.functor.* -import cats.{Id, Monad, ~>} +import cats.{~>, Id, Monad} import com.monovore.decline.{Command, Opts} import fs2.io.file.{Files, Path} import scribe.Logging @@ -45,14 +45,15 @@ object RunOpts extends Logging { ): TransformConfig = { val tc = TransformConfig( constants = - onPeer.map(s => ConstantRaw(OnPeerConst, LiteralRaw.quote(s), false)).toList ++ constants, - wrapWithXor = !noXor + onPeer.map(s => ConstantRaw(OnPeerConst, LiteralRaw.quote(s), false)).toList ++ constants ) tc.copy(relayVarName = tc.relayVarName.filterNot(_ => noRelay)) } - def runOptsCompose[F[_]: Files: Concurrent] - : Opts[F[ValidatedNec[String, (Option[AquaPath], List[Path], FuncWithData, Option[NonEmptyList[JsonService]], List[String])]]] = { + def runOptsCompose[F[_]: Files: Concurrent]: Opts[F[ValidatedNec[ + String, + (Option[AquaPath], List[Path], FuncWithData, Option[NonEmptyList[JsonService]], List[String]) + ]]] = { ( AppOpts.wrapWithOption(AppOpts.inputOpts[F]), AppOpts.importOpts[F], @@ -72,8 +73,9 @@ object RunOpts extends Logging { .getOrElse(validNec[String, Option[NonEmptyList[JsonService]]](None).pure[F]) pluginsPathsV <- pluginsOp.getOrElse(validNec[String, List[String]](Nil).pure[F]) } yield { - (inputV, importV, funcWithArgsV, jsonServiceV, pluginsPathsV).mapN { case (i, im, f, j, p) => - (i, im, f, j, p) + (inputV, importV, funcWithArgsV, jsonServiceV, pluginsPathsV).mapN { + case (i, im, f, j, p) => + (i, im, f, j, p) } } } diff --git a/cli/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala b/cli/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala index 8be60b3e..67368dbe 100644 --- a/cli/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala +++ b/cli/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala @@ -17,15 +17,7 @@ import aqua.raw.ops.{Call, CallArrowRawTag} import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} import aqua.res.{AquaRes, FuncRes} import aqua.run.RunOpts.logger -import aqua.run.{ - CliFunc, - FuncCompiler, - GeneralOptions, - GeneralOpts, - RunCommand, - RunConfig, - RunOpts -} +import aqua.run.{CliFunc, FuncCompiler, GeneralOptions, GeneralOpts, RunCommand, RunConfig, RunOpts} import aqua.types.{ArrowType, LiteralType, NilType, ScalarType} import cats.data.* import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel} @@ -126,7 +118,7 @@ object ScriptOpts extends Logging { imports: List[Path], funcWithArgs: FuncWithLiteralArgs ): F[ValidatedNec[String, String]] = { - val tConfig = TransformConfig(relayVarName = None, wrapWithXor = false) + val tConfig = TransformConfig(relayVarName = None) val funcCompiler = new FuncCompiler[F]( Option(RelativePath(input)), diff --git a/cli/cli/.jvm/src/main/scala/aqua/Test.scala b/cli/cli/.jvm/src/main/scala/aqua/Test.scala index 38ae5e76..6f894706 100644 --- a/cli/cli/.jvm/src/main/scala/aqua/Test.scala +++ b/cli/cli/.jvm/src/main/scala/aqua/Test.scala @@ -27,7 +27,7 @@ object Test extends IOApp.Simple { List(Path("./aqua")), Option(Path("./target")), TypeScriptBackend(false, "IFluenceClient$$"), - TransformConfig(wrapWithXor = false), + TransformConfig(), false ) .map { diff --git a/cli/cli/src/main/scala/aqua/AquaCli.scala b/cli/cli/src/main/scala/aqua/AquaCli.scala index 37efa8f1..94eb5fdd 100644 --- a/cli/cli/src/main/scala/aqua/AquaCli.scala +++ b/cli/cli/src/main/scala/aqua/AquaCli.scala @@ -94,7 +94,7 @@ object AquaCli extends IOApp with Logging { compileToAir, compileToJs, noRelay, - noXorWrapper, + noXorWrapper, // TODO: Remove tracing, isOldFluenceJs, wrapWithOption(helpOpt), @@ -137,7 +137,6 @@ object AquaCli extends IOApp with Logging { else TypescriptTarget val bc = { val bc = TransformConfig( - wrapWithXor = !noXor, constants = constants, tracing = Option.when(tracingEnabled)(TransformConfig.TracingConfig.default) ) diff --git a/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala b/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala index 5380985d..f6acb013 100644 --- a/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala +++ b/compiler/src/test/scala/aqua/compiler/AquaCompilerSpec.scala @@ -9,6 +9,7 @@ import aqua.model.{ ValueModel, VarModel } +import aqua.model.transform.ModelBuilder import aqua.model.transform.TransformConfig import aqua.model.transform.Transform import aqua.parser.ParserError @@ -42,6 +43,7 @@ import cats.instances.string.* import cats.syntax.show.* class AquaCompilerSpec extends AnyFlatSpec with Matchers { + import ModelBuilder.* private def aquaSource(src: Map[String, String], imports: Map[String, String]) = { new AquaSources[Id, String, String] { @@ -150,8 +152,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { ctxs.length should be(1) val ctx = ctxs.headOption.get - val aquaRes = - Transform.contextRes(ctx, TransformConfig(wrapWithXor = false)) + val transformCfg = TransformConfig() + val aquaRes = Transform.contextRes(ctx, transformCfg) val Some(exec) = aquaRes.funcs.find(_.funcName == "exec") @@ -167,47 +169,49 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { SeqRes.wrap( getDataSrv("-relay-", ScalarType.string), getDataSrv(peers.name, peers.`type`), - RestrictionRes(results.name, resultsType).wrap( - SeqRes.wrap( - ParRes.wrap( - FoldRes(peer.name, peers, Some(ForModel.NeverMode)).wrap( - ParRes.wrap( - // better if first relay will be outside `for` - SeqRes.wrap( - through(ValueModel.fromRaw(relay)), - CallServiceRes( - LiteralModel.fromRaw(LiteralRaw.quote("op")), - "identity", - CallRes( - LiteralModel.fromRaw(LiteralRaw.quote("hahahahah")) :: Nil, - Some(CallModel.Export(results.name, results.`type`)) + XorRes.wrap( + RestrictionRes(results.name, resultsType).wrap( + SeqRes.wrap( + ParRes.wrap( + FoldRes(peer.name, peers, Some(ForModel.NeverMode)).wrap( + ParRes.wrap( + XorRes.wrap( + // better if first relay will be outside `for` + SeqRes.wrap( + through(ValueModel.fromRaw(relay)), + CallServiceRes( + LiteralModel.fromRaw(LiteralRaw.quote("op")), + "identity", + CallRes( + LiteralModel.fromRaw(LiteralRaw.quote("hahahahah")) :: Nil, + Some(CallModel.Export(results.name, results.`type`)) + ), + peer + ).leaf, + through(ValueModel.fromRaw(relay)), + through(initPeer) ), - peer - ).leaf, - through(ValueModel.fromRaw(relay)), - through(initPeer) - ), - NextRes(peer.name).leaf + SeqRes.wrap( + through(ValueModel.fromRaw(relay)), + through(initPeer), + failLastErrorRes + ) + ), + NextRes(peer.name).leaf + ) ) - ) - ), - join(results, LiteralModel.fromRaw(LiteralRaw.number(2))), - CanonRes(results, init, CallModel.Export(canonResult.name, canonResult.`type`)).leaf, - ApRes( - canonResult, - CallModel.Export(flatResult.name, flatResult.`type`) - ).leaf - ) - ), - CallServiceRes( - LiteralModel.fromRaw(LiteralRaw.quote("callbackSrv")), - "response", - CallRes( - flatResult :: Nil, - None + ), + join(results, LiteralModel.fromRaw(LiteralRaw.number(2))), + CanonRes(results, init, CallModel.Export(canonResult.name, canonResult.`type`)).leaf, + ApRes( + canonResult, + CallModel.Export(flatResult.name, flatResult.`type`) + ).leaf + ) ), - initPeer - ).leaf + errorCall(transformCfg, 0, initPeer) + ), + respCall(transformCfg, flatResult, initPeer) ) exec.body.equalsOrShowDiff(expected) shouldBe (true) @@ -267,8 +271,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { ctxs.length should be(1) val ctx = ctxs.headOption.get - val aquaRes = - Transform.contextRes(ctx, TransformConfig(wrapWithXor = false, relayVarName = None)) + val transformCfg = TransformConfig(relayVarName = None) + val aquaRes = Transform.contextRes(ctx, transformCfg) val Some(funcWrap) = aquaRes.funcs.find(_.funcName == "wrap") val Some(barfoo) = aquaRes.funcs.find(_.funcName == "barfoo") @@ -278,8 +282,8 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { val resCanonVM = VarModel("-res-fix-0", CanonStreamType(ScalarType.string)) val resFlatVM = VarModel("-res-flat-0", ArrayType(ScalarType.string)) - barfoo.body.equalsOrShowDiff( - SeqRes.wrap( + val expected = SeqRes.wrap( + XorRes.wrap( RestrictionRes(resVM.name, resStreamType).wrap( SeqRes.wrap( // res <- foo() @@ -305,14 +309,12 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers { ).leaf ) ), - CallServiceRes( - LiteralModel.fromRaw(LiteralRaw.quote("callbackSrv")), - "response", - CallRes(resFlatVM :: Nil, None), - LiteralModel.fromRaw(ValueRaw.InitPeerId) - ).leaf - ) - ) should be(true) + errorCall(transformCfg, 0, initPeer) + ), + respCall(transformCfg, resFlatVM, initPeer) + ) + + barfoo.body.equalsOrShowDiff(expected) should be(true) } } diff --git a/integration-tests/aqua/examples/onErrorPropagation.aqua b/integration-tests/aqua/examples/onErrorPropagation.aqua new file mode 100644 index 00000000..f0a67a35 --- /dev/null +++ b/integration-tests/aqua/examples/onErrorPropagation.aqua @@ -0,0 +1,39 @@ +service Test("test-service"): + fail(err: string) + +func onPropagate(peer: string, relay: string) -> u16: + res: *u16 + on peer via relay: + res <<- 0 + 1 + Test.fail("propagated error") + res <<- 0 + 2 + + join res[3] -- Unreachable + + <- res[3] + +func nestedOnPropagate(peer: string, relay: string, iPeer: string, iRelay: string, friend: string) -> u16: + res: *u16 + on iPeer via iRelay: + res <<- 40 + 2 + on friend: + res <<- 2 + 40 + on peer via relay: + Test.fail("propagated error") + res <<- 30 + 7 + + join res[3] -- Unreachable + + <- res[3] + +func seqOnPropagate(peer: string, relay: string, iPeer: string, iRelay: string) -> u16: + res: *u16 + on iPeer via iRelay: + res <<- 40 + 2 + on peer via relay: + Test.fail("propagated error") + res <<- 30 + 7 + + join res[2] -- Unreachable + + <- res[2] \ No newline at end of file diff --git a/integration-tests/src/__test__/examples.spec.ts b/integration-tests/src/__test__/examples.spec.ts index 615fc64f..5821bc68 100644 --- a/integration-tests/src/__test__/examples.spec.ts +++ b/integration-tests/src/__test__/examples.spec.ts @@ -4,6 +4,7 @@ import {getObjAssignCall, getObjCall, getObjRelayCall} from "../examples/objectC import {callArrowCall, reproArgsBug426Call} from '../examples/callArrowCall.js'; import {dataAliasCall} from '../examples/dataAliasCall.js'; import {onCall} from '../examples/onCall.js'; +import {onPropagateCall, nestedOnPropagateCall, seqOnPropagateCall} from '../examples/onErrorPropagation.js'; import {funcCall} from '../examples/funcCall.js'; import {registerPrintln} from '../compiled/examples/println.js'; import {helloWorldCall} from '../examples/helloWorldCall.js'; @@ -467,6 +468,27 @@ describe('Testing examples', () => { expect(onCallResult).toEqual(config.externalAddressesRelay1); }); + it('onErrorPropagate.aqua', async () => { + let call = onPropagateCall(peer2, relay2.peerId); + expect(call).rejects.toMatchObject({ + message: expect.stringContaining("propagated error") + }) + }); + + it('onErrorPropagate.aqua nested', async () => { + let call = nestedOnPropagateCall(peer2, relay2.peerId, config.relays[3].peerId, config.relays[4].peerId, config.relays[5].peerId); + expect(call).rejects.toMatchObject({ + message: expect.stringContaining("propagated error") + }) + }); + + it('onErrorPropagate.aqua sequential', async () => { + let call = seqOnPropagateCall(peer2, relay2.peerId, config.relays[3].peerId, config.relays[4].peerId); + expect(call).rejects.toMatchObject({ + message: expect.stringContaining("propagated error") + }) + }); + it('complex.aqua', async () => { let complexCallResult = await complexCall(selfPeerId, relayPeerId1); expect(complexCallResult).toEqual(['some str', '3', '1', '4', '1', '1', '3', '2', '4', '2', '2', selfPeerId]); diff --git a/integration-tests/src/examples/onErrorPropagation.ts b/integration-tests/src/examples/onErrorPropagation.ts new file mode 100644 index 00000000..3f663330 --- /dev/null +++ b/integration-tests/src/examples/onErrorPropagation.ts @@ -0,0 +1,43 @@ +import {IFluenceClient} from '@fluencelabs/js-client.api'; +import {registerTest, onPropagate, nestedOnPropagate, seqOnPropagate} from "../compiled/examples/onErrorPropagation.js" + +export async function onPropagateCall(peer2: IFluenceClient, relay2: string): Promise { + registerTest(peer2, { + fail(err, callParams) { + return Promise.reject(err); + }, + }) + + return onPropagate(peer2.getPeerId(), relay2) +} + +export async function nestedOnPropagateCall( + peer2: IFluenceClient, + relay2: string, + iPeer: string, + iRelay: string, + friend: string +): Promise { + registerTest(peer2, { + fail(err, callParams) { + return Promise.reject(err); + }, + }) + + return nestedOnPropagate(peer2.getPeerId(), relay2, iPeer, iRelay, friend) +} + +export async function seqOnPropagateCall( + peer2: IFluenceClient, + relay2: string, + iPeer: string, + iRelay: string +): Promise { + registerTest(peer2, { + fail(err, callParams) { + return Promise.reject(err); + }, + }) + + return seqOnPropagate(peer2.getPeerId(), relay2, iPeer, iRelay) +} \ No newline at end of file diff --git a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala index 20c2bbd5..e8259d76 100644 --- a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala @@ -17,6 +17,7 @@ import cats.syntax.option.* import cats.instances.list.* import cats.data.{Chain, State, StateT} import cats.syntax.show.* +import cats.syntax.bifunctor.* import scribe.{log, Logging} import aqua.model.inline.Inline.parDesugarPrefixOpt @@ -186,11 +187,19 @@ object TagInliner extends Logging { flat(vm, tree, true) } (pid, pif) = peerIdDe - viaD = Chain.fromSeq(viaDeFlattened.map(_._1)) - viaF = viaDeFlattened.flatMap(_._2) - - } yield TagInlined.Single( - model = OnModel(pid, viaD), + (viaD, viaF) = viaDeFlattened.unzip + .bimap(Chain.fromSeq, _.flatten) + toModel = (children: Chain[OpModel.Tree]) => + XorModel.wrap( + OnModel(pid, viaD).wrap( + children + ), + // This will return to previous topology + // and propagate error up + FailModel(ValueModel.lastError).leaf + ) + } yield TagInlined.Mapping( + toModel = toModel, prefix = parDesugarPrefix(viaF.prependedAll(pif)) ) diff --git a/model/raw/src/main/scala/aqua/raw/ConstantRaw.scala b/model/raw/src/main/scala/aqua/raw/ConstantRaw.scala index 4f59bcd6..46dcc3bc 100644 --- a/model/raw/src/main/scala/aqua/raw/ConstantRaw.scala +++ b/model/raw/src/main/scala/aqua/raw/ConstantRaw.scala @@ -42,20 +42,21 @@ object ConstantRaw { val lastError: ConstantRaw = ConstantRaw( "LAST_ERROR", - ValueRaw.LastError, + ValueRaw.lastError, false ) - // Host peer id holds %init_peer_id% in case Aqua is not compiled to be executed behind a relay, // or relay's variable otherwise def hostPeerId(relayVarName: Option[String]): ConstantRaw = - ConstantRaw( - "HOST_PEER_ID", - relayVarName.fold[ValueRaw](ValueRaw.InitPeerId)(r => VarRaw(r, ScalarType.string)), - false - ) - + ConstantRaw( + "HOST_PEER_ID", + relayVarName.fold[ValueRaw](ValueRaw.InitPeerId)(r => VarRaw(r, ScalarType.string)), + false + ) + def defaultConstants(relayVarName: Option[String]): List[ConstantRaw] = - hostPeerId(relayVarName) :: initPeerId :: particleTtl :: particleTimestamp :: nil :: lastError :: Nil -} \ No newline at end of file + hostPeerId( + relayVarName + ) :: initPeerId :: particleTtl :: particleTimestamp :: nil :: lastError :: Nil +} diff --git a/model/raw/src/main/scala/aqua/raw/value/ValueRaw.scala b/model/raw/src/main/scala/aqua/raw/value/ValueRaw.scala index 693aa218..21690a08 100644 --- a/model/raw/src/main/scala/aqua/raw/value/ValueRaw.scala +++ b/model/raw/src/main/scala/aqua/raw/value/ValueRaw.scala @@ -27,21 +27,22 @@ object ValueRaw { val Nil: LiteralRaw = LiteralRaw("[]", StreamType(BottomType)) - val LastError: VarRaw = VarRaw( - "%last_error%", - StructType( - "LastError", - NonEmptyMap.of( - // These two fields are mandatory for all errors - "message" -> ScalarType.string, - "error_code" -> ScalarType.i64, - // These fields are specific to AquaVM's errors only - "instruction" -> ScalarType.string, - "peer_id" -> ScalarType.string - ) + val lastErrorType = StructType( + "LastError", + NonEmptyMap.of( + // These two fields are mandatory for all errors + "message" -> ScalarType.string, + "error_code" -> ScalarType.i64, + // These fields are specific to AquaVM's errors only + "instruction" -> ScalarType.string, + "peer_id" -> ScalarType.string ) ) + val lastError: VarRaw = VarRaw( + "%last_error%", + lastErrorType + ) } case class ApplyPropertyRaw(value: ValueRaw, property: PropertyRaw) extends ValueRaw { diff --git a/model/res/src/main/scala/aqua/res/MakeRes.scala b/model/res/src/main/scala/aqua/res/MakeRes.scala index 688920ec..9e94063c 100644 --- a/model/res/src/main/scala/aqua/res/MakeRes.scala +++ b/model/res/src/main/scala/aqua/res/MakeRes.scala @@ -76,6 +76,8 @@ object MakeRes { ApRes(operand, CallModel.Export(assignTo, ArrayType(el))).leaf case FlattenModel(operand, assignTo) => ApRes(operand, CallModel.Export(assignTo, operand.`type`)).leaf + case FailModel(value) => + FailRes(value).leaf case CallServiceModel(serviceId, funcName, CallModel(args, exportTo)) => CallServiceRes( serviceId, diff --git a/model/res/src/main/scala/aqua/res/ResolvedOp.scala b/model/res/src/main/scala/aqua/res/ResolvedOp.scala index 070ffaa5..9a9dc612 100644 --- a/model/res/src/main/scala/aqua/res/ResolvedOp.scala +++ b/model/res/src/main/scala/aqua/res/ResolvedOp.scala @@ -54,6 +54,10 @@ case class ApRes(operand: ValueModel, exportTo: CallModel.Export) extends Resolv override def toString: String = s"(ap $operand $exportTo)" } +case class FailRes(operand: ValueModel) extends ResolvedOp { + override def toString: String = s"(fail $operand)" +} + case class CanonRes(operand: ValueModel, peerId: ValueModel, exportTo: CallModel.Export) extends ResolvedOp { override def toString: String = s"(canon $peerId $operand $exportTo)" diff --git a/model/src/main/scala/aqua/model/OpModel.scala b/model/src/main/scala/aqua/model/OpModel.scala index 770ac487..3a6a9446 100644 --- a/model/src/main/scala/aqua/model/OpModel.scala +++ b/model/src/main/scala/aqua/model/OpModel.scala @@ -1,13 +1,15 @@ package aqua.model import aqua.model.OpModel.Tree +import aqua.tree.{TreeNode, TreeNodeCompanion} +import aqua.types.* + import cats.data.Chain import cats.free.Cofree import cats.Show import cats.Eval import cats.data.NonEmptyList -import aqua.tree.{TreeNode, TreeNodeCompanion} -import aqua.types.* +import cats.syntax.functor.* import scala.annotation.tailrec @@ -79,7 +81,15 @@ case object ParModel extends ParGroupModel case object DetachModel extends ParGroupModel -case object XorModel extends GroupOpModel +case object XorModel extends GroupOpModel { + + // If left branch is empty, return empty + override def wrap(children: Chain[Tree]): Tree = + children.headOption + .filterNot(_.head == EmptyModel) + .as(super.wrap(children)) + .getOrElse(EmptyModel.leaf) +} case class OnModel(peerId: ValueModel, via: Chain[ValueModel]) extends SeqGroupModel { @@ -146,6 +156,12 @@ case class FlattenModel(value: ValueModel, assignTo: String) extends OpModel { override def exportsVarNames: Set[String] = Set(assignTo) } +case class FailModel(value: ValueModel) extends OpModel { + override def usesVarNames: Set[String] = value.usesVarNames + + override def exportsVarNames: Set[String] = Set.empty +} + case class PushToStreamModel(value: ValueModel, exportTo: CallModel.Export) extends OpModel { override def usesVarNames: Set[String] = value.usesVarNames diff --git a/model/src/main/scala/aqua/model/ValueModel.scala b/model/src/main/scala/aqua/model/ValueModel.scala index c15af153..565e0ec5 100644 --- a/model/src/main/scala/aqua/model/ValueModel.scala +++ b/model/src/main/scala/aqua/model/ValueModel.scala @@ -2,8 +2,10 @@ package aqua.model import aqua.raw.value.* import aqua.types.* + import cats.Eq import cats.data.{Chain, NonEmptyMap} +import cats.syntax.option.* import scribe.Logging sealed trait ValueModel { @@ -18,6 +20,19 @@ sealed trait ValueModel { object ValueModel { + def errorCode(error: VarModel): Option[VarModel] = + error.intoField("error_code") + + val lastError = VarModel( + name = ValueRaw.lastError.name, + baseType = ValueRaw.lastError.baseType + ) + + val lastErrorType = ValueRaw.lastErrorType + + // NOTE: It should be safe as %last_error% should have `error_code` field + val lastErrorCode = errorCode(lastError).get + implicit object ValueModelEq extends Eq[ValueModel] { override def eqv(x: ValueModel, y: ValueModel): Boolean = x == y } @@ -46,9 +61,16 @@ case class LiteralModel(value: String, `type`: Type) extends ValueModel { } object LiteralModel { + + // AquaVM will return empty string for + // %last_error%.$.error_code if there is no %last_error% + val emptyErrorCode = quote("") + def fromRaw(raw: LiteralRaw): LiteralModel = LiteralModel(raw.value, raw.baseType) def quote(str: String): LiteralModel = LiteralModel(s"\"$str\"", LiteralType.string) + + def number(n: Int): LiteralModel = LiteralModel(n.toString, LiteralType.number) } sealed trait PropertyModel { @@ -117,6 +139,17 @@ case class VarModel(name: String, baseType: Type, properties: Chain[PropertyMode private def deriveFrom(vm: VarModel): VarModel = vm.copy(properties = vm.properties ++ properties) + def intoField(field: String): Option[VarModel] = `type` match { + case StructType(_, fields) => + fields(field) + .map(fieldType => + copy( + properties = properties :+ IntoFieldModel(field, fieldType) + ) + ) + case _ => none + } + override def resolveWith(vals: Map[String, ValueModel]): ValueModel = vals.get(name) match { case Some(vv: VarModel) => diff --git a/model/transform/src/main/scala/aqua/model/transform/Transform.scala b/model/transform/src/main/scala/aqua/model/transform/Transform.scala index b717dcd0..7917d916 100644 --- a/model/transform/src/main/scala/aqua/model/transform/Transform.scala +++ b/model/transform/src/main/scala/aqua/model/transform/Transform.scala @@ -19,6 +19,7 @@ import cats.free.Cofree import cats.syntax.option.* import scribe.Logging import aqua.model.transform.TransformConfig.TracingConfig +import aqua.model.transform.pre.{CallbackErrorHandler, ErrorHandler} // API for transforming RawTag to Res object Transform extends Logging { @@ -82,11 +83,30 @@ object Transform extends Logging { goThrough = Chain.fromOption(relayVar) ) - val errorsCatcher = ErrorsCatcher( - enabled = conf.wrapWithXor, - serviceId = conf.errorHandlingCallback, - funcName = conf.errorFuncName, - callable = initCallable + val argsProvider: ArgsProvider = ArgsFromService( + dataServiceId = conf.dataSrvId + ) + + val resultsHandler: ResultsHandler = CallbackResultsHandler( + callbackSrvId = conf.callbackSrvId, + funcName = conf.respFuncName + ) + + val errorHandler: ErrorHandler = CallbackErrorHandler( + serviceId = conf.errorHandlingSrvId, + funcName = conf.errorFuncName + ) + + // Callback on the init peer id, either done via relay or not + val callback = initCallable.service(conf.callbackSrvId) + + // preTransformer is applied before function is inlined + val preTransformer = FuncPreTransformer( + argsProvider, + resultsHandler, + errorHandler, + callback, + conf.relayVarName ) val tracing = Tracing( @@ -94,31 +114,16 @@ object Transform extends Logging { initCallable = initCallable ) - val argsProvider: ArgsProvider = ArgsFromService( - dataServiceId = conf.dataSrvId, - names = relayVar.toList ::: func.arrowType.domain.labelledData - ) - - // Transform the body of the function: wrap it with initCallable, provide function arguments via service calls - val transform: RawTag.Tree => RawTag.Tree = - argsProvider.transform andThen initCallable.transform - - // Callback on the init peer id, either done via relay or not - val callback = initCallable.service(conf.callbackSrvId) - - // preTransformer is applied before function is inlined - val preTransformer = FuncPreTransformer( - transform, - callback, - conf.respFuncName - ) - for { // Pre transform and inline the function model <- funcToModelTree(func, preTransformer) - // Post transform the function - errorsModel = errorsCatcher.transform(model) - tracingModel <- tracing(errorsModel) + // Post transform the function. + // We should wrap `model` with `onInitPeer` here + // so that TagInliner would not wrap it with `xor`. + // Topology module needs this `on` + // as a starting point. + initModel = initCallable.onInitPeer.wrap(model) + tracingModel <- tracing(initModel) // Resolve topology resolved <- Topology.resolve(tracingModel) // Clear the tree diff --git a/model/transform/src/main/scala/aqua/model/transform/TransformConfig.scala b/model/transform/src/main/scala/aqua/model/transform/TransformConfig.scala index 506c9954..63557bfc 100644 --- a/model/transform/src/main/scala/aqua/model/transform/TransformConfig.scala +++ b/model/transform/src/main/scala/aqua/model/transform/TransformConfig.scala @@ -15,17 +15,14 @@ case class TransformConfig( errorFuncName: String = "error", respFuncName: String = "response", relayVarName: Option[String] = Some("-relay-"), - wrapWithXor: Boolean = true, tracing: Option[TransformConfig.TracingConfig] = None, constants: List[ConstantRaw] = Nil ) { - import LiteralRaw.quote - - val errorId: ValueRaw = quote(errorFuncName) - val errorHandlingCallback: ValueModel = LiteralModel fromRaw quote(errorHandlingService) - val callbackSrvId: ValueRaw = quote(callbackService) - val dataSrvId: ValueRaw = quote(getDataService) + val errorId: ValueRaw = LiteralRaw.quote(errorFuncName) + val errorHandlingSrvId: ValueRaw = LiteralRaw.quote(errorHandlingService) + val callbackSrvId: ValueRaw = LiteralRaw.quote(callbackService) + val dataSrvId: ValueRaw = LiteralRaw.quote(getDataService) val constantsList: List[ConstantRaw] = ConstantRaw.defaultConstants(relayVarName) ::: constants diff --git a/model/transform/src/main/scala/aqua/model/transform/funcop/ErrorsCatcher.scala b/model/transform/src/main/scala/aqua/model/transform/funcop/ErrorsCatcher.scala deleted file mode 100644 index 46b7e01c..00000000 --- a/model/transform/src/main/scala/aqua/model/transform/funcop/ErrorsCatcher.scala +++ /dev/null @@ -1,77 +0,0 @@ -package aqua.model.transform.funcop - -import aqua.model.transform.pre.InitPeerCallable -import aqua.model.{ - CallModel, - CallServiceModel, - ForceExecModel, - LiteralModel, - MatchMismatchModel, - NoExecModel, - OnModel, - OpModel, - SeqModel, - ValueModel, - VarModel, - XorModel -} -import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} -import aqua.types.LiteralType -import cats.Eval -import cats.data.Chain -import cats.free.Cofree - -case class ErrorsCatcher( - enabled: Boolean, - serviceId: ValueModel, - funcName: String, - callable: InitPeerCallable -) { - - private def hasExec(children: Chain[OpModel.Tree]): Boolean = - children.exists { - case Cofree(head: ForceExecModel, _) => - true - case Cofree(_, tail) => - hasExec(tail.value) - } - - def transform(op: OpModel.Tree): OpModel.Tree = - if (enabled) { - var i = 0 - Cofree - .cata[Chain, OpModel, OpModel.Tree](op) { - case (ot @ (OnModel(_, _) | MatchMismatchModel(_, _, _)), children) - if hasExec(children) => - i = i + 1 - Eval now ot.wrap( - XorModel.wrap( - SeqModel.wrap(children.toList: _*), - callable.onInitPeer.wrap( - CallServiceModel( - serviceId, - funcName, - ErrorsCatcher.lastErrorCall(i) - ).leaf - ) - ) - ) - - case (tag, children) => - Eval.now(Cofree(tag, Eval.now(children))) - } - .value - } else op - -} - -object ErrorsCatcher { - - val lastErrorArg: ValueModel = - VarModel(ValueRaw.LastError.name, ValueRaw.LastError.baseType, Chain.empty) - - def lastErrorCall(i: Int): CallModel = CallModel( - lastErrorArg :: LiteralModel.fromRaw(LiteralRaw.number(i)) :: Nil, - Nil - ) -} diff --git a/model/transform/src/main/scala/aqua/model/transform/pre/ArgsProvider.scala b/model/transform/src/main/scala/aqua/model/transform/pre/ArgsProvider.scala index d8c4b831..316db843 100644 --- a/model/transform/src/main/scala/aqua/model/transform/pre/ArgsProvider.scala +++ b/model/transform/src/main/scala/aqua/model/transform/pre/ArgsProvider.scala @@ -5,10 +5,11 @@ import aqua.raw.value.{ValueRaw, VarRaw} import aqua.types.{ArrayType, DataType, StreamType} import cats.data.Chain -trait ArgsProvider extends PreTransform +trait ArgsProvider { + def provideArgs(args: List[(String, DataType)]): List[RawTag.Tree] +} -case class ArgsFromService(dataServiceId: ValueRaw, names: List[(String, DataType)]) - extends ArgsProvider { +case class ArgsFromService(dataServiceId: ValueRaw) extends ArgsProvider { private def getStreamDataOp(name: String, t: StreamType): RawTag.Tree = { val iter = s"$name-iter" @@ -44,9 +45,7 @@ case class ArgsFromService(dataServiceId: ValueRaw, names: List[(String, DataTyp .leaf } - def transform(op: RawTag.Tree): RawTag.Tree = - SeqTag.wrap( - names.map((getDataOp _).tupled) :+ op: _* - ) + override def provideArgs(args: List[(String, DataType)]): List[RawTag.Tree] = + args.map(getDataOp.tupled) } diff --git a/model/transform/src/main/scala/aqua/model/transform/pre/ErrorHandler.scala b/model/transform/src/main/scala/aqua/model/transform/pre/ErrorHandler.scala new file mode 100644 index 00000000..679c3536 --- /dev/null +++ b/model/transform/src/main/scala/aqua/model/transform/pre/ErrorHandler.scala @@ -0,0 +1,28 @@ +package aqua.model.transform.pre + +import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} +import aqua.raw.ops.{Call, CallArrowRawTag, RawTag} +import aqua.types.LiteralType + +import cats.Eval +import cats.data.Chain +import cats.free.Cofree + +trait ErrorHandler { + def handleLastError: RawTag.Tree +} + +case class CallbackErrorHandler( + serviceId: ValueRaw, + funcName: String +) extends ErrorHandler { + + override def handleLastError: RawTag.Tree = { + val call = Call( + args = ValueRaw.lastError :: LiteralRaw.number(0) :: Nil, + exportTo = Nil + ) + + CallArrowRawTag.service(serviceId, funcName, call).leaf + } +} diff --git a/model/transform/src/main/scala/aqua/model/transform/pre/FuncPreTransformer.scala b/model/transform/src/main/scala/aqua/model/transform/pre/FuncPreTransformer.scala index c8bad469..13a64cf2 100644 --- a/model/transform/src/main/scala/aqua/model/transform/pre/FuncPreTransformer.scala +++ b/model/transform/src/main/scala/aqua/model/transform/pre/FuncPreTransformer.scala @@ -2,36 +2,27 @@ package aqua.model.transform.pre import aqua.model.FuncArrow import aqua.model.ArgsCall -import aqua.raw.ops.{Call, CallArrowRawTag, RawTag, SeqTag} +import aqua.raw.ops.{Call, CallArrowRawTag, RawTag, SeqTag, TryTag} import aqua.raw.value.{ValueRaw, VarRaw} import aqua.types.* + import cats.syntax.show.* +import cats.syntax.option.* // TODO: doc case class FuncPreTransformer( - transform: RawTag.Tree => RawTag.Tree, + argsProvider: ArgsProvider, + resultsHandler: ResultsHandler, + errorHandler: ErrorHandler, callback: (String, Call) => RawTag.Tree, - respFuncName: String, + relayVarName: Option[String], wrapCallableName: String = "funcAround", arrowCallbackPrefix: String = "init_peer_callable_" ) { private val returnVar: String = "-return-" - /** - * Wraps return values of a function to a call on itin peer's side - * - * @param retModel List of returned values - * @return AST that consumes return values, passing them to the client - */ - private def returnCallback(retModel: List[ValueRaw]): RawTag.Tree = - callback( - respFuncName, - Call( - retModel, - Nil - ) - ) + private val relayVar = relayVarName.map(_ -> ScalarType.string) /** * Convert an arrow-type argument to init user's callback @@ -68,23 +59,35 @@ case class FuncPreTransformer( case t => t }).toLabelledList(returnVar) - val retModel = returnType.map { case (l, t) => VarRaw(l, t) } - val funcCall = Call( func.arrowType.domain.toLabelledList().map(ad => VarRaw(ad._1, ad._2)), returnType.map { case (l, t) => Call.Export(l, t) } ) + val provideArgs = argsProvider.provideArgs( + relayVar.toList ::: func.arrowType.domain.labelledData + ) + + val handleResults = resultsHandler.handleResults( + returnType + ) + + val handleError = errorHandler.handleLastError + + val call = CallArrowRawTag.func(func.funcName, funcCall).leaf + + val body = SeqTag.wrap( + provideArgs ++ List( + TryTag.wrap( + call, + handleError + ) + ) ++ handleResults + ) + FuncArrow( wrapCallableName, - transform( - SeqTag.wrap( - CallArrowRawTag.func(func.funcName, funcCall).leaf :: - returnType.headOption - .map(_ => returnCallback(retModel)) - .toList: _* - ) - ), + body, ArrowType(ConsType.cons(func.funcName, func.arrowType, NilType), NilType), Nil, func.arrowType.domain diff --git a/model/transform/src/main/scala/aqua/model/transform/pre/ResultsHandler.scala b/model/transform/src/main/scala/aqua/model/transform/pre/ResultsHandler.scala new file mode 100644 index 00000000..4345de0d --- /dev/null +++ b/model/transform/src/main/scala/aqua/model/transform/pre/ResultsHandler.scala @@ -0,0 +1,27 @@ +package aqua.model.transform.pre + +import aqua.types.Type +import aqua.raw.ops.{Call, CallArrowRawTag, RawTag} +import aqua.raw.value.{ValueRaw, VarRaw} + +import cats.syntax.option.* + +trait ResultsHandler { + def handleResults(results: List[(String, Type)]): Option[RawTag.Tree] +} + +case class CallbackResultsHandler(callbackSrvId: ValueRaw, funcName: String) + extends ResultsHandler { + + override def handleResults(results: List[(String, Type)]): Option[RawTag.Tree] = + if (results.isEmpty) none + else { + val resultVars = results.map(VarRaw.apply.tupled) + val call = Call( + args = resultVars, + exportTo = Nil + ) + + CallArrowRawTag.service(callbackSrvId, funcName, call).leaf.some + } +} diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala index f297c62a..f8e2d201 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala @@ -187,43 +187,58 @@ object Topology extends Logging { .filterNot(lastPeerId => before.headOption.exists(_.peerId == lastPeerId)) ) + // Return strategy for calculating `beforeOn` for + // node pointed on by `cursor` + private def decideBefore(cursor: OpModelTreeCursor): Before = + cursor.parentOp match { + case Some(XorModel) => XorBranch + case Some(_: SeqGroupModel) => SeqGroupBranch + case None => Root + case _ => Default + } + + // Return strategy for calculating `beginsOn` for + // node pointed on by `cursor` + private def decideBegins(cursor: OpModelTreeCursor): Begins = + (cursor.parentOp, cursor.op) match { + case (_, _: FailModel) => Fail + case (Some(_: SeqGroupModel), _: NextModel) => SeqNext + case (_, _: ForModel) => For + // No begin optimization for detach + case (_, ParModel) => ParGroup + case _ => Default + } + + // Return strategy for calculating `endsOn` for + // node pointed on by `cursor` + private def decideEnds(cursor: OpModelTreeCursor): Ends = + cursor.op match { + case _: SeqGroupModel => SeqGroup + case XorModel => XorGroup + case _: ParGroupModel => ParGroup + case _ if cursor.parentOp.isEmpty => Root + case _ => Default + } + + // Return strategy for calculating `afterOn` for + // node pointed on by `cursor` + private def decideAfter(cursor: OpModelTreeCursor): After = + (cursor.parentOp, cursor.op) match { + case (_, _: FailModel) => Fail + case (Some(_: ParGroupModel), _) => ParGroupBranch + case (Some(XorModel), _) => XorBranch + case (Some(_: SeqGroupModel), _) => SeqGroupBranch + case (None, _) => Root + case _ => Default + } + def make(cursor: OpModelTreeCursor): Topology = Topology( - cursor, - // Before - cursor.parentOp match { - case Some(XorModel) => XorBranch - case Some(_: SeqGroupModel) => SeqGroupBranch - case None => Root - case _ => Default - }, - // Begin - (cursor.parentOp, cursor.op) match { - case (Some(_: SeqGroupModel), _: NextModel) => - SeqNext - case (_, _: ForModel) => - For - case (_, ParModel) => // No begin optimization for detach - ParGroup - case _ => - Default - }, - // End - cursor.op match { - case _: SeqGroupModel => SeqGroup - case XorModel => XorGroup - case _: ParGroupModel => ParGroup - case _ if cursor.parentOp.isEmpty => Root - case _ => Default - }, - // After - cursor.parentOp match { - case Some(_: ParGroupModel) => ParGroupBranch - case Some(XorModel) => XorBranch - case Some(_: SeqGroupModel) => SeqGroupBranch - case None => Root - case _ => Default - } + cursor = cursor, + before = decideBefore(cursor), + begins = decideBegins(cursor), + ends = decideEnds(cursor), + after = decideAfter(cursor) ) def resolve(op: OpModel.Tree, debug: Boolean = false): Eval[Res] = diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala index b3d81036..8176b556 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Begins.scala @@ -6,21 +6,24 @@ import aqua.model.{OnModel, ValueModel} import cats.Eval import cats.data.Chain import cats.syntax.apply.* +import cats.syntax.functor.* +import cats.syntax.monad.* +import cats.instances.tuple.* trait Begins { def beginsOn(current: Topology): Eval[List[OnModel]] = current.pathOn def pathBefore(current: Topology): Eval[Chain[ValueModel]] = - (current.beforeOn, current.beginsOn).mapN { case (bef, beg) => - (PathFinder.findPath(bef, beg), bef, beg) - }.flatMap { case (pb, bef, beg) => - // Handle the case when we need to go through the relay, but miss the hop as it's the first - // peer where we go, but there's no service calls there - current.firstExecutesOn.map { - case Some(where) if where != beg => - pb ++ Topology.findRelayPathEnforcement(bef, beg) - case _ => pb + (current.beforeOn, current.beginsOn).tupled + .fproduct(PathFinder.findPath.tupled) + .flatMap { case ((bef, beg), path) => + // Handle the case when we need to go through the relay, but miss the hop as it's the first + // peer where we go, but there's no service calls there + current.firstExecutesOn.map { + case Some(where) if where != beg => + path ++ Topology.findRelayPathEnforcement(bef, beg) + case _ => path + } } - } } diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Fail.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Fail.scala new file mode 100644 index 00000000..f5335db3 --- /dev/null +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/Fail.scala @@ -0,0 +1,33 @@ +package aqua.model.transform.topology.strategy + +import aqua.model.transform.topology.Topology +import aqua.model.ValueModel + +import aqua.model.{OnModel, XorModel} + +import cats.data.Chain +import cats.Eval +import cats.syntax.apply.* +import cats.syntax.functor.* +import cats.syntax.traverse.* +import cats.syntax.option.* +import cats.syntax.applicative.* + +object Fail extends Begins with After { + + // override just to be explicit + override def forceExit(current: Topology): Eval[Boolean] = + Eval.now(false) // There is no need to insert hops after `fail` + + override def pathBefore(current: Topology): Eval[Chain[ValueModel]] = + for { + path <- super.pathBefore(current) + begins <- current.beginsOn + // Get last hop to final peer + // if it is not in the path + // TODO: Add option to enforce last hop to [[PathFinder]] + hop = begins.headOption + .map(_.peerId) + .filterNot(peer => path.lastOption.contains(peer) || path.isEmpty) + } yield path ++ Chain.fromOption(hop) +} diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala index e4fa534f..54382bbc 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/strategy/XorBranch.scala @@ -14,7 +14,7 @@ object XorBranch extends Before with After { override def toString: String = Console.RED + "/*" + Console.RESET override def beforeOn(current: Topology): Eval[List[OnModel]] = - current.prevSibling.map(_.endsOn) getOrElse super.beforeOn(current) + current.prevSibling.map(_.beginsOn) getOrElse super.beforeOn(current) // Find closest par exit up and return its branch current is in // Returns none if there is no par up diff --git a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala index 8cae03c0..3e5f4cec 100644 --- a/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala +++ b/model/transform/src/test/scala/aqua/model/transform/ModelBuilder.scala @@ -1,31 +1,21 @@ package aqua.model.transform -import aqua.model.{ - CallModel, - CallServiceModel, - DetachModel, - ForModel, - LiteralModel, - NextModel, - OpModel, - ParModel, - SeqModel, - ValueModel, - VarModel -} -import aqua.model.transform.funcop.ErrorsCatcher +import aqua.model.* import aqua.raw.ops.Call import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} import aqua.{model, res} import aqua.res.{CallRes, CallServiceRes, MakeRes} import aqua.types.{ArrayType, LiteralType, ScalarType} - -import scala.language.implicitConversions import aqua.types.StreamType import aqua.model.IntoIndexModel +import aqua.model.inline.raw.ApplyGateRawInliner + +import scala.language.implicitConversions import cats.data.Chain import cats.data.Chain.==: -import aqua.model.inline.raw.ApplyGateRawInliner +import aqua.model.OnModel +import aqua.model.FailModel +import aqua.res.ResolvedOp object ModelBuilder { implicit def rawToValue(raw: ValueRaw): ValueModel = ValueModel.fromRaw(raw) @@ -40,10 +30,13 @@ object ModelBuilder { val otherPeer = VarRaw("other-peer", ScalarType.string) - val otherPeerL = LiteralRaw("\"other-peer\"", LiteralType.string) - val otherRelay = LiteralRaw("other-relay", ScalarType.string) - val otherPeer2 = LiteralRaw("other-peer-2", ScalarType.string) - val otherRelay2 = LiteralRaw("other-relay-2", ScalarType.string) + def otherPeerN(n: Int) = LiteralRaw.quote(s"other-peer-$n") + def otherRelayN(n: Int) = LiteralRaw.quote(s"other-relay-$n") + + val otherPeerL = LiteralRaw.quote("other-peer") + val otherRelay = LiteralRaw.quote("other-relay") + val otherPeer2 = otherPeerN(2) + val otherRelay2 = otherRelayN(2) val iRelay = VarRaw("i", ScalarType.string) val varNode = VarRaw("node-id", ScalarType.string) val viaList = VarRaw("other-relay-2", ArrayType(ScalarType.string)) @@ -84,10 +77,10 @@ object ModelBuilder { def errorCall(bc: TransformConfig, i: Int, on: ValueModel = initPeer) = res .CallServiceRes( - bc.errorHandlingCallback, + ValueModel.fromRaw(bc.errorHandlingSrvId), bc.errorFuncName, CallRes( - ErrorsCatcher.lastErrorArg :: LiteralModel( + ValueModel.lastError :: LiteralModel( i.toString, LiteralType.number ) :: Nil, @@ -117,6 +110,22 @@ object ModelBuilder { ) .leaf + val failLastErrorModel = FailModel(ValueModel.lastError).leaf + + val failLastErrorRes = res.FailRes(ValueModel.lastError).leaf + + def onRethrowModel( + peer: ValueModel, + via: ValueModel* + ): OpModel.Tree => OpModel.Tree = + child => + XorModel.wrap( + OnModel(peer, Chain.fromSeq(via)).wrap( + child + ), + failLastErrorModel + ) + def fold(item: String, iter: ValueRaw, mode: Option[ForModel.Mode], body: OpModel.Tree*) = { val ops = SeqModel.wrap(body: _*) ForModel(item, ValueModel.fromRaw(iter), mode).wrap(ops, NextModel(item).leaf) @@ -130,7 +139,7 @@ object ModelBuilder { ) } - def through(peer: ValueModel) = + def through(peer: ValueModel): ResolvedOp.Tree = MakeRes.hop(peer) /** diff --git a/model/transform/src/test/scala/aqua/model/transform/TransformSpec.scala b/model/transform/src/test/scala/aqua/model/transform/TransformSpec.scala index 429136dc..781d696a 100644 --- a/model/transform/src/test/scala/aqua/model/transform/TransformSpec.scala +++ b/model/transform/src/test/scala/aqua/model/transform/TransformSpec.scala @@ -6,11 +6,13 @@ import aqua.model.{CallModel, FuncArrow, LiteralModel, VarModel} import aqua.raw.ops.{Call, CallArrowRawTag, FuncOp, OnTag, RawTag, SeqTag} import aqua.raw.value.{LiteralRaw, VarRaw} import aqua.types.{ArrowType, NilType, ProductType, ScalarType} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} import aqua.res.{CallRes, CallServiceRes, MakeRes, SeqRes, XorRes} + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers import cats.data.Chain +import cats.syntax.show.* class TransformSpec extends AnyFlatSpec with Matchers { @@ -47,13 +49,13 @@ class TransformSpec extends AnyFlatSpec with Matchers { val procFC = fc.value.body val expectedFC = - XorRes.wrap( - SeqRes.wrap( - dataCall(bc, "-relay-", initPeer), - through(relayV), - through(otherRelay), + SeqRes.wrap( + dataCall(bc, "-relay-", initPeer), + XorRes.wrap( XorRes.wrap( SeqRes.wrap( + through(relayV), + through(otherRelay), callRes(1, otherPeer), through(otherRelay), through(relayV) @@ -61,15 +63,13 @@ class TransformSpec extends AnyFlatSpec with Matchers { SeqRes.wrap( through(otherRelay), through(relayV), - errorCall(bc, 1, initPeer) + through(initPeer), + failLastErrorRes ) ), - XorRes.wrap( - respCall(bc, ret, initPeer), - errorCall(bc, 2, initPeer) - ) + errorCall(bc, 0, initPeer) ), - errorCall(bc, 3, initPeer) + respCall(bc, ret, initPeer) ) procFC.equalsOrShowDiff(expectedFC) should be(true) @@ -90,7 +90,7 @@ class TransformSpec extends AnyFlatSpec with Matchers { None ) - val bc = TransformConfig(wrapWithXor = false) + val bc = TransformConfig() val fc = Transform.funcRes(func, bc) @@ -99,10 +99,24 @@ class TransformSpec extends AnyFlatSpec with Matchers { val expectedFC = SeqRes.wrap( dataCall(bc, "-relay-", initPeer), - callRes(0, initPeer), - through(relayV), - callRes(1, otherPeer), - through(relayV), + XorRes.wrap( + SeqRes.wrap( + callRes(0, initPeer), + XorRes.wrap( + SeqRes.wrap( + through(relayV), + callRes(1, otherPeer), + through(relayV) + ), + SeqRes.wrap( + through(relayV), + through(initPeer), + failLastErrorRes + ) + ) + ), + errorCall(bc, 0, initPeer) + ), respCall(bc, ret, initPeer) ) @@ -124,13 +138,7 @@ class TransformSpec extends AnyFlatSpec with Matchers { val f1: FuncArrow = FuncArrow( "f1", - CallArrowRawTag - .service( - LiteralRaw.quote("srv1"), - "foo", - Call(Nil, Call.Export("v", ScalarType.string) :: Nil) - ) - .leaf, + callOp(1).leaf, stringArrow, VarRaw("v", ScalarType.string) :: Nil, Map.empty, @@ -151,22 +159,20 @@ class TransformSpec extends AnyFlatSpec with Matchers { None ) - val bc = TransformConfig(wrapWithXor = false) + val bc = TransformConfig() - val res = Transform.funcRes(f2, bc).value.body + val procFC = Transform.funcRes(f2, bc).value.body - res.equalsOrShowDiff( - SeqRes.wrap( - dataCall(bc, "-relay-", initPeer), - CallServiceRes( - LiteralRaw.quote("srv1"), - "foo", - CallRes(Nil, Some(CallModel.Export("v", ScalarType.string))), - initPeer - ).leaf, - respCall(bc, VarRaw("v", ScalarType.string), initPeer) - ) - ) should be(true) + val expectedFC = SeqRes.wrap( + dataCall(bc, "-relay-", initPeer), + XorRes.wrap( + callRes(1, initPeer), + errorCall(bc, 0, initPeer) + ), + respCall(bc, VarRaw("v", ScalarType.string), initPeer) + ) + + procFC.equalsOrShowDiff(expectedFC) should be(true) } } diff --git a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala index bb355172..95481cfa 100644 --- a/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala +++ b/model/transform/src/test/scala/aqua/model/transform/topology/TopologySpec.scala @@ -66,7 +66,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "go through relay to any other node, directly" in { + it should "go through relay to any other node, directly" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( OnModel(otherPeer, Chain.empty).wrap( @@ -85,7 +85,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "go through relay to any other node, via another relay" in { + it should "go through relay to any other node, via another relay" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( OnModel(otherPeer, Chain.one(otherRelay)).wrap( @@ -109,7 +109,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "build return path in par if there are exported variables" in { + it should "build return path in par if there are exported variables" in { val exportTo = CallModel.Export("result", ScalarType.string) :: Nil val result = VarRaw("result", ScalarType.string) @@ -148,7 +148,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "work fine with par" in { + it should "work fine with par" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( ParModel.wrap( OnModel( @@ -174,17 +174,19 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "create correct calls in try" in { + it should "create correct calls in try" in { val init = XorModel.wrap(callModel(1)) val proc = Topology.resolve(init).value - proc.equalsOrShowDiff( - Cofree[Chain, ResolvedOp](XorRes, Eval.now(Chain.one(callRes(1, initPeer)))) - ) should be(true) + val expected = XorRes.wrap( + callRes(1, initPeer) + ) + + proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "work fine with par with on" in { + it should "work fine with par with on" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( ParModel.wrap( OnModel( @@ -216,7 +218,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "go through relay to any other node, via another relay, in complex xor/seq" in { + it should "go through relay to any other node, via another relay, in complex xor/seq" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( OnModel(otherPeer, Chain.one(otherRelay)).wrap( @@ -248,7 +250,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "simplify a route with init_peer_id" in { + it should "simplify a route with init_peer_id" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( SeqModel.wrap( OnModel( @@ -270,7 +272,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "get back to init peer" in { + it should "get back to init peer" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( SeqModel.wrap( @@ -297,7 +299,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "not stackoverflow" in { + it should "not stackoverflow" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( SeqModel.wrap( callModel(1), @@ -315,7 +317,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { Topology.resolve(init).value } - "topology resolver" should "get back to init peer after a long chain" in { + it should "get back to init peer after a long chain" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( SeqModel.wrap( OnModel(otherPeer, Chain.one(otherRelay)).wrap( @@ -335,28 +337,27 @@ class TopologySpec extends AnyFlatSpec with Matchers { val proc = Topology.resolve(init).value - val expected = - SeqRes.wrap( - through(relay), - through(otherRelay), - callRes(0, otherPeer), - through(otherRelay), - callRes(1, otherPeer2), - MatchMismatchRes(otherPeer, otherRelay, true).wrap( - SeqRes.wrap( - through(otherRelay), - callRes(2, otherPeer) - ) - ), - through(otherRelay), - through(relay), - callRes(3, initPeer) - ) + val expected = SeqRes.wrap( + through(relay), + through(otherRelay), + callRes(0, otherPeer), + through(otherRelay), + callRes(1, otherPeer2), + MatchMismatchRes(otherPeer, otherRelay, true).wrap( + SeqRes.wrap( + through(otherRelay), + callRes(2, otherPeer) + ) + ), + through(otherRelay), + through(relay), + callRes(3, initPeer) + ) proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "resolve xor path" in { + it should "resolve xor path" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( SeqModel.wrap( @@ -413,7 +414,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { // this example doesn't create a hop on relay after fold // but the test create it, so there is not a one-on-one simulation // change it or write an integration test - "topology resolver" should "create returning hops on chain of 'on'" in { + it should "create returning hops on chain of 'on'" in { val init = OnModel(initPeer, Chain.one(relay)).wrap( OnModel(otherPeer, Chain.empty).wrap( @@ -444,11 +445,12 @@ class TopologySpec extends AnyFlatSpec with Matchers { through(relay), callRes(3, initPeer) ) + proc.equalsOrShowDiff(expected) should be(true) } // https://github.com/fluencelabs/aqua/issues/427 - "topology resolver" should "create returning hops after for-par with inner `on` and xor" in { + it should "create returning hops after for-par with inner `on` and xor" in { val streamRaw = VarRaw("stream", StreamType(ScalarType.string)) val streamRawEl = VarRaw("stream", StreamType(ScalarType.string)).withProperty( @@ -459,125 +461,124 @@ class TopologySpec extends AnyFlatSpec with Matchers { val (joinModel, joinRes) = joinModelRes(streamEl) - val init = - SeqModel.wrap( - DeclareStreamModel(stream).leaf, - OnModel(initPeer, Chain.one(relay)).wrap( - foldPar( - "i", - valueArray, - OnModel(iRelay, Chain.empty).wrap( - XorModel.wrap( - callModel(2, CallModel.Export(streamRaw.name, streamRaw.`type`) :: Nil), - OnModel(initPeer, Chain.one(relay)).wrap( - callModel(4, Nil, Nil) - ) + val init = SeqModel.wrap( + DeclareStreamModel(stream).leaf, + OnModel(initPeer, Chain.one(relay)).wrap( + foldPar( + "i", + valueArray, + OnModel(iRelay, Chain.empty).wrap( + XorModel.wrap( + callModel(2, CallModel.Export(streamRaw.name, streamRaw.`type`) :: Nil), + OnModel(initPeer, Chain.one(relay)).wrap( + callModel(4, Nil, Nil) ) ) - ), - joinModel, - callModel(3, Nil, streamRaw :: Nil) - ) + ) + ), + joinModel, + callModel(3, Nil, streamRaw :: Nil) ) + ) val proc = Topology.resolve(init).value - val expected = - SeqRes.wrap( - through(relay), - ParRes.wrap( - FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( - ParRes.wrap( - // better if first relay will be outside `for` - SeqRes.wrap( - through(relay), + val expected = SeqRes.wrap( + through(relay), + ParRes.wrap( + FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( + ParRes.wrap( + // better if first relay will be outside `for` + SeqRes.wrap( + through(relay), + XorRes.wrap( + SeqRes.wrap( + callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))), + through(relay), + through(initPeer) + ), + SeqRes.wrap( + through(relay), + callRes(4, initPeer) + ) + ) + ), + NextRes("i").leaf + ) + ) + ), + joinRes, + callRes(3, initPeer, None, stream :: Nil) + ) + + proc.equalsOrShowDiff(expected) should be(true) + } + + // https://github.com/fluencelabs/aqua/issues/427 + it should "create returning hops after for-par with inner `on` and xor, version 2" in { + + val streamRaw = VarRaw("stream", StreamType(ScalarType.string)) + val streamRawEl = VarRaw("stream", StreamType(ScalarType.string)).withProperty( + IntoIndexRaw(LiteralRaw("2", ScalarType.u32), ScalarType.string) + ) + val stream = ValueModel.fromRaw(streamRaw) + val streamEl = ValueModel.fromRaw(streamRawEl) + + val (joinModel, joinRes) = joinModelRes(streamEl) + + val init = SeqModel.wrap( + DeclareStreamModel(stream).leaf, + OnModel(initPeer, Chain.one(relay)).wrap( + foldPar( + "i", + valueArray, + OnModel(iRelay, Chain.empty).wrap( + XorModel.wrap( + XorModel.wrap( + callModel(2, CallModel.Export(streamRaw.name, streamRaw.`type`) :: Nil) + ), + OnModel(initPeer, Chain.one(relay)).wrap( + callModel(4, Nil, Nil) + ) + ) + ) + ), + joinModel, + callModel(3, Nil, streamRaw :: Nil) + ) + ) + + val proc = Topology.resolve(init).value + + val expected = SeqRes.wrap( + through(relay), + ParRes.wrap( + FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( + ParRes.wrap( + // better if first relay will be outside `for` + SeqRes.wrap( + through(relay), + XorRes.wrap( XorRes.wrap( SeqRes.wrap( callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))), through(relay), through(initPeer) - ), - SeqRes.wrap( - through(relay), - callRes(4, initPeer) ) - ) - ), - NextRes("i").leaf - ) - ) - ), - joinRes, - callRes(3, initPeer, None, stream :: Nil) - ) - - proc.equalsOrShowDiff(expected) should be(true) - } - - // https://github.com/fluencelabs/aqua/issues/427 - "topology resolver" should "create returning hops after for-par with inner `on` and xor, version 2" in { - - val streamRaw = VarRaw("stream", StreamType(ScalarType.string)) - val streamRawEl = VarRaw("stream", StreamType(ScalarType.string)).withProperty( - IntoIndexRaw(LiteralRaw("2", ScalarType.u32), ScalarType.string) - ) - val stream = ValueModel.fromRaw(streamRaw) - val streamEl = ValueModel.fromRaw(streamRawEl) - - val (joinModel, joinRes) = joinModelRes(streamEl) - - val init = - SeqModel.wrap( - DeclareStreamModel(stream).leaf, - OnModel(initPeer, Chain.one(relay)).wrap( - foldPar( - "i", - valueArray, - OnModel(iRelay, Chain.empty).wrap( - XorModel.wrap( - XorModel.wrap( - callModel(2, CallModel.Export(streamRaw.name, streamRaw.`type`) :: Nil) ), - OnModel(initPeer, Chain.one(relay)).wrap( - callModel(4, Nil, Nil) - ) - ) - ) - ), - joinModel, - callModel(3, Nil, streamRaw :: Nil) - ) - ) - - val proc = Topology.resolve(init).value - - val expected = - SeqRes.wrap( - through(relay), - ParRes.wrap( - FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( - ParRes.wrap( - // better if first relay will be outside `for` - SeqRes.wrap( - through(relay), - XorRes.wrap( - XorRes.wrap( - SeqRes.wrap( - callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))), - through(relay), - through(initPeer) - ) - ), + SeqRes.wrap( + through(relay), callRes(4, initPeer) ) - ), - NextRes("i").leaf - ) + ) + ), + NextRes("i").leaf ) - ), - joinRes, - callRes(3, initPeer, None, stream :: Nil) - ) + ) + ), + joinRes, + callRes(3, initPeer, None, stream :: Nil) + ) // println(Console.MAGENTA + init.show + Console.RESET) // println(Console.YELLOW + proc.show + Console.RESET) @@ -586,23 +587,22 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "create returning hops on nested 'on'" in { - val init = - OnModel(initPeer, Chain.one(relay)).wrap( - callModel(0), - OnModel(otherPeer, Chain.empty).wrap( - callModel(1), - fold( - "i", - valueArray, - None, - OnModel(otherPeer2, Chain.one(otherRelay2)).wrap( - callModel(2) - ) + it should "create returning hops on nested 'on'" in { + val init = OnModel(initPeer, Chain.one(relay)).wrap( + callModel(0), + OnModel(otherPeer, Chain.empty).wrap( + callModel(1), + fold( + "i", + valueArray, + None, + OnModel(otherPeer2, Chain.one(otherRelay2)).wrap( + callModel(2) ) - ), - callModel(3) - ) + ) + ), + callModel(3) + ) val proc = Topology.resolve(init).value @@ -625,19 +625,18 @@ class TopologySpec extends AnyFlatSpec with Matchers { } // https://github.com/fluencelabs/aqua/issues/205 - "topology resolver" should "optimize path over fold" in { + it should "optimize path over fold" in { val i = VarRaw("i", ScalarType.string) - val init = - OnModel(initPeer, Chain.one(relay)).wrap( - fold( - "i", - valueArray, - None, - OnModel(i, Chain.one(otherRelay)).wrap( - callModel(1) - ) + val init = OnModel(initPeer, Chain.one(relay)).wrap( + fold( + "i", + valueArray, + None, + OnModel(i, Chain.one(otherRelay)).wrap( + callModel(1) ) ) + ) val proc = Topology.resolve(init).value @@ -659,9 +658,36 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "handle detach" in { - val init = - OnModel(initPeer, Chain.one(relay)).wrap( + it should "handle detach" in { + val init = OnModel(initPeer, Chain.one(relay)).wrap( + DetachModel.wrap( + OnModel(otherPeer, Chain.empty).wrap( + callModel(1, CallModel.Export(varNode.name, varNode.baseType) :: Nil) + ) + ), + callModel(2, Nil, varNode :: Nil) + ) + + val proc = Topology.resolve(init).value + + val expected = SeqRes.wrap( + ParRes.wrap( + SeqRes.wrap( + through(relay), + callRes(1, otherPeer, Some(CallModel.Export(varNode.name, varNode.baseType))), + through(relay), + through(initPeer) // pingback + ) + ), + callRes(2, initPeer, None, varNode :: Nil) + ) + + proc.equalsOrShowDiff(expected) should be(true) + } + + it should "handle moved detach" in { + val init = OnModel(initPeer, Chain.one(relay)).wrap( + OnModel(otherPeer2, Chain.empty).wrap( DetachModel.wrap( OnModel(otherPeer, Chain.empty).wrap( callModel(1, CallModel.Export(varNode.name, varNode.baseType) :: Nil) @@ -669,56 +695,25 @@ class TopologySpec extends AnyFlatSpec with Matchers { ), callModel(2, Nil, varNode :: Nil) ) + ) val proc = Topology.resolve(init).value - val expected = - SeqRes.wrap( - ParRes.wrap( - SeqRes.wrap( - through(relay), - callRes(1, otherPeer, Some(CallModel.Export(varNode.name, varNode.baseType))), - through(relay), - through(initPeer) // pingback - ) - ), - callRes(2, initPeer, None, varNode :: Nil) - ) - - proc.equalsOrShowDiff(expected) should be(true) - } - - "topology resolver" should "handle moved detach" in { - val init = - OnModel(initPeer, Chain.one(relay)).wrap( - OnModel(otherPeer2, Chain.empty).wrap( - DetachModel.wrap( - OnModel(otherPeer, Chain.empty).wrap( - callModel(1, CallModel.Export(varNode.name, varNode.baseType) :: Nil) - ) - ), - callModel(2, Nil, varNode :: Nil) + val expected = SeqRes.wrap( + through(relay), + ParRes.wrap( + SeqRes.wrap( + callRes(1, otherPeer, Some(CallModel.Export(varNode.name, varNode.baseType))), + through(otherPeer2) // pingback ) - ) - - val proc = Topology.resolve(init).value - - val expected = - SeqRes.wrap( - through(relay), - ParRes.wrap( - SeqRes.wrap( - callRes(1, otherPeer, Some(CallModel.Export(varNode.name, varNode.baseType))), - through(otherPeer2) // pingback - ) - ), - callRes(2, otherPeer2, None, varNode :: Nil) - ) + ), + callRes(2, otherPeer2, None, varNode :: Nil) + ) proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "make right hops on for-par behaviour" in { + it should "make right hops on for-par behaviour" in { val init = SeqModel.wrap( OnModel(otherPeer, Chain.one(relayV)).wrap( callModel(1), @@ -737,66 +732,63 @@ class TopologySpec extends AnyFlatSpec with Matchers { val proc = Topology.resolve(init).value - val expected = - SeqRes.wrap( - callRes(1, otherPeer), - ParRes.wrap( - FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( - ParRes.wrap( - SeqRes.wrap( - // TODO: should be outside of fold - through(relayV), - callRes( - 2, - LiteralRaw("i", ScalarType.string), - Some(CallModel.Export("used", StreamType(ScalarType.string))) - ), - // after call `i` topology should send to `otherPeer2` if it's not fire-and-forget – to trigger execution - through(otherPeer2) + val expected = SeqRes.wrap( + callRes(1, otherPeer), + ParRes.wrap( + FoldRes("i", valueArray, Some(ForModel.NeverMode)).wrap( + ParRes.wrap( + SeqRes.wrap( + // TODO: should be outside of fold + through(relayV), + callRes( + 2, + LiteralRaw("i", ScalarType.string), + Some(CallModel.Export("used", StreamType(ScalarType.string))) ), - NextRes("i").leaf - ) + // after call `i` topology should send to `otherPeer2` if it's not fire-and-forget – to trigger execution + through(otherPeer2) + ), + NextRes("i").leaf ) - ), - callRes(3, otherPeer2, None, VarModel("used", StreamType(ScalarType.string)) :: Nil) - ) + ) + ), + callRes(3, otherPeer2, None, VarModel("used", StreamType(ScalarType.string)) :: Nil) + ) proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "handle detach moved to relay" in { - val init = - OnModel(initPeer, Chain.one(relay)).wrap( - OnModel(relay, Chain.empty).wrap( - DetachModel.wrap( - OnModel(otherPeer, Chain.empty).wrap( - callModel(1, CallModel.Export(varNode.name, varNode.baseType) :: Nil) - ) + it should "handle detach moved to relay" in { + val init = OnModel(initPeer, Chain.one(relay)).wrap( + OnModel(relay, Chain.empty).wrap( + DetachModel.wrap( + OnModel(otherPeer, Chain.empty).wrap( + callModel(1, CallModel.Export(varNode.name, varNode.baseType) :: Nil) ) - ), - callModel(2, Nil, varNode :: Nil) - ) + ) + ), + callModel(2, Nil, varNode :: Nil) + ) val proc = Topology.resolve(init).value - val expected = - SeqRes.wrap( - through(relay), - ParRes.wrap( - SeqRes.wrap( - callRes(1, otherPeer, Some(CallModel.Export(varNode.name, varNode.baseType))), - through(relay), // pingback - through(initPeer) // pingback - ) - ), - callRes(2, initPeer, None, varNode :: Nil) - ) + val expected = SeqRes.wrap( + through(relay), + ParRes.wrap( + SeqRes.wrap( + callRes(1, otherPeer, Some(CallModel.Export(varNode.name, varNode.baseType))), + through(relay), // pingback + through(initPeer) // pingback + ) + ), + callRes(2, initPeer, None, varNode :: Nil) + ) proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "place ping inside par" in { + it should "place ping inside par" in { val i = LiteralRaw("i", ScalarType.string) val used = VarRaw("used", StreamType(ScalarType.string)) val usedWithIdx = @@ -804,103 +796,99 @@ class TopologySpec extends AnyFlatSpec with Matchers { val (joinModel, joinRes) = joinModelRes(usedWithIdx) - val init = - OnModel(initPeer, Chain.one(relay)).wrap( - foldPar( - "i", - valueArray, - OnModel(i, Chain.empty).wrap( + val init = OnModel(initPeer, Chain.one(relay)).wrap( + foldPar( + "i", + valueArray, + OnModel(i, Chain.empty).wrap( + callModel(1, CallModel.Export(used.name, used.`type`) :: Nil) + ) + ), + joinModel, + callModel(3, Nil, used :: Nil) + ) + + val proc = Topology.resolve(init).value + + val expected = SeqRes.wrap( + ParRes.wrap( + FoldRes("i", ValueModel.fromRaw(valueArray), Some(ForModel.NeverMode)).wrap( + ParRes.wrap( + SeqRes.wrap( + through(relay), + callRes( + 1, + ValueModel.fromRaw(i), + Some(CallModel.Export(used.name, used.`type`)) + ), + through(relay), + through(initPeer) + ), + NextRes("i").leaf + ) + ) + ), + joinRes, + callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) + ) + + proc.equalsOrShowDiff(expected) should be(true) + } + + it should "place ping inside par with xor" in { + val i = LiteralRaw("i", ScalarType.string) + val used = VarRaw("used", StreamType(ScalarType.string)) + val usedWithIdx = + used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string)) + + val (joinModel, joinRes) = joinModelRes(usedWithIdx) + + val init = OnModel(initPeer, Chain.one(relay)).wrap( + foldPar( + "i", + valueArray, + OnModel(i, Chain.empty).wrap( + XorModel.wrap( callModel(1, CallModel.Export(used.name, used.`type`) :: Nil) ) - ), - joinModel, - callModel(3, Nil, used :: Nil) - ) + ) + ), + joinModel, + callModel(3, Nil, used :: Nil) + ) val proc = Topology.resolve(init).value - val expected = - SeqRes.wrap( - ParRes.wrap( - FoldRes("i", ValueModel.fromRaw(valueArray), Some(ForModel.NeverMode)).wrap( - ParRes.wrap( - SeqRes.wrap( - through(relay), - callRes( - 1, - ValueModel.fromRaw(i), - Some(CallModel.Export(used.name, used.`type`)) - ), - through(relay), - through(initPeer) - ), - NextRes("i").leaf - ) - ) - ), - joinRes, - callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) - ) - - proc.equalsOrShowDiff(expected) should be(true) - } - - "topology resolver" should "place ping inside par with xor" in { - val i = LiteralRaw("i", ScalarType.string) - val used = VarRaw("used", StreamType(ScalarType.string)) - val usedWithIdx = - used.withProperty(IntoIndexRaw(LiteralRaw("1", ScalarType.u32), ScalarType.string)) - - val (joinModel, joinRes) = joinModelRes(usedWithIdx) - - val init = - OnModel(initPeer, Chain.one(relay)).wrap( - foldPar( - "i", - valueArray, - OnModel(i, Chain.empty).wrap( - XorModel.wrap( - callModel(1, CallModel.Export(used.name, used.`type`) :: Nil) - ) - ) - ), - joinModel, - callModel(3, Nil, used :: Nil) - ) - - val proc = Topology.resolve(init).value - - val expected = - SeqRes.wrap( - ParRes.wrap( - FoldRes("i", ValueModel.fromRaw(valueArray), Some(ForModel.NeverMode)).wrap( - ParRes.wrap( - SeqRes.wrap( - through(relay), - XorRes.wrap( - SeqRes.wrap( - callRes( - 1, - ValueModel.fromRaw(i), - Some(CallModel.Export(used.name, used.`type`)) - ), - through(relay), - through(initPeer) - ) + val expected = SeqRes.wrap( + ParRes.wrap( + FoldRes("i", ValueModel.fromRaw(valueArray), Some(ForModel.NeverMode)).wrap( + ParRes.wrap( + SeqRes.wrap( + through(relay), + XorRes.wrap( + SeqRes.wrap( + callRes( + 1, + ValueModel.fromRaw(i), + Some(CallModel.Export(used.name, used.`type`)) + ), + through(relay), + through(initPeer) ) - ), - NextRes("i").leaf - ) + ) + ), + NextRes("i").leaf ) - ), - joinRes, - callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) - ) + ) + ), + joinRes, + callRes(3, initPeer, None, ValueModel.fromRaw(used) :: Nil) + ) proc.equalsOrShowDiff(expected) should be(true) } - "topology resolver" should "handle empty for correctly [bug LNG-149]" in { + it should "handle empty for correctly [bug LNG-149]" in { val streamName = "array-inline" val iterName = "a-0" val streamType = StreamType(LiteralType.number) @@ -947,4 +935,271 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrShowDiff(expected) shouldEqual true } + + it should "handle error rethrow for `on`" in { + val model = OnModel(initPeer, Chain.one(relay)).wrap( + SeqModel.wrap( + callModel(1), + onRethrowModel(otherPeerL, otherRelay)( + callModel(2) + ) + ) + ) + + val proc = Topology.resolve(model).value + + val expected = SeqRes.wrap( + callRes(1, initPeer), + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay), + callRes(2, otherPeerL) + ), + SeqRes.wrap( + through(otherRelay), + through(relay), + through(initPeer), + failLastErrorRes + ) + ) + ) + + proc.equalsOrShowDiff(expected) shouldEqual true + } + + it should "handle error rethrow for nested `on`" in { + val model = OnModel(initPeer, Chain.one(relay)).wrap( + SeqModel.wrap( + callModel(1), + onRethrowModel(otherPeerL, otherRelay)( + SeqModel.wrap( + callModel(2), + onRethrowModel(otherPeer2, otherRelay2)( + callModel(3) + ) + ) + ) + ) + ) + + val proc = Topology.resolve(model).value + + val expected = SeqRes.wrap( + callRes(1, initPeer), + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay), + callRes(2, otherPeerL), + XorRes.wrap( + SeqRes.wrap( + through(otherRelay), + through(otherRelay2), + callRes(3, otherPeer2), + // TODO: Why back hops are generated? + through(otherRelay2), + through(otherRelay) + ), + SeqRes.wrap( + through(otherRelay2), + through(otherRelay), + through(otherPeerL), + failLastErrorRes + ) + ) + ), + SeqRes.wrap( + through(otherRelay), + through(relay), + through(initPeer), + failLastErrorRes + ) + ) + ) + + proc.equalsOrShowDiff(expected) shouldEqual true + } + + it should "handle error rethrow for nested `on` without `via`" in { + val model = OnModel(initPeer, Chain.one(relay)).wrap( + SeqModel.wrap( + callModel(1), + onRethrowModel(otherPeerL, otherRelay)( + SeqModel.wrap( + callModel(2), + onRethrowModel(otherPeerN(3))( + SeqModel.wrap( + callModel(3), + onRethrowModel(otherPeer2, otherRelay2)( + callModel(4) + ) + ) + ) + ) + ) + ) + ) + + val proc = Topology.resolve(model).value + + val expected = SeqRes.wrap( + callRes(1, initPeer), + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay), + callRes(2, otherPeerL), + XorRes.wrap( + SeqRes.wrap( + through(otherRelay), + callRes(3, otherPeerN(3)), + XorRes.wrap( + SeqRes.wrap( + through(otherRelay2), + callRes(4, otherPeer2), + // TODO: Why back hops are generated? + through(otherRelay2), + through(otherRelay) + ), + SeqRes.wrap( + through(otherRelay2), + through(otherPeerN(3)), + failLastErrorRes + ) + ) + ), + SeqRes.wrap( + through(otherRelay), + through(otherPeerL), + failLastErrorRes + ) + ) + ), + SeqRes.wrap( + through(otherRelay), + through(relay), + through(initPeer), + failLastErrorRes + ) + ) + ) + + proc.equalsOrShowDiff(expected) shouldEqual true + } + + it should "handle error rethrow for sequential `on`" in { + val model = OnModel(initPeer, Chain.one(relay)).wrap( + SeqModel.wrap( + callModel(1), + onRethrowModel(otherPeerL, otherRelay)( + callModel(2) + ), + onRethrowModel(otherPeer2, otherRelay2)( + callModel(3) + ) + ) + ) + + val proc = Topology.resolve(model).value + + val expected = SeqRes.wrap( + callRes(1, initPeer), + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay), + callRes(2, otherPeerL), + through(otherRelay), + through(relay) + ), + SeqRes.wrap( + through(otherRelay), + through(relay), + through(initPeer), + failLastErrorRes + ) + ), + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay2), + callRes(3, otherPeer2) + ), + SeqRes.wrap( + through(otherRelay2), + through(relay), + through(initPeer), + failLastErrorRes + ) + ) + ) + + proc.equalsOrShowDiff(expected) shouldEqual true + } + + it should "handle error rethrow for sequential `on` without `via`" in { + val model = OnModel(initPeer, Chain.one(relay)).wrap( + SeqModel.wrap( + callModel(1), + onRethrowModel(otherPeerL, otherRelay)( + callModel(2) + ), + onRethrowModel(otherPeerN(3))( + callModel(3) + ), + onRethrowModel(otherPeer2, otherRelay2)( + callModel(4) + ) + ) + ) + + val proc = Topology.resolve(model).value + + val expected = SeqRes.wrap( + callRes(1, initPeer), + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay), + callRes(2, otherPeerL), + through(otherRelay), + through(relay) + ), + SeqRes.wrap( + through(otherRelay), + through(relay), + through(initPeer), + failLastErrorRes + ) + ), + XorRes.wrap( + SeqRes.wrap( + through(relay), + callRes(3, otherPeerN(3)), + through(relay) + ), + SeqRes.wrap( + through(relay), + through(initPeer), + failLastErrorRes + ) + ), + XorRes.wrap( + SeqRes.wrap( + through(relay), + through(otherRelay2), + callRes(4, otherPeer2) + ), + SeqRes.wrap( + through(otherRelay2), + through(relay), + through(initPeer), + failLastErrorRes + ) + ) + ) + + proc.equalsOrShowDiff(expected) shouldEqual true + } } diff --git a/semantics/src/main/scala/aqua/semantics/expr/func/CatchSem.scala b/semantics/src/main/scala/aqua/semantics/expr/func/CatchSem.scala index 9fba2e3e..556607e9 100644 --- a/semantics/src/main/scala/aqua/semantics/expr/func/CatchSem.scala +++ b/semantics/src/main/scala/aqua/semantics/expr/func/CatchSem.scala @@ -24,7 +24,7 @@ class CatchSem[S[_]](val expr: CatchExpr[S]) extends AnyVal { .around( N.beginScope(expr.name) >> L.beginScope() >> - N.define(expr.name, ValueRaw.LastError.baseType), + N.define(expr.name, ValueRaw.lastError.baseType), (_, g: Raw) => N.endScope() >> L.endScope() as ( g match { @@ -32,7 +32,7 @@ class CatchSem[S[_]](val expr: CatchExpr[S]) extends AnyVal { TryTag.Catch .wrap( SeqTag.wrap( - AssignmentTag(ValueRaw.LastError, expr.name.value).leaf, + AssignmentTag(ValueRaw.lastError, expr.name.value).leaf, op ) ) diff --git a/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala b/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala index 08aac4be..72199e10 100644 --- a/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala +++ b/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala @@ -133,7 +133,7 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside { TryTag.wrap( testServiceCallStr("try"), SeqTag.wrap( - AssignmentTag(ValueRaw.LastError, "e").leaf, + AssignmentTag(ValueRaw.lastError, "e").leaf, testServiceCallStr("catch") ) ) @@ -159,11 +159,11 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside { TryTag.wrap( testServiceCallStr("try"), SeqTag.wrap( - AssignmentTag(ValueRaw.LastError, "e").leaf, + AssignmentTag(ValueRaw.lastError, "e").leaf, testServiceCallStr("catch1") ), SeqTag.wrap( - AssignmentTag(ValueRaw.LastError, "e").leaf, + AssignmentTag(ValueRaw.lastError, "e").leaf, testServiceCallStr("catch2") ) )