diff --git a/aqua-src/antithesis.aqua b/aqua-src/antithesis.aqua index 446f57b9..b9fff8f6 100644 --- a/aqua-src/antithesis.aqua +++ b/aqua-src/antithesis.aqua @@ -1,33 +1,92 @@ -aqua M +aqua A -export returnSrvAsAbility +export get_logs -ability MyAb: - call() -> string +service Op("op"): + id(s1: string) + identity(s: string) -> string -service MySrv("default-id"): - call() -> string +service MyOp("op"): + id(s1: string) + identity(s: string) -> string -func mySrvDefault() -> MyAb: - <- MySrv +func get_logs(a: string): + if a == "sdf": + streamA <- Op.identity("some serv") + Op.id(streamA) + streamA: *string + streamA <- Op.identity("stream") -func mySrvResolved() -> MyAb: - MySrv "resolved-id" - <- MySrv - -func mySrvThird() -> MyAb: - MySrv "third-id" - <- MySrv - -func useMyAb{MyAb}() -> string: - <- MyAb.call() - -func returnSrvAsAbility() -> []string: - result: *string - MySrvDefault <- mySrvDefault() - MySrvResolved <- mySrvResolved() - MySrvThird <- mySrvThird() - result <- useMyAb{MySrvDefault}() - result <- useMyAb{MySrvResolved}() - result <- useMyAb{MySrvThird}() - <- result \ No newline at end of file +-- ability WorkerJob: +-- runOnSingleWorker(w: string) -> []string +-- +-- func runJob(j: -> []string) -> []string: +-- <- j() +-- +-- func disjoint_run{WorkerJob}() -> -> []string: +-- run = func () -> []string: +-- r <- WorkerJob.runOnSingleWorker("a") +-- <- r +-- <- run +-- +-- func empty() -> string: +-- a = "empty" +-- <- a +-- +-- func lng317Bug() -> []string: +-- +-- res: *string +-- +-- outer = () -> string: +-- <- empty() +-- +-- clos = () -> -> []string: +-- job2 = () -> []string: +-- res <- outer() +-- res <- MyOp.identity("identity") +-- <- res +-- <- job2 +-- worker_job = WorkerJob(runOnSingleWorker = clos()) +-- subnet_job <- disjoint_run{worker_job}() +-- finalRes <- runJob(subnet_job) +-- <- finalRes +-- +-- ability Job: +-- run(s: string) -> string +-- +-- func disrun(getJob: -> Job) -> Job: +-- j <- getJob() +-- <- j +-- +-- func lng325Bug() -> string: +-- brokenStream: *string +-- +-- job = () -> Job: +-- run = (str: string) -> string: +-- brokenStream <- MyOp.identity(str) +-- <- "run" +-- +-- <- Job(run = run) +-- +-- subnetJob <- disrun(job) +-- subnetJob.run("firstStream string") +-- <- brokenStream! +-- +-- func secondStream() -> string: +-- brokenStream: *string +-- +-- secondJob = () -> Job: +-- secondRun = (str: string) -> string: +-- brokenStream <- MyOp.identity(str) +-- <- "run" +-- +-- <- Job(run = secondRun) +-- +-- subnetJob <- disrun(secondJob) +-- subnetJob.run("secondStream string") +-- <- brokenStream! +-- +-- func lng325BugTwoFuncs() -> string, string: +-- res1 <- lng325Bug() +-- res2 <- secondStream() +-- <- res1, res2 \ No newline at end of file diff --git a/backend/air/src/main/scala/aqua/backend/air/Air.scala b/backend/air/src/main/scala/aqua/backend/air/Air.scala index b619b29a..a18b1f0a 100644 --- a/backend/air/src/main/scala/aqua/backend/air/Air.scala +++ b/backend/air/src/main/scala/aqua/backend/air/Air.scala @@ -144,7 +144,7 @@ object Air { showNext(inst) sb.append(space) case Air.Fold(iter, label, inst, lastInst) ⇒ - sb.append(" ").append(s" ${iter.show} $label\n") + sb.append(s" ${iter.show} $label\n") showNext(inst) showNext(lastInst) sb.append(space) diff --git a/integration-tests/aqua/examples/closures.aqua b/integration-tests/aqua/examples/closures.aqua index 0c3b4257..f0c17e12 100644 --- a/integration-tests/aqua/examples/closures.aqua +++ b/integration-tests/aqua/examples/closures.aqua @@ -1,6 +1,7 @@ aqua Closure declares * export LocalSrv, closureIn, closureOut, closureBig, closureOut2, lng58Bug, multipleClosuresBugLNG262, lng317Bug +export lng325Bug, lng325BugTwoFuncs import "@fluencelabs/aqua-lib/builtin.aqua" @@ -114,4 +115,44 @@ func lng317Bug() -> []string: worker_job = WorkerJob(runOnSingleWorker = clos()) subnet_job <- disjoint_run{worker_job}() finalRes <- runJob(subnet_job) - <- finalRes \ No newline at end of file + <- finalRes + +ability Job: + run(s: string) -> string + +func disrun(getJob: -> Job) -> Job: + j <- getJob() + <- j + +func lng325Bug() -> string: + brokenStream: *string + + job = () -> Job: + run = (str: string) -> string: + brokenStream <- MyOp.identity(str) + <- "run" + + <- Job(run = run) + + subnetJob <- disrun(job) + subnetJob.run("firstStream string") + <- brokenStream! + +func secondStream() -> string: + brokenStream: *string + + secondJob = () -> Job: + secondRun = (str: string) -> string: + brokenStream <- MyOp.identity(str) + <- "run" + + <- Job(run = secondRun) + + subnetJob <- disrun(secondJob) + subnetJob.run("secondStream string") + <- brokenStream! + +func lng325BugTwoFuncs() -> string, string: + res1 <- lng325Bug() + res2 <- secondStream() + <- res1, res2 \ No newline at end of file diff --git a/integration-tests/src/__test__/examples.spec.ts b/integration-tests/src/__test__/examples.spec.ts index dce66e92..76e7a286 100644 --- a/integration-tests/src/__test__/examples.spec.ts +++ b/integration-tests/src/__test__/examples.spec.ts @@ -122,6 +122,8 @@ import { closuresCall, multipleClosuresLNG262BugCall, lng317BugCall, + lng325BugCall, + lng325BugTwoFuncsCall } from "../examples/closures.js"; import { closureArrowCaptureCall } from "../examples/closureArrowCapture.js"; import { @@ -1106,6 +1108,16 @@ describe("Testing examples", () => { expect(result).toEqual(["empty", "identity"]); }); + it("closures.aqua bug LNG-325", async () => { + let result = await lng325BugCall(); + expect(result).toEqual("firstStream string"); + }); + + it("closures.aqua bug LNG-325 two functions", async () => { + let result = await lng325BugTwoFuncsCall(); + expect(result).toEqual(["firstStream string", "secondStream string"]); + }); + it("closureArrowCapture.aqua", async () => { let result = await closureArrowCaptureCall("input"); expect(result).toEqual("call: ".repeat(4) + "input"); diff --git a/integration-tests/src/examples/closures.ts b/integration-tests/src/examples/closures.ts index 4c3aaca6..70c32916 100644 --- a/integration-tests/src/examples/closures.ts +++ b/integration-tests/src/examples/closures.ts @@ -6,7 +6,9 @@ import { closureOut2, lng58Bug, lng317Bug, - multipleClosuresBugLNG262 + multipleClosuresBugLNG262, + lng325Bug, + lng325BugTwoFuncs } from "../compiled/examples/closures.js"; import { config } from "../config.js"; @@ -42,3 +44,11 @@ export async function multipleClosuresLNG262BugCall(): Promise<[number, number]> export async function lng317BugCall(): Promise { return lng317Bug(); } + +export async function lng325BugCall(): Promise { + return lng325Bug(); +} + +export async function lng325BugTwoFuncsCall(): Promise<[string, string]> { + return lng325BugTwoFuncs(); +} diff --git a/model/inline/src/main/scala/aqua/model/inline/RawValueInliner.scala b/model/inline/src/main/scala/aqua/model/inline/RawValueInliner.scala index 365c5bfa..d0482fd8 100644 --- a/model/inline/src/main/scala/aqua/model/inline/RawValueInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/RawValueInliner.scala @@ -113,20 +113,34 @@ object RawValueInliner extends Logging { call: Call, flatStreamArguments: Boolean ): State[S, (CallModel, Option[OpModel.Tree])] = { - valueListToModel(call.args).flatMap { args => - if (flatStreamArguments) - args.map { arg => - TagInliner.flat(arg._1, arg._2) - }.sequence - else - State.pure(args) - }.map { list => + for { + args <- valueListToModel(call.args) + args <- { + if (flatStreamArguments) + args.traverse(TagInliner.flat.tupled) + else + State.pure(args) + } + exportTo <- call.exportTo.traverse { + case c@Call.Export(_, _, isExistingStream) if isExistingStream => + // process streams, because they can be stored in Exports outside function/closure with different name + valueToModel(c.toRaw) + case ce => + State.pure((VarModel(ce.name, ce.`type`), None)) + } + } yield { + val (argsVars, argsOps) = args.unzip.map(_.flatten) + val (exportVars, exportOps) = exportTo.unzip.map(_.flatten) + val exportModel = exportVars.collect { + // exportTo can be only a variable + case VarModel(name, baseType, _) => CallModel.Export(name, baseType) + } ( CallModel( - list.map(_._1), - call.exportTo.map(CallModel.callExport) + argsVars, + exportModel ), - parDesugarPrefix(list.flatMap(_._2)) + parDesugarPrefix(exportOps ++ argsOps) ) } } diff --git a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala index e3146b17..efaade71 100644 --- a/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/TagInliner.scala @@ -403,12 +403,8 @@ object TagInliner extends Logging { case DeclareStreamTag(value) => value match - case VarRaw(name, _) => - for { - cd <- valueToModel(value) - (vm, prefix) = cd - _ <- Exports[S].resolved(name, vm) - } yield TagInlined.Empty(prefix = prefix) + case VarRaw(name, t) => + Exports[S].resolved(name, VarModel(name, t)).as(TagInlined.Empty()) case _ => none case ServiceIdTag(id, serviceType, name) => diff --git a/model/inline/src/main/scala/aqua/model/inline/tag/IfTagInliner.scala b/model/inline/src/main/scala/aqua/model/inline/tag/IfTagInliner.scala index e563c609..cedc8202 100644 --- a/model/inline/src/main/scala/aqua/model/inline/tag/IfTagInliner.scala +++ b/model/inline/src/main/scala/aqua/model/inline/tag/IfTagInliner.scala @@ -8,17 +8,17 @@ import aqua.model.inline.state.{Arrows, Exports, Mangler} import aqua.model.inline.RawValueInliner.valueToModel import aqua.model.inline.TagInliner.canonicalizeIfStream import aqua.model.inline.Inline.parDesugarPrefixOpt - -import cats.data.Chain +import cats.data.{Chain, State} import cats.syntax.flatMap.* import cats.syntax.apply.* +import cats.Eval final case class IfTagInliner( valueRaw: ValueRaw ) { import IfTagInliner.* - def inlined[S: Mangler: Exports: Arrows] = + def inlined[S: Mangler: Exports: Arrows]: State[S, IfTagInlined] = (valueRaw match { // Optimize in case last operation is equality check case ApplyBinaryOpRaw(op @ (BinOp.Eq | BinOp.Neq), left, right, _) => diff --git a/model/raw/src/main/scala/aqua/raw/ops/Call.scala b/model/raw/src/main/scala/aqua/raw/ops/Call.scala index 7a2a3c24..cbbbafb0 100644 --- a/model/raw/src/main/scala/aqua/raw/ops/Call.scala +++ b/model/raw/src/main/scala/aqua/raw/ops/Call.scala @@ -1,5 +1,6 @@ package aqua.raw.ops +import aqua.errors.Errors.internalError import aqua.raw.value.{ValueRaw, VarRaw} import aqua.types.{ArrowType, ProductType, Type} @@ -27,9 +28,26 @@ case class Call(args: List[ValueRaw], exportTo: List[Call.Export]) { object Call { // TODO docs - case class Export(name: String, `type`: Type) { + case class Export(name: String, `type`: Type, isExistingStream: Boolean = false) { def mapName(f: String => String): Export = copy(f(name)) + def mapStream(f: ValueRaw => ValueRaw): Call.Export = + this match { + // map streams from "exportTo", because they are not exports, but variables + case ce @ Call.Export(_, _, true) => + f(ce.toRaw) match { + case VarRaw(name, baseType) => Call.Export(name, baseType, true) + case _ => internalError(s"Stream '$ce' can be only VarRaw") + } + case ce => ce + } + + def renameNonStream(map: Map[String, String]): Call.Export = + this match { + case ce @ Call.Export(_, _, true) => ce + case ce => ce.mapName(n => map.getOrElse(n, n)) + } + def toRaw: VarRaw = VarRaw(name, `type`) override def toString: String = s"$name:${`type`} <-" diff --git a/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala b/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala index 4dd480f3..2991d0ca 100644 --- a/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala +++ b/model/raw/src/main/scala/aqua/raw/ops/RawTag.scala @@ -1,11 +1,9 @@ package aqua.raw.ops import aqua.raw.arrow.FuncRaw -import aqua.raw.ops.RawTag.Tree -import aqua.raw.value.{CallArrowRaw, CallServiceRaw, ValueRaw} +import aqua.raw.value.{CallArrowRaw, CallServiceRaw, ValueRaw, VarRaw} import aqua.tree.{TreeNode, TreeNodeCompanion} import aqua.types.* - import cats.Show import cats.data.{Chain, NonEmptyList} import cats.free.Cofree @@ -208,15 +206,20 @@ case class CallArrowRawTag( value: ValueRaw ) extends RawTag { - override def exportsVarNames: Set[String] = exportTo.map(_.name).toSet + private lazy val usesExportStreams = exportTo.collect { case Call.Export(name, _, true) => + name + }.toSet - override def usesVarNames: Set[String] = value.varNames + // don't use existing streams in exports + override def exportsVarNames: Set[String] = exportTo.map(_.name).toSet -- usesExportStreams + + override def usesVarNames: Set[String] = value.varNames ++ usesExportStreams override def mapValues(f: ValueRaw => ValueRaw): RawTag = - CallArrowRawTag(exportTo, value.map(f)) + CallArrowRawTag(exportTo.map(_.mapStream(f)), value.map(f)) override def renameExports(map: Map[String, String]): RawTag = - copy(exportTo = exportTo.map(_.mapName(n => map.getOrElse(n, n)))) + copy(exportTo = exportTo.map(_.renameNonStream(map))) } object CallArrowRawTag { diff --git a/model/src/main/scala/aqua/model/FuncArrow.scala b/model/src/main/scala/aqua/model/FuncArrow.scala index 98a7b9f9..c2be05af 100644 --- a/model/src/main/scala/aqua/model/FuncArrow.scala +++ b/model/src/main/scala/aqua/model/FuncArrow.scala @@ -3,8 +3,7 @@ package aqua.model import aqua.raw.arrow.FuncRaw import aqua.raw.ops.{Call, CallArrowRawTag, RawTag} import aqua.raw.value.{ValueRaw, VarRaw} -import aqua.types.{ArrowType, Type} - +import aqua.types.{ArrowType, MutableStreamType, Type} import cats.syntax.option.* case class FuncArrow( @@ -73,7 +72,9 @@ object FuncArrow { val call = Call( methodType.domain.toLabelledList().map(VarRaw.apply), - retVar.map(r => Call.Export(r.name, r.`type`)).toList + retVar.map { r => + Call.Export(r.name, r.`type`, Type.isStreamType(r.`type`)) + }.toList ) val body = CallArrowRawTag.service( diff --git a/semantics/src/main/scala/aqua/semantics/expr/func/CallArrowSem.scala b/semantics/src/main/scala/aqua/semantics/expr/func/CallArrowSem.scala index 6d291d13..402c3a4d 100644 --- a/semantics/src/main/scala/aqua/semantics/expr/func/CallArrowSem.scala +++ b/semantics/src/main/scala/aqua/semantics/expr/func/CallArrowSem.scala @@ -31,7 +31,7 @@ class CallArrowSem[S[_]](val expr: CallArrowExpr[S]) extends AnyVal { (variables zip codomain.toList).traverse { case (v, t) => N.read(v, mustBeDefined = false).flatMap { case Some(stream @ StreamType(st)) => - T.ensureTypeMatches(v, st, t).as(Call.Export(v.value, stream)) + T.ensureTypeMatches(v, st, t).as(Call.Export(v.value, stream, isExistingStream = true)) case _ => N.define(v, t).as(Call.Export(v.value, t)) } diff --git a/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala b/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala index 257164d9..67604f16 100644 --- a/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala +++ b/semantics/src/test/scala/aqua/semantics/SemanticsSpec.scala @@ -636,7 +636,7 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside { stream.`type` shouldBe StreamType(ScalarType.i32) matchChildren(forTag) { case (ParTag, parTag) => matchChildren(parTag)( - { case (PushToStreamTag(VarRaw(varName, _), Call.Export(streamName, _)), _) => + { case (PushToStreamTag(VarRaw(varName, _), Call.Export(streamName, _, _)), _) => varName shouldBe "i" streamName shouldBe "stream" }, diff --git a/types/src/main/scala/aqua/types/Type.scala b/types/src/main/scala/aqua/types/Type.scala index 25fd1199..d62ae3c8 100644 --- a/types/src/main/scala/aqua/types/Type.scala +++ b/types/src/main/scala/aqua/types/Type.scala @@ -519,6 +519,12 @@ object Type { * `StreamType` is collectible with canonicalization */ type CollectibleType = DataType | StreamType + + def isStreamType(t: Type): Boolean = + t match { + case _: MutableStreamType => true + case _ => false + } given PartialOrder[Type] = CompareTypes.partialOrder diff --git a/utils/mangler/src/main/scala/aqua/mangler/ManglerState.scala b/utils/mangler/src/main/scala/aqua/mangler/ManglerState.scala index 2cad6086..1b0f17cd 100644 --- a/utils/mangler/src/main/scala/aqua/mangler/ManglerState.scala +++ b/utils/mangler/src/main/scala/aqua/mangler/ManglerState.scala @@ -2,8 +2,6 @@ package aqua.mangler import cats.Monoid -import scala.annotation.tailrec - case class ManglerState(namesNumbers: Map[String, Int] = Map.empty) { private def genName(name: String, n: Int) =