This commit is contained in:
Dima 2022-07-06 11:15:56 +03:00 committed by GitHub
parent 198e339694
commit 029d106079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 171 additions and 10 deletions

View File

@ -34,7 +34,8 @@ case class RunInfo(
imports: List[Path] = Nil,
argumentGetters: Map[String, VarJson] = Map.empty,
services: List[Service] = Nil,
jsonServices: List[JsonService] = Nil
jsonServices: List[JsonService] = Nil,
pluginsPaths: List[String] = Nil
)
// Builds subcommand

View File

@ -7,6 +7,7 @@ import aqua.io.OutputPrinter
import aqua.js.*
import aqua.keypair.KeyPairShow.show
import aqua.run.RunCommand.createKeyPair
import aqua.run.plugin.Plugin
import cats.data.Validated.{invalidNec, validNec}
import cats.data.ValidatedNec
import cats.effect.kernel.Async
@ -17,7 +18,7 @@ import cats.syntax.show.*
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
import scala.scalajs.js
import scala.scalajs.js.{timers, JSON, JavaScriptException}
import scala.scalajs.js.{JSON, JavaScriptException, timers}
object FuncCaller {
@ -34,6 +35,7 @@ object FuncCaller {
services: List[Service],
getters: List[ArgumentGetter]
): F[ValidatedNec[String, Unit]] = {
FluenceUtils.setLogLevel(LogLevelTransformer.logLevelToFluenceJS(config.common.logLevel.fluencejs))
// stops peer in any way at the end of execution
@ -68,6 +70,9 @@ object FuncCaller {
// register all services
_ = (services ++ getters :+ finisherService).map(_.register(peer))
// register all plugins
plugins <- Plugin.getPlugins(config.plugins)
_ = plugins.map(_.register(peer))
callFuture = CallJsFunction.funcCallJs(
air,
functionDef,

View File

@ -127,7 +127,7 @@ object RunCommand extends Logging {
runInfo.func,
inputPath,
runInfo.imports,
RunConfig(common, runInfo.argumentGetters, runInfo.services ++ builtinServices, runInfo.jsonServices),
RunConfig(common, runInfo.argumentGetters, runInfo.services ++ builtinServices, runInfo.jsonServices, runInfo.pluginsPaths),
transformConfig(common.on, common.constants, common.flags.noXor, common.flags.noRelay)
)
}

View File

@ -110,6 +110,7 @@ case class RunConfig(
// builtin services for aqua run, for example: Console, FileSystem, etc
services: List[Service],
jsonServices: List[JsonService],
plugins: List[String],
resultPrinterServiceId: String = "--after-callback-srv-service--",
resultPrinterName: String = "console-log",
finisherServiceId: String = "--finisher--",

View File

@ -11,6 +11,7 @@ import aqua.parser.lift.LiftParser.Implicits.idLiftParser
import aqua.parser.lift.Span
import aqua.raw.ConstantRaw
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
import aqua.run.plugin.Plugin
import aqua.types.BottomType
import cats.data.*
import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel}
@ -50,13 +51,14 @@ object RunOpts extends Logging {
}
def runOptsCompose[F[_]: Files: Concurrent]
: Opts[F[ValidatedNec[String, (Path, List[Path], FuncWithData, Option[NonEmptyList[JsonService]])]]] = {
: Opts[F[ValidatedNec[String, (Path, List[Path], FuncWithData, Option[NonEmptyList[JsonService]], List[String])]]] = {
(
AppOpts.inputOpts[F],
AppOpts.importOpts[F],
ArgOpts.funcWithArgsOpt[F],
AppOpts.wrapWithOption(JsonService.jsonServiceOpt)
).mapN { case (inputF, importF, funcWithArgsF, jsonServiceOp) =>
AppOpts.wrapWithOption(JsonService.jsonServiceOpt),
AppOpts.wrapWithOption(Plugin.opt)
).mapN { case (inputF, importF, funcWithArgsF, jsonServiceOp, pluginsOp) =>
for {
inputV <- inputF
importV <- importF
@ -64,9 +66,10 @@ object RunOpts extends Logging {
jsonServiceV <- jsonServiceOp
.map(_.map(_.map(js => Some(js))))
.getOrElse(validNec[String, Option[NonEmptyList[JsonService]]](None).pure[F])
pluginsPathsV <- pluginsOp.getOrElse(validNec[String, List[String]](Nil).pure[F])
} yield {
(inputV, importV, funcWithArgsV, jsonServiceV).mapN { case (i, im, f, j) =>
(i, im, f, j)
(inputV, importV, funcWithArgsV, jsonServiceV, pluginsPathsV).mapN { case (i, im, f, j, p) =>
(i, im, f, j, p)
}
}
}
@ -86,7 +89,7 @@ object RunOpts extends Logging {
) =>
LogFormatter.initLogger(Some(common.logLevel.compiler))
optionsF.map(
_.map { case (input, imps, funcWithArgs, services) =>
_.map { case (input, imps, funcWithArgs, services, pluginsPaths) =>
RunInfo(
common,
funcWithArgs.func,
@ -94,7 +97,8 @@ object RunOpts extends Logging {
imps,
funcWithArgs.getters,
Nil,
services.map(_.toList).getOrElse(Nil)
services.map(_.toList).getOrElse(Nil),
pluginsPaths
)
}
)

View File

@ -0,0 +1,150 @@
package aqua.run.plugin
import aqua.backend.{
ArrowTypeDef,
LabeledProductTypeDef,
ServiceDef,
TopTypeDef,
UnlabeledProductTypeDef
}
import aqua.js.{CallJsFunction, FluencePeer, ServiceHandler}
import aqua.run.JsonService
import aqua.run.plugin.Plugin.toPromise
import aqua.types.TopType
import cats.data.{NonEmptyList, ValidatedNec}
import cats.effect.Concurrent
import cats.syntax.applicative.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.traverse.*
import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel}
import com.monovore.decline.Opts
import fs2.io.file.{Files, Path}
import scalajs.js
import scala.concurrent.{ExecutionContext, Future}
import scala.scalajs.js.Promise
case class Function(name: String, closure: js.Function)
case class Plugin(name: String, functions: List[Function]) {
def register(peer: FluencePeer): Unit = {
val (handlers, funcTypes) = functions.map { f =>
// get arguments types as TopType
val argCount = f.closure.length
val fields = Range(0, argCount).toList.map { i => ("arg" + i, TopTypeDef) }
val arrowType =
ArrowTypeDef(LabeledProductTypeDef(fields), UnlabeledProductTypeDef(TopTypeDef :: Nil))
val fType = (f.name, arrowType)
// handlers for registering
val h: ServiceHandler = args => {
val argsList = Range(0, argCount).toList.map { i =>
args(i)
}
val res = f.closure.call(this.asInstanceOf[js.Any], argsList: _*)
toPromise(res)
}
((f.name, h), fType)
}.unzip
CallJsFunction.registerService(
peer,
name,
handlers,
ServiceDef(Some(name), LabeledProductTypeDef(funcTypes))
)
}
}
object Plugin {
private def fileExt(p: Path): String =
p.fileName.toString.split('.').toList.lastOption.getOrElse("")
def opt[F[_]: Files: Concurrent]: Opts[F[ValidatedNec[String, List[String]]]] = {
Opts
.option[String]("plugin", "[experimental] Path to a directory with JS plugins", "p", "path")
.map { str =>
val path = Path(str)
Files[F]
.exists(path)
.flatMap { exists =>
if (exists)
Files[F].isRegularFile(path).flatMap { isFile =>
if (isFile) {
if (fileExt(path) == "mjs") {
validNec(str :: Nil).pure[F]
} else {
invalidNec(s"If path '$str' is a file, it must be with '.mjs' extension")
.pure[F]
}
} else {
Files[F]
.list(path)
.evalMap { ps =>
for {
isFile <- Files[F].isRegularFile(ps)
files <-
if (isFile) {
if (fileExt(ps) == "mjs") (ps :: Nil).pure[F]
else Nil.pure[F]
} else if (ps.fileName.toString != "node_modules") {
Files[F].list(ps).filter(pp => fileExt(pp) == "mjs").compile.toList
} else {
Nil.pure[F]
}
} yield {
files
}
}
.compile
.toList
.map(_.flatten.map(_.absolute.toString))
.map(validNec)
}
}
else {
invalidNec(s"There is no path '$str'").pure[F]
}
}
}
}
def getPlugins(paths: List[String])(implicit
ec: ExecutionContext
): Future[List[Plugin]] =
paths.map(p => getPlugin(p)).sequence.map(_.flatten)
private def toPromise(arg: js.Dynamic): js.Promise[js.Dynamic] = {
if (js.typeOf(arg) == "object" && js.typeOf(arg.`then`) == "function")
arg.asInstanceOf[js.Promise[js.Dynamic]]
else js.Promise.resolve(arg)
}
def getPlugin(path: String)(implicit
ec: ExecutionContext
): Future[List[Plugin]] = {
for {
file <- js.`import`[js.Dynamic](path).toFuture
plugin <- {
if (js.typeOf(file.plugins) == "function") {
val res = file.applyDynamic("plugins")()
toPromise(res).toFuture.map(_.asInstanceOf[js.Dictionary[js.Dictionary[js.Any]]])
} else {
Future(js.Dictionary[js.Dictionary[js.Any]]())
}
}
} yield {
plugin.map { case (k, v) =>
val functions = v.map { case (kf, vf) =>
Function(kf, vf.asInstanceOf[js.Function])
}.toList
Plugin(k, functions)
}.toList
}
}
}