From 81cd3a2c71610385290ac5a5e6e476de76ec0837 Mon Sep 17 00:00:00 2001 From: Dima Date: Tue, 27 Jul 2021 09:58:33 +0300 Subject: [PATCH] 200 202 stream resolving (#213) --- aqua-src/test.aqua | 24 ++++++--- .../main/scala/aqua/model/ValueModel.scala | 5 +- .../scala/aqua/model/func/FuncCallable.scala | 52 ++++++++++++++++--- .../aqua/model/transform/ResolveFunc.scala | 22 +++++--- 4 files changed, 81 insertions(+), 22 deletions(-) diff --git a/aqua-src/test.aqua b/aqua-src/test.aqua index 277b658b..cd11cd10 100644 --- a/aqua-src/test.aqua +++ b/aqua-src/test.aqua @@ -1,9 +1,21 @@ -service OpH("oph"): - get_str() -> string +data DT: + field: string -func create_client_util() -> []string: +service DTGetter("get-dt"): + get_dt(s: string) -> DT + +func use_name1(name: string) -> string: + results <- DTGetter.get_dt(name) + <- results.field + +func use_name2(name: string) -> []string: results: *string - results <<- "hello" - str <- OpH.get_str() - results <<- str + results <- use_name1(name) + results <- use_name1(name) + results <- use_name1(name) <- results + +func use_name3(name: string) -> []string: + DTGetter.get_dt("yoyo literal") + results <- use_name2(name) + <- results \ No newline at end of file diff --git a/model/src/main/scala/aqua/model/ValueModel.scala b/model/src/main/scala/aqua/model/ValueModel.scala index b179e9e4..7e2e1d01 100644 --- a/model/src/main/scala/aqua/model/ValueModel.scala +++ b/model/src/main/scala/aqua/model/ValueModel.scala @@ -1,6 +1,6 @@ package aqua.model -import aqua.types.{DataType, ProductType, ScalarType, StreamType, Type} +import aqua.types._ import cats.Eq import cats.data.{Chain, NonEmptyMap} import wvlet.log.LogSupport @@ -51,7 +51,7 @@ case class VarModel(name: String, `type`: Type, lambda: Chain[LambdaModel] = Cha override val lastType: Type = lambda.lastOption.map(_.`type`).getOrElse(`type`) - override def resolveWith(map: Map[String, ValueModel]): ValueModel = { + override def resolveWith(map: Map[String, ValueModel]): ValueModel = map.get(name) match { case Some(vv: VarModel) => map.get(vv.name) match { @@ -94,7 +94,6 @@ case class VarModel(name: String, `type`: Type, lambda: Chain[LambdaModel] = Cha case Some(vv) => vv // TODO check that lambda is empty, otherwise error case None => this // Should not happen } - } } object VarModel { diff --git a/model/src/main/scala/aqua/model/func/FuncCallable.scala b/model/src/main/scala/aqua/model/func/FuncCallable.scala index a5ebcc27..a0e756db 100644 --- a/model/src/main/scala/aqua/model/func/FuncCallable.scala +++ b/model/src/main/scala/aqua/model/func/FuncCallable.scala @@ -1,8 +1,9 @@ package aqua.model.func -import aqua.model.func.raw.{AssignmentTag, CallArrowTag, FuncOp, RawTag} +import aqua.model.ValueModel.varName +import aqua.model.func.raw._ import aqua.model.{Model, ValueModel, VarModel} -import aqua.types.{ArrowType, Type} +import aqua.types.{ArrowType, StreamType, Type} import cats.Eval import cats.data.Chain import cats.free.Cofree @@ -35,6 +36,14 @@ case class FuncCallable( .head) } + def extractStreamArgs(args: Map[String, ValueModel]): Map[String, ValueModel] = + args.filter { arg => + arg._2.`type` match { + case StreamType(_) => true + case _ => false + } + } + // Apply a callable function, get its fully resolved body & optional value, if any def resolve( call: Call, @@ -51,6 +60,13 @@ case class FuncCallable( // Arrow arguments: expected type is Arrow, given by-name val argsToArrowsRaw = argsFull.arrowArgs(arrows) + // collect arguments with stream type + // to exclude it from resolving and rename it with a higher-level stream that passed by argument + val streamArgs = extractStreamArgs(argsToDataRaw) + val streamToRename = streamArgs.map { case (k, v) => (k, varName(v)) }.collect { + case (k, Some(v)) => (k, v) + } + // Find all duplicates in arguments val argsShouldRename = findNewNames(forbiddenNames, (argsToDataRaw ++ argsToArrowsRaw).keySet) val argsToData = argsToDataRaw.map { case (k, v) => argsShouldRename.getOrElse(k, k) -> v } @@ -61,13 +77,21 @@ case class FuncCallable( // Substitute arguments (referenced by name and optional lambda expressions) with values // Also rename all renamed arguments in the body - val treeWithValues = body.rename(argsShouldRename).resolveValues(argsToData) + val treeWithValues = body.rename(argsShouldRename ++ streamToRename).resolveValues(argsToData) // Function body on its own defines some values; collect their names - val treeDefines = treeWithValues.definesVarNames.value -- call.exportTo.map(_.name) + // except stream arguments. They should be already renamed + val treeDefines = + treeWithValues.definesVarNames.value -- streamArgs.keySet -- call.exportTo.filter { exp => + exp.`type` match { + case StreamType(_) => false + case _ => true + } + }.map(_.name) // We have some names in scope (forbiddenNames), can't introduce them again; so find new names val shouldRename = findNewNames(forbiddenNames, treeDefines) + // If there was a collision, rename exports and usages with new names val treeRenamed = treeWithValues.rename(shouldRename) @@ -129,9 +153,25 @@ case class FuncCallable( Eval.now(Chain.empty) ) } - .map { case ((_, resolvedExports), b) => + .map { case ((_, resolvedExports), callableFuncBody) => // If return value is affected by any of internal functions, resolve it - FuncOp(b) -> result.map(_.resolveWith(resolvedExports)) + (for { + exp <- call.exportTo + res <- result + pair <- exp match { + case Call.Export(name, StreamType(_)) => + val resolved = res.resolveWith(resolvedExports) + // path nested function results to a stream + Some( + FuncOps.seq(FuncOp(callableFuncBody), FuncOps.identity(resolved, exp)) -> Some( + exp.model + ) + ) + case _ => None + } + } yield { + pair + }).getOrElse(FuncOp(callableFuncBody) -> result.map(_.resolveWith(resolvedExports))) } } diff --git a/model/src/main/scala/aqua/model/transform/ResolveFunc.scala b/model/src/main/scala/aqua/model/transform/ResolveFunc.scala index 4d8289b1..f7d38c74 100644 --- a/model/src/main/scala/aqua/model/transform/ResolveFunc.scala +++ b/model/src/main/scala/aqua/model/transform/ResolveFunc.scala @@ -1,9 +1,9 @@ package aqua.model.transform -import aqua.model.func.raw.{FuncOp, FuncOps} import aqua.model.func._ +import aqua.model.func.raw.{FuncOp, FuncOps} import aqua.model.{ValueModel, VarModel} -import aqua.types.ArrowType +import aqua.types.{ArrayType, ArrowType, StreamType} import cats.Eval import cats.syntax.apply._ @@ -15,6 +15,8 @@ case class ResolveFunc( arrowCallbackPrefix: String = "init_peer_callable_" ) { + private val returnVar: String = "-return-" + def returnCallback(retModel: ValueModel): FuncOp = callback( respFuncName, @@ -36,7 +38,13 @@ case class ResolveFunc( ) } - def wrap(func: FuncCallable): FuncCallable = + def wrap(func: FuncCallable): FuncCallable = { + val returnType = func.ret.map(_._1.lastType).map { + // we mustn't return a stream in response callback to avoid pushing stream to `-return-` value + case StreamType(t) => ArrayType(t) + case t => t + } + FuncCallable( wrapCallableName, transform( @@ -46,12 +54,11 @@ case class ResolveFunc( func.funcName, Call( func.args.toCallArgs, - func.ret.map(rmv => Call.Export("-return-", rmv._1.lastType)) + returnType.map(t => Call.Export(returnVar, t)) ) ) :: - func.ret - .map(_._1) - .map(rmv => VarModel("-return-", rmv.lastType)) + returnType + .map(t => VarModel(returnVar, t)) .map(returnCallback) .toList: _* ) @@ -63,6 +70,7 @@ case class ResolveFunc( }.toList.toMap, Map.empty ) + } def resolve( func: FuncCallable,