200 202 stream resolving (#213)

This commit is contained in:
Dima 2021-07-27 09:58:33 +03:00 committed by GitHub
parent 3bbf089e87
commit 81cd3a2c71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 22 deletions

View File

@ -1,9 +1,21 @@
service OpH("oph"):
get_str() -> string
data DT:
field: string
func create_client_util() -> []string:
service DTGetter("get-dt"):
get_dt(s: string) -> DT
func use_name1(name: string) -> string:
results <- DTGetter.get_dt(name)
<- results.field
func use_name2(name: string) -> []string:
results: *string
results <<- "hello"
str <- OpH.get_str()
results <<- str
results <- use_name1(name)
results <- use_name1(name)
results <- use_name1(name)
<- results
func use_name3(name: string) -> []string:
DTGetter.get_dt("yoyo literal")
results <- use_name2(name)
<- results

View File

@ -1,6 +1,6 @@
package aqua.model
import aqua.types.{DataType, ProductType, ScalarType, StreamType, Type}
import aqua.types._
import cats.Eq
import cats.data.{Chain, NonEmptyMap}
import wvlet.log.LogSupport
@ -51,7 +51,7 @@ case class VarModel(name: String, `type`: Type, lambda: Chain[LambdaModel] = Cha
override val lastType: Type = lambda.lastOption.map(_.`type`).getOrElse(`type`)
override def resolveWith(map: Map[String, ValueModel]): ValueModel = {
override def resolveWith(map: Map[String, ValueModel]): ValueModel =
map.get(name) match {
case Some(vv: VarModel) =>
map.get(vv.name) match {
@ -95,7 +95,6 @@ case class VarModel(name: String, `type`: Type, lambda: Chain[LambdaModel] = Cha
case None => this // Should not happen
}
}
}
object VarModel {

View File

@ -1,8 +1,9 @@
package aqua.model.func
import aqua.model.func.raw.{AssignmentTag, CallArrowTag, FuncOp, RawTag}
import aqua.model.ValueModel.varName
import aqua.model.func.raw._
import aqua.model.{Model, ValueModel, VarModel}
import aqua.types.{ArrowType, Type}
import aqua.types.{ArrowType, StreamType, Type}
import cats.Eval
import cats.data.Chain
import cats.free.Cofree
@ -35,6 +36,14 @@ case class FuncCallable(
.head)
}
def extractStreamArgs(args: Map[String, ValueModel]): Map[String, ValueModel] =
args.filter { arg =>
arg._2.`type` match {
case StreamType(_) => true
case _ => false
}
}
// Apply a callable function, get its fully resolved body & optional value, if any
def resolve(
call: Call,
@ -51,6 +60,13 @@ case class FuncCallable(
// Arrow arguments: expected type is Arrow, given by-name
val argsToArrowsRaw = argsFull.arrowArgs(arrows)
// collect arguments with stream type
// to exclude it from resolving and rename it with a higher-level stream that passed by argument
val streamArgs = extractStreamArgs(argsToDataRaw)
val streamToRename = streamArgs.map { case (k, v) => (k, varName(v)) }.collect {
case (k, Some(v)) => (k, v)
}
// Find all duplicates in arguments
val argsShouldRename = findNewNames(forbiddenNames, (argsToDataRaw ++ argsToArrowsRaw).keySet)
val argsToData = argsToDataRaw.map { case (k, v) => argsShouldRename.getOrElse(k, k) -> v }
@ -61,13 +77,21 @@ case class FuncCallable(
// Substitute arguments (referenced by name and optional lambda expressions) with values
// Also rename all renamed arguments in the body
val treeWithValues = body.rename(argsShouldRename).resolveValues(argsToData)
val treeWithValues = body.rename(argsShouldRename ++ streamToRename).resolveValues(argsToData)
// Function body on its own defines some values; collect their names
val treeDefines = treeWithValues.definesVarNames.value -- call.exportTo.map(_.name)
// except stream arguments. They should be already renamed
val treeDefines =
treeWithValues.definesVarNames.value -- streamArgs.keySet -- call.exportTo.filter { exp =>
exp.`type` match {
case StreamType(_) => false
case _ => true
}
}.map(_.name)
// We have some names in scope (forbiddenNames), can't introduce them again; so find new names
val shouldRename = findNewNames(forbiddenNames, treeDefines)
// If there was a collision, rename exports and usages with new names
val treeRenamed = treeWithValues.rename(shouldRename)
@ -129,9 +153,25 @@ case class FuncCallable(
Eval.now(Chain.empty)
)
}
.map { case ((_, resolvedExports), b) =>
.map { case ((_, resolvedExports), callableFuncBody) =>
// If return value is affected by any of internal functions, resolve it
FuncOp(b) -> result.map(_.resolveWith(resolvedExports))
(for {
exp <- call.exportTo
res <- result
pair <- exp match {
case Call.Export(name, StreamType(_)) =>
val resolved = res.resolveWith(resolvedExports)
// path nested function results to a stream
Some(
FuncOps.seq(FuncOp(callableFuncBody), FuncOps.identity(resolved, exp)) -> Some(
exp.model
)
)
case _ => None
}
} yield {
pair
}).getOrElse(FuncOp(callableFuncBody) -> result.map(_.resolveWith(resolvedExports)))
}
}

View File

@ -1,9 +1,9 @@
package aqua.model.transform
import aqua.model.func.raw.{FuncOp, FuncOps}
import aqua.model.func._
import aqua.model.func.raw.{FuncOp, FuncOps}
import aqua.model.{ValueModel, VarModel}
import aqua.types.ArrowType
import aqua.types.{ArrayType, ArrowType, StreamType}
import cats.Eval
import cats.syntax.apply._
@ -15,6 +15,8 @@ case class ResolveFunc(
arrowCallbackPrefix: String = "init_peer_callable_"
) {
private val returnVar: String = "-return-"
def returnCallback(retModel: ValueModel): FuncOp =
callback(
respFuncName,
@ -36,7 +38,13 @@ case class ResolveFunc(
)
}
def wrap(func: FuncCallable): FuncCallable =
def wrap(func: FuncCallable): FuncCallable = {
val returnType = func.ret.map(_._1.lastType).map {
// we mustn't return a stream in response callback to avoid pushing stream to `-return-` value
case StreamType(t) => ArrayType(t)
case t => t
}
FuncCallable(
wrapCallableName,
transform(
@ -46,12 +54,11 @@ case class ResolveFunc(
func.funcName,
Call(
func.args.toCallArgs,
func.ret.map(rmv => Call.Export("-return-", rmv._1.lastType))
returnType.map(t => Call.Export(returnVar, t))
)
) ::
func.ret
.map(_._1)
.map(rmv => VarModel("-return-", rmv.lastType))
returnType
.map(t => VarModel(returnVar, t))
.map(returnCallback)
.toList: _*
)
@ -63,6 +70,7 @@ case class ResolveFunc(
}.toList.toMap,
Map.empty
)
}
def resolve(
func: FuncCallable,