Async fluence js (#334)

This commit is contained in:
Dima 2021-10-21 16:47:04 +03:00 committed by GitHub
parent 058a83fa7f
commit 2f1d5a0760
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 203 additions and 243 deletions

View File

@ -0,0 +1,136 @@
package aqua.backend
import aqua.types.{ArrowType, OptionType, ProductType, Type}
import io.circe.*
import io.circe.generic.auto.*
import io.circe.parser.*
import io.circe.syntax.*
import scala.annotation.tailrec
// Represents the Aqua types
sealed trait TypeDefinition {
def tag: String
}
object TypeDefinition {
implicit val encodeDefType: Encoder[TypeDefinition] = {
case d@(OptionalType | VoidType | PrimitiveType) =>
Json.obj(
("tag", Json.fromString(d.tag))
)
case d@CallbackType(cDef) =>
Json.obj(
("tag", Json.fromString(d.tag)),
("callback", cDef.asJson)
)
case d@MultiReturnType(returnItems) =>
Json.obj(
("tag", Json.fromString(d.tag)),
("returnItems", returnItems.asJson)
)
}
def apply(t: Option[Type]): TypeDefinition = t.map(apply).getOrElse(VoidType)
def apply(t: Type): TypeDefinition = {
t match {
case OptionType(t) =>
OptionalType
case pt: ProductType =>
MultiReturnType(pt.toList.map(TypeDefinition.apply))
case _ => PrimitiveType
}
}
}
case object OptionalType extends TypeDefinition { val tag = "optional" }
case object VoidType extends TypeDefinition { val tag = "void" }
case object PrimitiveType extends TypeDefinition { val tag = "primitive" }
case class CallbackType(cDef: CallbackDefinition) extends TypeDefinition { val tag = "callback" }
case class MultiReturnType(returnItems: List[TypeDefinition]) extends TypeDefinition {
val tag = "multiReturn"
}
// Describes callbacks that passes as arguments in functions
case class CallbackDefinition(argDefs: List[ArgDefinition], returnType: TypeDefinition)
object CallbackDefinition {
implicit val encodeCallbackDef: Encoder[CallbackDefinition] = (cbDef: CallbackDefinition) => Json.obj(
("argDefs", cbDef.argDefs.asJson),
("returnType", cbDef.returnType.asJson)
)
def apply(arrow: ArrowType): CallbackDefinition = {
val args = arrow.domain.toLabelledList().map(arg => ArgDefinition.argToDef(arg._1, arg._2))
val returns = arrow.codomain.toList
val returnType = returns match {
case head :: Nil =>
TypeDefinition(head)
case Nil =>
VoidType
case _ =>
MultiReturnType(returns.map(TypeDefinition.apply))
}
CallbackDefinition(args, returnType)
}
}
// Describes arguments in functions and callbacks
case class ArgDefinition(
name: String,
argType: TypeDefinition
)
object ArgDefinition {
@tailrec
def argToDef(name: String, `type`: Type, isOptional: Boolean = false): ArgDefinition = {
`type` match {
case OptionType(t) =>
argToDef(name, t, isOptional = true)
case a @ ArrowType(_, _) =>
val callbackDef = CallbackDefinition(a)
ArgDefinition(name, CallbackType(callbackDef))
case _ => ArgDefinition(name, if (isOptional) OptionalType else PrimitiveType)
}
}
implicit val encodeArgDef: Encoder[ArgDefinition] = (a: ArgDefinition) => Json.obj(
("name", Json.fromString(a.name)),
("argType", a.argType.asJson)
)
}
// Names of services and functions that are used in air
case class NamesConfig(
relay: String,
getDataSrv: String,
callbackSrv: String,
responseSrv: String,
responseFnName: String,
errorHandlingSrv: String,
errorFnName: String
)
// Describes a body of functions, services and callbacks
case class ServiceFunctionDef(
functionName: String,
argDefs: List[ArgDefinition],
returnType: TypeDefinition
)
// Describes service
case class ServiceDef(defaultServiceId: Option[String], functions: List[ServiceFunctionDef])
// Describes top-level function
case class FunctionDef(
functionName: String,
returnType: TypeDefinition,
argDefs: List[ArgDefinition],
names: NamesConfig
)

View File

@ -4,22 +4,19 @@ object Header {
def header(isJs: Boolean, isCommonJS: Boolean): String = {
val imports = if (isCommonJS) {
"""
|const { Fluence, FluencePeer } = require('@fluencelabs/fluence');
"""const { Fluence, FluencePeer } = require('@fluencelabs/fluence');
|const {
| ResultCodes,
| RequestFlow,
| RequestFlowBuilder,
| CallParams,} = require('@fluencelabs/fluence/dist/internal/compilerSupport/v1${if (isJs) ".js" else ""}');
|""".stripMargin
| CallParams,
| callFunction,
| registerService,
|} = require('@fluencelabs/fluence/dist/internal/compilerSupport/v2${if (isJs) ".js" else ""}');""".stripMargin
} else {
s"""import { Fluence, FluencePeer } from '@fluencelabs/fluence';
|import {
| ResultCodes,
| RequestFlow,
| RequestFlowBuilder,
| CallParams
|} from '@fluencelabs/fluence/dist/internal/compilerSupport/v1${if (isJs) ".js" else ""}';""".stripMargin
|import {
| CallParams,
| callFunction,
| registerService,
|} from '@fluencelabs/fluence/dist/internal/compilerSupport/v2${if (isJs) ".js" else ""}';""".stripMargin
}
s"""/**
| *

View File

@ -16,12 +16,6 @@ case class OutputFile(res: AquaRes) {
val functions =
res.funcs.map(f => OutputFunc(f, types)).map(_.generate).toList.mkString("\n\n")
s"""${Header.header(false, isCommonJS)}
|
|function ${typed(
s"""missingFields(${typed("obj", "any")}, ${typed("fields", "string[]")})""",
"string[]")} {
| return fields.filter(f => !(f in obj))
|}
|
|// Services
|$services

View File

@ -1,139 +1,61 @@
package aqua.backend
import aqua.backend.air.FuncAirGen
import aqua.backend.ts.TypeScriptCommon.{callBackExprBody, fixupArgName}
import aqua.backend.ts.TypeScriptCommon.fixupArgName
import aqua.backend.ts.{TSFuncTypes, TypeScriptCommon}
import aqua.model.transform.res.FuncRes
import aqua.model.transform.res.FuncRes.Arg
import aqua.types.{ArrowType, DataType, OptionType, ProductType}
import aqua.types.*
import cats.syntax.show.*
import io.circe.*
import io.circe.generic.auto.*
import io.circe.parser.*
import io.circe.syntax.*
case class OutputFunc(func: FuncRes, types: Types) {
import FuncRes.*
import TypeScriptCommon.*
import types.*
import func.*
import types.*
val funcTypes = types.funcType(func)
import funcTypes.*
val argsFormAssingment = args
.map(arg => fixupArgName(arg.name))
.appended("config")
.zipWithIndex
private def returnCallback: String =
val respBody = func.returnType match {
case Some(x) => x match {
case OptionType(_) =>
""" let [opt] = args;
| if (Array.isArray(opt)) {
| if (opt.length === 0) { resolve(null); }
| opt = opt[0];
| }
| return resolve(opt);""".stripMargin
case pt: ProductType =>
val unwrapOpts = pt.toList.zipWithIndex.collect { case (OptionType(_), i) =>
s""" if( Array.isArray(opt[$i])) {
| if (opt[$i].length === 0) { opt[$i] = null; }
| else {opt[$i] = opt[$i][0]; }
| }""".stripMargin
}.mkString
s""" let ${typed("opt", "any")} = args;
|$unwrapOpts
| return resolve(opt);""".stripMargin
case _ =>
""" const [res] = args;
| resolve(res);""".stripMargin
}
case None => ""
}
s""" h.onEvent('$callbackServiceId', '$respFuncName', (args) => {
|$respBody
| });""".stripMargin
def generate: String = {
val tsAir = FuncAirGen(func).generate
val setCallbacks = func.args.collect { // Product types are not handled
case Arg(argName, OptionType(_)) =>
s""" h.on('$dataServiceId', '$argName', () => {return ${fixupArgName(argName)} === null ? [] : [${fixupArgName(argName)}];});"""
case Arg(argName, _: DataType) =>
s""" h.on('$dataServiceId', '$argName', () => {return ${fixupArgName(argName)};});"""
case Arg(argName, at: ArrowType) =>
s""" h.use((req, resp, next) => {
| if(req.serviceId === '${conf.callbackService}' && req.fnName === '$argName') {
|${callBackExprBody(at, argName, 28)}
| }
| next();
| });
""".stripMargin
}
.mkString("\n")
val returnVal =
func.returnType.fold("Promise.race([promise, Promise.resolve()])")(_ => "promise")
val configArgName = genArgName("config")
val codeLeftSpace = " " * 20
val argsLets = args.map(arg => s" let ${typed(fixupArgName(arg.name), "any")};").mkString("\n")
// argument upnacking has two forms.
// One starting from the first (by index) argument,
// One starting from zero
val argsAssignmentStartingFrom1 = argsFormAssingment.map((name, ix) => s" ${name} = args[${ix + 1}];").mkString("\n")
val argsAssignmentStartingFrom0 = argsFormAssingment.map((name, ix) => s" ${name} = args[${ix}];").mkString("\n")
val script = tsAir.show.linesIterator.map(codeLeftSpace + _).mkString("\n")
val args = func.args.map(a => ArgDefinition.argToDef(a.name, a.`type`))
val config = func.conf
val names = NamesConfig(
config.relayVarName.getOrElse("-relay-"),
config.getDataService,
config.callbackService,
config.callbackService,
config.respFuncName,
config.errorHandlingService,
config.errorFuncName
)
val funcDef = FunctionDef(
func.funcName,
TypeDefinition(func.returnType),
args,
names
)
s"""${funcTypes.generate}
|export function ${func.funcName}(${typed("...args", "any")}) {
| let ${typed("peer", "FluencePeer")};
|${argsLets}
| let ${typed("config", "any")};
| if (FluencePeer.isInstance(args[0])) {
| peer = args[0];
|${argsAssignmentStartingFrom1}
| } else {
| peer = Fluence.getPeer();
|${argsAssignmentStartingFrom0}
| }
|
| let ${typed("request", "RequestFlow")};
| const promise = new ${generic("Promise", retTypeTs._2)}((resolve, reject) => {
| const r = new RequestFlowBuilder()
| .disableInjections()
| .withRawScript(`
|${tsAir.show.linesIterator.map(codeLeftSpace + _).mkString("\n")}
| `,
| )
| .configHandler((h) => {
| ${conf.relayVarName.fold("") { r =>
s"""h.on('${conf.getDataService}', '$r', () => {
| return peer.getStatus().relayPeerId;
| });""".stripMargin }}
|$setCallbacks
|$returnCallback
| h.onEvent('${conf.errorHandlingService}', '${conf.errorFuncName}', (args) => {
| const [err] = args;
| reject(err);
| });
| })
| .handleScriptError(reject)
| .handleTimeout(() => {
| reject('Request timed out for ${func.funcName}');
| })
|
| if (${configArgName} && ${configArgName}.ttl) {
| r.withTTL(${configArgName}.ttl)
| }
|
| request = r.build();
| });
| peer.internals.initiateFlow(${bang("request")});
| return ${returnVal};
| let script = `
| $script
| `
| return callFunction(
| args,
| ${funcDef.asJson.spaces4},
| script
| )
|}""".stripMargin
}

View File

@ -1,9 +1,12 @@
package aqua.backend
import aqua.backend.ts.TypeScriptCommon.callBackExprBody
import aqua.backend.ts.TypeScriptCommon
import aqua.model.transform.res.ServiceRes
import aqua.types.ArrowType
import io.circe.*
import io.circe.generic.auto.*
import io.circe.parser.*
import io.circe.syntax.*
case class OutputService(srv: ServiceRes, types: Types) {
@ -12,74 +15,22 @@ case class OutputService(srv: ServiceRes, types: Types) {
val serviceTypes = types.serviceType(srv)
import serviceTypes.*
def fnHandler(arrow: ArrowType, memberName: String) = {
s"""if (req.fnName === '${memberName}') {
|${callBackExprBody(arrow, "service." + memberName, 12)}
}""".stripMargin
}
def generate: String =
val fnHandlers = srv.members
.map{ case (name, arrow) =>
fnHandler(arrow, name)
}
.mkString("\n\n")
val functions = srv.members.map{ m =>
val cDef = CallbackDefinition(m._2)
ServiceFunctionDef(m._1, cDef.argDefs, cDef.returnType)
}
val defaultServiceIdBranch = srv.defaultId.fold("")(x =>
s"""else {
| serviceId = ${x}
| }""".stripMargin
)
val membersNames = srv.members.map(_._1)
val serviceDef = ServiceDef(srv.defaultId.map(s => s.replace("\"", "")), functions)
s"""
|${serviceTypes.generate}
|
|export function register${srv.name}(${typed("...args", "any")}) {
| let ${typed("peer", "FluencePeer")};
| let ${typed("serviceId", "any")};
| let ${typed("service", "any")};
| if (FluencePeer.isInstance(args[0])) {
| peer = args[0];
| } else {
| peer = Fluence.getPeer();
| }
|
| if (typeof args[0] === 'string') {
| serviceId = args[0];
| } else if (typeof args[1] === 'string') {
| serviceId = args[1];
| } ${defaultServiceIdBranch}
|
| // Figuring out which overload is the service.
| // If the first argument is not Fluence Peer and it is an object, then it can only be the service def
| // If the first argument is peer, we are checking further. The second argument might either be
| // an object, that it must be the service object
| // or a string, which is the service id. In that case the service is the third argument
| if (!(FluencePeer.isInstance(args[0])) && typeof args[0] === 'object') {
| service = args[0];
| } else if (typeof args[1] === 'object') {
| service = args[1];
| } else {
| service = args[2];
| }
|
| const incorrectServiceDefinitions = missingFields(service, [${membersNames.map { n => s"'$n'" }.mkString(", ")}]);
| if (!!incorrectServiceDefinitions.length) {
| throw new Error("Error registering service ${srv.name}: missing functions: " + incorrectServiceDefinitions.map((d) => "'" + d + "'").join(", "))
| }
|
| peer.internals.callServiceHandler.use((req, resp, next) => {
| if (req.serviceId !== serviceId) {
| next();
| return;
| }
|
| ${fnHandlers}
|
| next();
| });
| registerService(
| args,
| ${serviceDef.asJson.spaces4}
| );
|}
""".stripMargin
}

View File

@ -56,7 +56,7 @@ object TypeScriptCommon {
val retType = returnType(at)
s"(${args}) => ${retType}"
s"(${args}) => ${retType} | Promise<${retType}>"
def argsToTs(at: ArrowType): List[String] =
FuncRes
@ -75,52 +75,4 @@ object TypeScriptCommon {
"null"
}
s"callParams: CallParams<${generic}>"
def callBackExprBody(at: ArrowType, callbackName: String, leftSpace: Int): String = {
val arrowArgumentsToCallbackArgumentsList =
at.domain.toList
.zipWithIndex
.map((`type`, idx) => {
val valueFromArg = s"req.args[$idx]"
`type` match {
case OptionType(t) => s"${valueFromArg}.length === 0 ? null : ${valueFromArg}[0]"
case _ => valueFromArg
}
})
.concat(List("callParams"))
.mkString(", ")
val callCallbackStatement = s"$callbackName(${arrowArgumentsToCallbackArgumentsList})"
val callCallbackStatementAndReturn =
at.res.fold(s"${callCallbackStatement}; resp.result = {}")(`type` =>
`type` match {
case OptionType(t) => s"""
| var respResult = ${callCallbackStatement};
| resp.result = respResult === null ? [] : [respResult]
|""".stripMargin
case _ => s"resp.result = ${callCallbackStatement}"
}
)
val tetraplets = FuncRes
.arrowArgs(at)
.zipWithIndex
.map((x, idx) => {
s"${x.name}: req.tetraplets[${idx}]"
})
.mkString(",")
val left = " " * leftSpace
s"""${left}const callParams = {
|$left ...req.particleContext,
|$left tetraplets: {
|$left ${tetraplets}
|$left },
|$left};
|${left}resp.retCode = ResultCodes.success;
|$left${callCallbackStatementAndReturn}""".stripMargin
}
}

View File

@ -13,11 +13,12 @@ val catsEffectV = "3.2.1"
val log4catsV = "2.1.1"
val slf4jV = "1.7.30"
val declineV = "2.1.0"
val circeVersion = "0.14.1"
name := "aqua-hll"
val commons = Seq(
baseAquaVersion := "0.3.2",
baseAquaVersion := "0.4.0",
version := baseAquaVersion.value + "-" + sys.env.getOrElse("BUILD_NUMBER", "SNAPSHOT"),
scalaVersion := dottyVersion,
libraryDependencies ++= Seq(
@ -165,4 +166,11 @@ lazy val `backend-ts` = crossProject(JVMPlatform, JSPlatform)
.crossType(CrossType.Pure)
.in(file("backend/ts"))
.settings(commons: _*)
.settings(
libraryDependencies ++= Seq(
"io.circe" %%% "circe-core",
"io.circe" %%% "circe-generic",
"io.circe" %%% "circe-parser"
).map(_ % circeVersion)
)
.dependsOn(`backend-air`)