mirror of
https://github.com/fluencelabs/aqua.git
synced 2024-12-04 22:50:18 +00:00
refactor: Refactor Transform
, add comments (#723)
Refactored transform, added comments
This commit is contained in:
parent
df19ec6734
commit
82f25dd993
@ -1,6 +1,8 @@
|
||||
package aqua.model.transform
|
||||
|
||||
import cats.syntax.show.*
|
||||
import cats.syntax.traverse.*
|
||||
import cats.instances.list.*
|
||||
import aqua.model.inline.ArrowInliner
|
||||
import aqua.model.inline.state.InliningState
|
||||
import aqua.model.transform.funcop.*
|
||||
@ -16,42 +18,83 @@ import cats.data.Chain
|
||||
import cats.free.Cofree
|
||||
import scribe.Logging
|
||||
|
||||
// TODO: doc
|
||||
// API for transforming RawTag to Res
|
||||
object Transform extends Logging {
|
||||
|
||||
// TODO: doc
|
||||
def defaultFilter(t: ResolvedOp): Boolean = t match {
|
||||
private def defaultFilter(t: ResolvedOp): Boolean = t match {
|
||||
case _: NoAir => false
|
||||
case _ => true
|
||||
}
|
||||
|
||||
// TODO: doc
|
||||
def clear(
|
||||
// Purge subtrees of tree for which `filter(head)` is false
|
||||
private def clear(
|
||||
tree: Cofree[Chain, ResolvedOp],
|
||||
filter: ResolvedOp => Boolean = defaultFilter
|
||||
): Cofree[Chain, ResolvedOp] =
|
||||
tree.copy(tail = tree.tail.map(_.filter(t => filter(t.head)).map(clear(_, filter))))
|
||||
|
||||
// TODO: doc/rename
|
||||
def funcRes(func: FuncArrow, conf: TransformConfig): Eval[FuncRes] = {
|
||||
val initCallable: InitPeerCallable = InitViaRelayCallable(
|
||||
Chain.fromOption(conf.relayVarName.map(_ -> ScalarType.string))
|
||||
Cofree.anaEval(tree)(
|
||||
tree =>
|
||||
for {
|
||||
children <- tree.tail
|
||||
filtered = children.filter(child => filter(child.head))
|
||||
} yield filtered,
|
||||
_.head
|
||||
)
|
||||
|
||||
// Apply given preTransformer and inline the function
|
||||
private def funcToModelTree(
|
||||
func: FuncArrow,
|
||||
preTransformer: FuncPreTransformer,
|
||||
funcArgName: String = "_func"
|
||||
): Eval[OpModel.Tree] = {
|
||||
|
||||
/**
|
||||
* preTransform creates function
|
||||
* ```
|
||||
* func funcAround(func: <func.arrowType>):
|
||||
* <retrieve args>
|
||||
* result <- func(<args>)
|
||||
* <pass result to callback>
|
||||
* ```
|
||||
*/
|
||||
val transformed = preTransformer.preTransform(func)
|
||||
|
||||
val funcArg = VarModel(funcArgName, func.arrowType)
|
||||
val call = CallModel(funcArg :: Nil, Nil)
|
||||
|
||||
// <funcArgName> resolves to func
|
||||
val initState = InliningState(resolvedArrows = Map(funcArgName -> func))
|
||||
|
||||
// Inlining `funcAround(<funcArgName>)`
|
||||
ArrowInliner
|
||||
.callArrow[InliningState](transformed, call)
|
||||
.run(initState)
|
||||
.map { case (_, tree) => tree }
|
||||
}
|
||||
|
||||
// Convert FuncArrow to FuncRes with given TransformConfig
|
||||
// Do necessary transformations and inlining
|
||||
def funcRes(func: FuncArrow, conf: TransformConfig): Eval[FuncRes] = {
|
||||
val relayVar = conf.relayVarName.map(_ -> ScalarType.string)
|
||||
|
||||
val initCallable: InitPeerCallable = InitViaRelayCallable(
|
||||
goThrough = Chain.fromOption(relayVar)
|
||||
)
|
||||
|
||||
val errorsCatcher = ErrorsCatcher(
|
||||
enabled = conf.wrapWithXor,
|
||||
conf.errorHandlingCallback,
|
||||
conf.errorFuncName,
|
||||
initCallable
|
||||
serviceId = conf.errorHandlingCallback,
|
||||
funcName = conf.errorFuncName,
|
||||
callable = initCallable
|
||||
)
|
||||
|
||||
val argsProvider: ArgsProvider = ArgsFromService(
|
||||
dataServiceId = conf.dataSrvId,
|
||||
names = relayVar.toList ::: func.arrowType.domain.labelledData
|
||||
)
|
||||
val argsProvider: ArgsProvider =
|
||||
ArgsFromService(
|
||||
conf.dataSrvId,
|
||||
conf.relayVarName.map(_ -> ScalarType.string).toList ::: func.arrowType.domain.labelledData
|
||||
)
|
||||
|
||||
// Transform the body of the function: wrap it with initCallable, provide function arguments via service calls
|
||||
val transform: RawTag.Tree => RawTag.Tree =
|
||||
initCallable.transform _ compose argsProvider.transform
|
||||
argsProvider.transform andThen initCallable.transform
|
||||
|
||||
// Callback on the init peer id, either done via relay or not
|
||||
val callback = initCallable.service(conf.callbackSrvId)
|
||||
@ -62,59 +105,43 @@ object Transform extends Logging {
|
||||
callback,
|
||||
conf.respFuncName
|
||||
)
|
||||
// PreTransformation is done, function is inlined, we have an OpModel.Tree that is ready for topology resolution
|
||||
val preparedFunc = funcToModelTree(func, preTransformer).map(errorsCatcher.transform)
|
||||
|
||||
// Resolve the topology, clear the resulting tree
|
||||
val resultingTree = preparedFunc.flatMap(tree => Topology.resolve(tree).map(clear(_)))
|
||||
|
||||
resultingTree.map(res =>
|
||||
FuncRes(
|
||||
func.funcName,
|
||||
func.argNames,
|
||||
FuncRes.arrowArgs(func.arrowType),
|
||||
func.arrowType.codomain,
|
||||
conf.relayVarName,
|
||||
conf.getDataService,
|
||||
conf.callbackService,
|
||||
conf.respFuncName,
|
||||
conf.errorHandlingService,
|
||||
conf.errorFuncName,
|
||||
res
|
||||
)
|
||||
for {
|
||||
// Pre transform and inline the function
|
||||
model <- funcToModelTree(func, preTransformer)
|
||||
// Post transform the function
|
||||
postModel = errorsCatcher.transform(model)
|
||||
// Resolve topology
|
||||
resolved <- Topology.resolve(postModel)
|
||||
// Clear the tree
|
||||
result = clear(resolved)
|
||||
} yield FuncRes(
|
||||
func.funcName,
|
||||
func.argNames,
|
||||
FuncRes.arrowArgs(func.arrowType),
|
||||
func.arrowType.codomain,
|
||||
conf.relayVarName,
|
||||
conf.getDataService,
|
||||
conf.callbackService,
|
||||
conf.respFuncName,
|
||||
conf.errorHandlingService,
|
||||
conf.errorFuncName,
|
||||
result
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
def funcToModelTree(
|
||||
func: FuncArrow,
|
||||
preTransformer: FuncPreTransformer,
|
||||
funcArgName: String = "_func"
|
||||
): Eval[OpModel.Tree] =
|
||||
ArrowInliner
|
||||
.callArrow[InliningState](
|
||||
preTransformer.preTransform(func),
|
||||
CallModel(VarModel(funcArgName, func.arrowType, Chain.empty) :: Nil, Nil)
|
||||
)
|
||||
.run(
|
||||
InliningState(resolvedArrows = Map(funcArgName -> func))
|
||||
)
|
||||
.map(_._2)
|
||||
// Convert AquaContext to AquaRes with the given TransformConfig
|
||||
def contextRes(ex: AquaContext, conf: TransformConfig): AquaRes = {
|
||||
val funcResults = ex.funcs.toList.traverse { case (fnName, fn) =>
|
||||
funcRes(fn.copy(funcName = fnName), conf)
|
||||
}
|
||||
val serviceResults = ex.services.toList.map { case (srvName, srv) =>
|
||||
ServiceRes.fromModel(srv.copy(name = srvName))
|
||||
}
|
||||
|
||||
def contextRes(ex: AquaContext, conf: TransformConfig): AquaRes =
|
||||
AquaRes(
|
||||
funcs = Chain
|
||||
.fromSeq(ex.funcs.map { case (fnName, fn) =>
|
||||
fn.copy(funcName = fnName)
|
||||
}.toSeq)
|
||||
.map(
|
||||
// TODO: keeep Eval
|
||||
funcRes(_, conf).value
|
||||
),
|
||||
services = Chain
|
||||
.fromSeq(ex.services.map { case (srvName, srv) =>
|
||||
srv.copy(name = srvName)
|
||||
}.toSeq)
|
||||
.map(ServiceRes.fromModel)
|
||||
funcs = Chain.fromSeq(funcResults.value),
|
||||
services = Chain.fromSeq(serviceResults)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user