Upload file to Fluence IPFS sidecar (#390)

This commit is contained in:
Dima 2021-12-24 11:50:12 +03:00 committed by GitHub
parent 22778914ca
commit 082293a4e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 2901 additions and 450 deletions

View File

@ -55,6 +55,7 @@ jobs:
mv cli/.js/target/scala-3.1.0/cli-fastopt.js npm/aqua.js
cd npm
npm i
npm run build
cd ../aqua-playground
npm i --save-dev ../npm
npm i

View File

@ -1,9 +0,0 @@
import "chatApp.aqua"
import "service.aqua"
import "builtin.aqua"
import "demo.aqua"
func createUser(nickname: string):
on nickname:
AppConfig.getApp()
AppConfig.getApp()

View File

@ -1,3 +0,0 @@
service AppConfig("affinidi/get-config"):
getApp: -> string
getLocalService: -> string

View File

@ -1,45 +0,0 @@
module Err declares Peer, Op, include
service Peer("peer"):
is_connected: string -> bool
service Op("op"):
identity: -> ()
data User:
peer_id: string
relay_id: string
name: string
service Test("test"):
getUserList: -> []User
doSomething: -> bool
func include(opt: ?string) -> string:
Peer.is_connected(opt!)
for i <- opt:
Peer.is_connected(i)
<- opt!
func betterMessage(relay: string, arr: []string, opt: ?string, str: *string) -> ?string:
on relay:
Peer.is_connected("something")
par isOnline <- Peer.is_connected(relay)
par on "quray" via arr:
Peer.is_connected("qurara")
stream: *string
localOpt: ?string
if isOnline:
try:
Test.doSomething()
else:
Peer.is_connected(stream!)
x <- include(arr)
y <- include(opt)
localOpt <- include(str)
<- opt

View File

@ -1,7 +0,0 @@
module Srv declares *
import "chatApp.aqua"
service Service("affinidi/chat"):
create_user(peer_id: string, relay_id: string, nickname: string) -> string
chats_by_peer_id(peer_id: string) -> string

View File

@ -1,40 +0,0 @@
data ServiceInstance:
peer_id: string
service_id: string
data App:
history: ServiceInstance
user_list: ServiceInstance
service UserList:
leave: string -> u32
func in(xap: App) -> u32:
UserList xap.user_list.service_id
res <- UserList.leave("line in in()")
<- res
func bug(app: App, smth: bool, callback: bool -> ()):
UserList app.user_list.service_id
UserList.leave("line in bug()")
x <- in(app)
callback(smth)
func addEntry(entry: string, selfPeerId: string) -> AddServiceResult:
app <- AppConfig.getApp()
relay <- AppConfig.get_init_relay()
on relay:
Op.identity()
authRes <- auth()
on app.history.peer_id:
History app.history.service_id
res <- History.add(entry, authRes.is_authenticated)
allUsers <- getUserList()
for user <- allUsers:
on user.relay_id:
Op.identity()
par on user.peer_id:
FluentPad.notifyTextUpdate(entry, selfPeerId, authRes.is_authenticated)
<- res

View File

@ -1,36 +0,0 @@
-- This is some data
-- With a comment on first lines
data Smth:
a: B
c: []D
oloolo: Akaka
service MySrv:
func: A, u32 -> u32
-- We can have comments inside
pure: -> []Cdef
func do_smth( a: X, b: -> Z ): -- And comments after the line
a <- b()
x(a)
arrCdef <- MySrv.pure()
PoorAbility a
y <- PoorAbility.func()
func do_smth2( a: X, b: -> Z ): -- And comments after the line
b()
on 32:
OnAb "something"
OnAb.call()
par y <- ParAbb.call()
par on 43:
t <- ParOnAbb.call()
b(a, z)
shouldBeA <- MySrv.func()
alias Akaka : u32
alias Akaka : bool
service Json("json"):
getString: string, string -> string

View File

@ -1,18 +0,0 @@
data User:
peer_id: string
relay_id: string
name: string
data GetUsersServiceResult:
users: []User
ret_code: s32
err_msg: string
service UserList:
get_users: -> GetUsersServiceResult
func getUserList() -> []User:
on "userlist_node":
UserList "userlist_id"
allUsers <- UserList.get_users()
<- allUsers.users

View File

@ -1,17 +0,0 @@
service SomeThing("smth"):
callU32: string -> u32
callBool: string -> bool
getList: -> []string
func tryFor():
peers <- SomeThing.getList()
if false:
SomeThing.callBool("just to have smth before the for")
par for p <- peers par:
SomeThing.callU32("call u32 inside")
SomeThing.callBool("call bool inside")
else:
SomeThing.callBool("just to have smth after the for")

View File

@ -1,39 +0,0 @@
service Local("local"):
gt: u32, u32 -> bool
onIn: string -> ()
onBoolean: bool -> ()
func tryGen(in1: u32, in2: string) -> bool:
on in2:
Local.onIn(in2)
v <- Local.gt(in1, 25)
<- v
func generateComplex(value: string) -> bool:
on "smth":
Local "other"
b <- tryGen(23, value)
t <- tryGen(24, "should be omitted")
Local.onIn("Should be on other")
Local.onIn("Should be on root")
Local.onBoolean(t)
<- b
func genArrows(arr: string -> bool):
arr("hi there")
func genFold(vals: []string):
Local.onIn("hi there")
for v <- vals:
par on v:
Local.onIn(v)
Local.onBoolean(true)
func wrapGenArrows() -> bool:
on "some peer":
genArrows(generateComplex)
if true == false:
x <- generateComplex("hello there!")
else:
y <- generateComplex("else do this!")
<- x

View File

@ -1,21 +0,0 @@
service Exp("exp"):
foo: -> u32
bar: string -> ()
func expInner() -> u32:
a <- Exp.foo()
b <- Exp.foo()
<- a
func expInput(v: string) -> u32:
a <- Exp.foo()
Exp.bar(v)
<- a
func expOuter(input: string, callback: u32 -> ()):
a <- expInner()
b <- Exp.foo()
relay <- expInner()
Exp.bar(input)
expInput(input)
callback(relay)

View File

@ -1,20 +0,0 @@
data Inside:
foo: u32
bar: bool
data Wrapping:
inside: Inside
value: string
data Enclosing:
wrap: Wrapping
arr: []Wrapping
flag: bool
func call(enc: Enclosing, oni32: u32 -> (), onString: string -> (), onBool: bool -> (), onInside: Inside -> Inside) -> Inside:
onBool(enc.flag)
onBool(enc.wrap)
if enc.flag == enc.wrap:
oni32(22)
<- enc.wrap.inside

View File

@ -1,5 +1,6 @@
package aqua
import aqua.ipfs.IpfsOpts
import aqua.js.{Meta, Module}
import cats.effect.ExitCode
import cats.effect.kernel.Async
@ -18,7 +19,8 @@ object PlatformOpts extends Logging {
def opts[F[_]: Files: AquaIO: Async](implicit ec: ExecutionContext): Opts[F[ExitCode]] =
Opts.subcommand(RunOpts.runCommand[F]) orElse
Opts.subcommand(KeyPairOpts.createKeypair[F])
Opts.subcommand(KeyPairOpts.createKeypair[F]) orElse
Opts.subcommand(IpfsOpts.upload[F])
// get path to node modules if there is `aqua-lib` module with `builtin.aqua` in it
def getGlobalNodeModulePath: Option[Path] = {

View File

@ -1,22 +1,32 @@
package aqua.builder
import aqua.backend.{ArgDefinition, PrimitiveType, ServiceDef, ServiceFunctionDef}
import aqua.js.{CallJsFunction, CallServiceHandler, FluencePeer}
import aqua.model.func.Call
import aqua.model.func.raw.CallServiceTag
import aqua.model.{LiteralModel, VarModel}
import scalajs.js
import scala.concurrent.Promise
// Service that can return argument to use it from a code
case class ArgumentGetter(serviceId: String, value: VarModel, arg: scalajs.js.Dynamic)
extends ServiceFunction {
def registerService(peer: FluencePeer): CallServiceHandler = {
def registerService(peer: FluencePeer): Unit = {
CallJsFunction.registerService(
peer,
serviceId,
value.name,
_ => arg
_ => js.Promise.resolve(arg),
ServiceDef(
None,
ServiceFunctionDef(
value.name,
Nil,
PrimitiveType
) :: Nil
)
)
}

View File

@ -1,15 +1,18 @@
package aqua.builder
import aqua.backend.{ArgDefinition, PrimitiveType, ServiceDef, ServiceFunctionDef, VoidType}
import aqua.io.OutputPrinter
import aqua.js.{CallJsFunction, CallServiceHandler, FluencePeer}
import aqua.model.func.Call
import aqua.model.func.raw.CallServiceTag
import aqua.model.{LiteralModel, VarModel}
import scala.scalajs.js.JSON
import scala.scalajs.js
import scala.scalajs.js.{Dynamic, JSON}
// Function to print any variables that passed as arguments
class Console(serviceId: String, fnName: String) extends ServiceFunction {
class Console(serviceId: String, fnName: String, resultNames: List[String])
extends ServiceFunction {
def callTag(variables: List[VarModel]): CallServiceTag = {
CallServiceTag(
@ -19,17 +22,33 @@ class Console(serviceId: String, fnName: String) extends ServiceFunction {
)
}
def registerService(peer: FluencePeer): CallServiceHandler = {
CallJsFunction.registerUnitService(
def registerService(peer: FluencePeer): Unit = {
CallJsFunction.registerService(
peer,
serviceId,
fnName,
args => {
val str = JSON.stringify(args, space = 2)
varArgs => {
// drop last argument (tetraplets)
val args: Seq[js.Any] = varArgs.init
val toPrint = args.toList match {
case arg :: Nil => JSON.stringify(arg, space = 2)
case _ => args.map(a => JSON.stringify(a, space = 2)).mkString("[\n", ",\n", "\n]")
}
// if an input function returns a result, our success will be after it is printed
// otherwise finish after JS SDK will finish sending a request
OutputPrinter.print(str)
}
OutputPrinter.print(toPrint)
// empty JS object
js.Promise.resolve(ServiceFunction.emptyObject)
},
ServiceDef(
None,
ServiceFunctionDef(
fnName,
resultNames.map(n => ArgDefinition(n, PrimitiveType)),
VoidType
) :: Nil
)
)
}
}

View File

@ -1,5 +1,6 @@
package aqua.builder
import aqua.backend.{ServiceDef, ServiceFunctionDef, VoidType}
import aqua.io.OutputPrinter
import aqua.js.{CallJsFunction, FluencePeer}
import aqua.model.func.Call
@ -7,7 +8,9 @@ import aqua.model.func.raw.CallServiceTag
import aqua.model.{LiteralModel, VarModel}
import scala.concurrent.Promise
import scala.scalajs.js
import scala.scalajs.js.JSON
import scala.scalajs.js.Dynamic
// Will finish promise on service call
case class Finisher private (
@ -25,13 +28,22 @@ case class Finisher private (
}
def registerService(peer: FluencePeer) = {
CallJsFunction.registerUnitService(
CallJsFunction.registerService(
peer,
serviceId,
fnName,
_ => {
promise.success(())
}
js.Promise.resolve(ServiceFunction.emptyObject)
},
ServiceDef(
None,
ServiceFunctionDef(
fnName,
Nil,
VoidType
) :: Nil
)
)
}
}

View File

@ -0,0 +1,35 @@
package aqua.builder
import aqua.backend.{ArgDefinition, PrimitiveType, ServiceDef, ServiceFunctionDef, VoidType}
import aqua.ipfs.js.IpfsApi
import aqua.js.{CallJsFunction, CallServiceHandler, FluencePeer}
import scribe.Logging
import scalajs.js
class IPFSUploader(serviceId: String, fnName: String) extends ServiceFunction with Logging {
def registerService(peer: FluencePeer): Unit = {
CallJsFunction.registerService(
peer,
serviceId,
fnName,
args => {
IpfsApi
.uploadFile(args(0), args(1), logger.info: String => Unit, logger.error: String => Unit)
.`catch` { err =>
js.Dynamic.literal(error = "Error on uploading file: " + err)
}
},
ServiceDef(
None,
ServiceFunctionDef(
fnName,
ArgDefinition("path", PrimitiveType) :: ArgDefinition("multiaddr", PrimitiveType) :: Nil,
PrimitiveType
) :: Nil
)
)
}
}

View File

@ -1,7 +1,12 @@
package aqua.builder
import aqua.js.{CallServiceHandler, FluencePeer}
import scalajs.js.Dynamic
trait ServiceFunction {
def registerService(peer: FluencePeer): CallServiceHandler
def registerService(peer: FluencePeer): Unit
}
object ServiceFunction {
val emptyObject: Dynamic = Dynamic.literal()
}

View File

@ -0,0 +1,69 @@
package aqua.ipfs
import aqua.{AppOpts, AquaIO, FluenceOpts, LogFormatter, LogLevelTransformer}
import aqua.io.OutputPrinter
import aqua.js.{Fluence, PeerConfig}
import aqua.keypair.KeyPairShow.show
import cats.data.{NonEmptyChain, NonEmptyList, Validated, ValidatedNec, ValidatedNel}
import Validated.{invalid, invalidNec, valid, validNec, validNel}
import aqua.builder.IPFSUploader
import aqua.files.AquaFilesIO
import aqua.ipfs.js.IpfsApi
import aqua.model.LiteralModel
import aqua.run.RunCommand.createKeyPair
import aqua.run.{RunCommand, RunConfig, RunOpts}
import cats.effect.{Concurrent, ExitCode, Resource, Sync}
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.applicative.*
import cats.syntax.apply.*
import cats.effect.kernel.Async
import cats.syntax.show.*
import cats.{Applicative, Monad}
import com.monovore.decline.{Command, Opts}
import fs2.io.file.{Files, Path}
import scribe.Logging
import scala.concurrent.{ExecutionContext, Future}
import scala.scalajs.js
// Options and commands to work with IPFS
object IpfsOpts extends Logging {
val IpfsAquaPath = "aqua/ipfs.aqua"
val UploadFuncName = "uploadFile"
def pathOpt[F[_]: Files: Concurrent]: Opts[String] =
Opts
.option[String]("path", "Path to file", "p")
// Uploads a file to IPFS
def upload[F[_]: Async](implicit ec: ExecutionContext): Command[F[ExitCode]] =
Command(
name = "upload",
header = "Upload a file to IPFS"
) {
(
pathOpt,
FluenceOpts.multiaddrOpt,
AppOpts.logLevelOpt,
AppOpts.wrapWithOption(FluenceOpts.secretKeyOpt),
RunOpts.timeoutOpt,
FluenceOpts.printAir
).mapN { (path, multiaddr, logLevel, secretKey, timeout, printAir) =>
LogFormatter.initLogger(Some(logLevel))
val ipfsUploader = new IPFSUploader("ipfs", "uploadFile")
implicit val aio: AquaIO[F] = new AquaFilesIO[F]
RunCommand
.run[F](
multiaddr,
UploadFuncName,
LiteralModel.quote(path) :: Nil,
Path(IpfsAquaPath),
Nil,
RunConfig(timeout, logLevel, printAir, secretKey, Map.empty, ipfsUploader :: Nil)
)
.map(_ => ExitCode.Success)
}
}
}

View File

@ -0,0 +1,18 @@
package aqua.ipfs.js
import aqua.js.FluencePeer
import scala.scalajs.js
import scala.scalajs.js.annotation.{JSExportAll, JSImport}
object IpfsApi {
@js.native
@JSImport("./dist/ipfs.js", "uploadFile")
def uploadFile(
path: js.Any,
multiaddrResult: js.Any,
infoLogger: js.Any,
errorLogger: js.Any
): js.Promise[js.Dynamic] = js.native
}

View File

@ -1,6 +1,6 @@
package aqua.js
import aqua.backend.{ArgDefinition, FunctionDef, NamesConfig, TypeDefinition}
import aqua.backend.{ArgDefinition, FunctionDef, NamesConfig, ServiceDef, TypeDefinition}
import aqua.model.transform.TransformConfig
import aqua.model.transform.res.FuncRes
import aqua.types.Type
@ -9,41 +9,24 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.scalajs.js
import scala.scalajs.js.JSConverters.*
trait ServiceHandler extends js.Function {
def apply(args: js.Any*): js.Promise[js.Dynamic]
}
object CallJsFunction {
def registerService(
peer: FluencePeer,
serviceId: String,
fnName: String,
handler: js.Array[js.Any] => js.Dynamic
): CallServiceHandler = {
peer.internals.callServiceHandler.use((req, resp, next) => {
if (req.serviceId == serviceId && req.fnName == fnName) {
val result = handler(req.args)
resp.retCode = ResultCodes.success
resp.result = result
}
next()
})
}
// Register a service that returns no result
def registerUnitService(
peer: FluencePeer,
serviceId: String,
fnName: String,
handler: js.Array[js.Any] => Unit
): CallServiceHandler = {
registerService(
peer,
serviceId,
fnName,
arr => {
handler(arr)
js.Dynamic.literal()
}
)
handler: ServiceHandler,
servideDef: ServiceDef
): Unit = {
js.Function
val func = js.Dynamic.literal(fnName -> handler)
val args: js.Array[js.Any] =
js.Array(peer, serviceId, func)
V2.registerService(args, ServiceDefJs(servideDef))
}
// Call a function with generated air script

View File

@ -1,7 +1,7 @@
package aqua.js
import aqua.*
import aqua.backend.{ArgDefinition, FunctionDef, NamesConfig, TypeDefinition}
import aqua.backend.{ArgDefinition, FunctionDef, NamesConfig, ServiceDef, ServiceFunctionDef, TypeDefinition}
import scala.concurrent.Promise
import scala.scalajs.js
@ -70,15 +70,54 @@ trait PeerStatus extends js.Object {
@JSExportAll
case class FunctionDefJs(
functionName: String,
returnType: TypeDefinition,
argDefs: js.Array[ArgDefinition],
returnType: TypeDefinitionJs,
argDefs: js.Array[ArgDefinitionJs],
names: NamesConfigJs
)
object FunctionDefJs {
def apply(fd: FunctionDef): FunctionDefJs = {
FunctionDefJs(fd.functionName, fd.returnType, fd.argDefs.toJSArray, NamesConfigJs(fd.names))
FunctionDefJs(fd.functionName, TypeDefinitionJs(fd.returnType), fd.argDefs.map(ArgDefinitionJs.apply).toJSArray, NamesConfigJs(fd.names))
}
}
@JSExportAll
case class ArgDefinitionJs(
name: String,
argType: TypeDefinitionJs
)
object ArgDefinitionJs {
def apply(ad: ArgDefinition): ArgDefinitionJs = ArgDefinitionJs(ad.name, TypeDefinitionJs(ad.argType))
}
@JSExportAll
case class TypeDefinitionJs(tag: String)
object TypeDefinitionJs {
def apply(td: TypeDefinition): TypeDefinitionJs = TypeDefinitionJs(td.tag)
}
@JSExportAll
case class ServiceFunctionDefJs(
functionName: String,
argDefs: js.Array[ArgDefinitionJs],
returnType: TypeDefinitionJs
)
object ServiceFunctionDefJs {
def apply(sd: ServiceFunctionDef): ServiceFunctionDefJs = {
ServiceFunctionDefJs(sd.functionName, sd.argDefs.map(ArgDefinitionJs.apply).toJSArray, TypeDefinitionJs(sd.returnType))
}
}
@JSExportAll
case class ServiceDefJs(defaultServiceId: Option[String], functions: js.Array[ServiceFunctionDefJs])
object ServiceDefJs {
def apply(sd: ServiceDef): ServiceDefJs = {
ServiceDefJs(sd.defaultServiceId, sd.functions.map(ServiceFunctionDefJs.apply).toJSArray)
}
}
@ -132,6 +171,10 @@ class FluencePeer extends js.Object {
object V2 {
@js.native
@JSImport("@fluencelabs/fluence/dist/internal/compilerSupport/v2.js", "registerService")
def registerService(args: js.Array[js.Any], `def`: ServiceDefJs): Unit = js.native
@js.native
@JSImport("@fluencelabs/fluence/dist/internal/compilerSupport/v2.js", "callFunction")
def callFunction(

View File

@ -17,10 +17,6 @@ import scala.concurrent.{ExecutionContext, Future}
// Options and commands to work with KeyPairs
object KeyPairOpts extends Logging {
// Used to pass existing keypair to AquaRun
val secretKey: Opts[String] =
Opts.option[String]("secret-key", "Ed25519 32-byte key in base64", "sk")
// KeyPair generation
def createKeypair[F[_]: Async](implicit ec: ExecutionContext): Command[F[ExitCode]] =
Command(

View File

@ -2,7 +2,7 @@ package aqua.run
import aqua.LogLevelTransformer
import aqua.backend.FunctionDef
import aqua.builder.{Console, Finisher}
import aqua.builder.{Console, Finisher, ServiceFunction}
import aqua.io.OutputPrinter
import aqua.js.{CallJsFunction, Fluence, FluenceUtils, PeerConfig}
import aqua.run.RunCommand.createKeyPair
@ -26,8 +26,8 @@ object FuncCaller {
air: String,
functionDef: FunctionDef,
config: RunConfig,
consoleService: Console,
finisherService: Finisher
finisherService: Finisher,
services: List[ServiceFunction]
)(implicit
ec: ExecutionContext
): F[Unit] = {
@ -53,15 +53,18 @@ object FuncCaller {
)
.toFuture
_ = OutputPrinter.print("Your peerId: " + peer.getStatus().peerId)
_ = consoleService.registerService(peer)
_ = finisherService.registerService(peer)
_ = config.argumentGetters.values.map(_.registerService(peer))
// register all services
_ = (services ++ config.argumentGetters.values :+ finisherService).map(
_.registerService(peer)
)
callFuture = CallJsFunction.funcCallJs(
air,
functionDef,
List.empty
)
_ <- Future.firstCompletedOf(finisherService.promise.future :: callFuture :: Nil)
// error will be thrown on failed call
_ <- callFuture
_ <- finisherService.promise.future
} yield ()).recover(handleFuncCallErrors).pure[F]
}
}

View File

@ -54,27 +54,6 @@ object RunCommand extends Logging {
}.getOrElse(Future.successful(None))
}
// Generates air from function, register all services and make a call through FluenceJS
def genAirAndMakeCall[F[_]: Async](
multiaddr: String,
wrapped: FuncCallable,
consoleService: Console,
finisherService: Finisher,
transformConfig: TransformConfig,
runConfig: RunConfig
)(implicit ec: ExecutionContext): F[Unit] = {
val funcRes = Transform.fn(wrapped, transformConfig)
val definitions = FunctionDef(funcRes)
val air = FuncAirGen(funcRes).generate.show
if (runConfig.printAir) {
OutputPrinter.print(air)
}
FuncCaller.funcCall[F](multiaddr, air, definitions, runConfig, consoleService, finisherService)
}
private def findFunction(contexts: Chain[AquaContext], funcName: String): Option[FuncCallable] =
contexts
.flatMap(_.exports.map(e => Chain.fromSeq(e.funcs.values.toList)).getOrElse(Chain.empty))
@ -116,32 +95,9 @@ object RunCommand extends Logging {
val resultV: ValidatedNec[String, F[Unit]] = contextV.andThen { contextC =>
findFunction(contextC, func) match {
case Some(funcCallable) =>
val consoleService =
new Console(runConfig.consoleServiceId, runConfig.printFunctionName)
val promiseFinisherService =
Finisher(runConfig.finisherServiceId, runConfig.finisherFnName)
// call an input function from a generated function
val callResult: ValidatedNec[String, F[Unit]] = RunWrapper
.wrapCall(
func,
funcCallable,
args,
runConfig,
consoleService,
promiseFinisherService
)
.map { wrapped =>
genAirAndMakeCall[F](
multiaddr,
wrapped,
consoleService,
promiseFinisherService,
transformConfig,
runConfig
)
}
callResult
val runner =
new Runner(func, funcCallable, multiaddr, args, runConfig, transformConfig)
runner.run()
case None =>
Validated.invalidNec[String, F[Unit]](s"There is no function called '$func'")
}

View File

@ -1,6 +1,6 @@
package aqua.run
import aqua.builder.ArgumentGetter
import aqua.builder.{ArgumentGetter, ServiceFunction}
import scribe.Level
// `run` command configuration
@ -9,9 +9,12 @@ case class RunConfig(
logLevel: Level,
printAir: Boolean,
secretKey: Option[Array[Byte]],
// services that will pass arguments to air
argumentGetters: Map[String, ArgumentGetter],
// services that will be used in aqua code and need to be registered
services: List[ServiceFunction] = Nil,
consoleServiceId: String = "--after-callback-srv-service--",
printFunctionName: String = "print-and-stop",
printFunctionName: String = "console-log",
finisherServiceId: String = "--finisher--",
finisherFnName: String = "--finish-execution--",
resultName: String = "-some-unique-res-name-",

View File

@ -6,7 +6,7 @@ import aqua.parser.lexer.{Literal, VarLambda}
import aqua.parser.lift.LiftParser.Implicits.idLiftParser
import aqua.parser.lift.Span
import aqua.types.BottomType
import aqua.{AppOpts, AquaIO, LogFormatter}
import aqua.{AppOpts, AquaIO, FluenceOpts, LogFormatter}
import cats.data.{NonEmptyChain, NonEmptyList, Validated, ValidatedNec, ValidatedNel}
import Validated.{invalid, invalidNec, valid, validNec, validNel}
import aqua.builder.ArgumentGetter
@ -33,23 +33,6 @@ object RunOpts extends Logging {
.option[Int]("timeout", "Request timeout in milliseconds", "t")
.withDefault(7000)
val multiaddrOpt: Opts[String] =
Opts
.option[String]("addr", "Relay multiaddress", "a")
.withDefault(
"/dns4/kras-00.fluence.dev/tcp/19001/wss/p2p/12D3KooWR4cv1a8tv7pps4HH6wePNaK6gf1Hww5wcCMzeWxyNw51"
)
val secretKeyOpt: Opts[Array[Byte]] =
Opts
.option[String]("sk", "Ed25519 32-byte secret key in base64", "s")
.mapValidated { s =>
val decoder = Base64.getDecoder
Validated.catchNonFatal {
decoder.decode(s)
}.leftMap(t => NonEmptyList.one("secret key isn't a valid base64 string: " + t.getMessage))
}
def spanToId: Span.S ~> Id = new (Span.S ~> Id) {
override def apply[A](span: Span.S[A]): Id[A] = {
@ -57,47 +40,54 @@ object RunOpts extends Logging {
}
}
val printAir: Opts[Boolean] =
Opts
.flag("print-air", "Prints generated AIR code before function execution")
.map(_ => true)
.withDefault(false)
// Checks if a path is a file and it exists and transforms it
def checkAndTransformFile[F[_]: Files: Concurrent, T](
path: String,
transform: Path => F[ValidatedNec[String, T]]
): F[ValidatedNec[String, T]] = {
val p = Path(path)
Files[F]
.exists(p)
.flatMap { exists =>
if (exists)
Files[F].isRegularFile(p).flatMap { isFile =>
if (isFile) {
transform(p)
} else {
invalidNec(s"Path '${p.toString}' is not a file").pure[F]
}
}
else {
invalidNec(s"There is no path '${p.toString}'").pure[F]
}
}
}
def dataFromFileOpt[F[_]: Files: Concurrent]: Opts[F[ValidatedNec[String, Option[js.Dynamic]]]] =
Opts
.option[String]("data-path", "Path to file with arguments map in JSON format", "p")
.map { str =>
val p = Path(str)
Files[F]
.exists(p)
.flatMap { exists =>
if (exists)
Files[F].isRegularFile(p).flatMap { isFile =>
if (isFile) {
Files[F]
.readAll(p)
.through(fs2.text.utf8.decode)
.fold(List.empty[String]) { case (acc, str) => str :: acc }
.map(_.mkString(""))
.map { jsonStr =>
Validated.catchNonFatal {
JSON.parse(jsonStr)
}.leftMap(t =>
NonEmptyChain
.one(s"Data in ${p.toString} isn't a valid JSON: " + t.getMessage)
)
}
.compile
.last
.map(_.map(_.map(v => Some(v))).getOrElse(validNec(None)))
} else {
invalidNec(s"Path '${p.toString}' is not a file").pure[F]
}
checkAndTransformFile(
str,
p => {
Files[F]
.readAll(p)
.through(fs2.text.utf8.decode)
.fold(List.empty[String]) { case (acc, str) => str :: acc }
.map(_.mkString(""))
.map { jsonStr =>
Validated.catchNonFatal {
JSON.parse(jsonStr)
}.leftMap(t =>
NonEmptyChain
.one(s"Data in ${p.toString} isn't a valid JSON: " + t.getMessage)
)
}
else {
invalidNec(s"There is no path '${p.toString}'").pure[F]
}
.compile
.last
.map(_.map(_.map(v => Some(v))).getOrElse(validNec(None)))
}
)
}
val dataOpt: Opts[js.Dynamic] =
@ -172,12 +162,14 @@ object RunOpts extends Logging {
(
AppOpts.inputOpts[F],
AppOpts.importOpts[F],
multiaddrOpt,
FluenceOpts.multiaddrOpt.withDefault(
"/dns4/kras-00.fluence.dev/tcp/19001/wss/p2p/12D3KooWR4cv1a8tv7pps4HH6wePNaK6gf1Hww5wcCMzeWxyNw51"
),
funcOpt,
timeoutOpt,
AppOpts.logLevelOpt,
printAir,
AppOpts.wrapWithOption(secretKeyOpt),
FluenceOpts.printAir,
AppOpts.wrapWithOption(FluenceOpts.secretKeyOpt),
AppOpts.wrapWithOption(dataOpt),
AppOpts.wrapWithOption(dataFromFileOpt[F])
).mapN {
@ -193,12 +185,7 @@ object RunOpts extends Logging {
dataFromArgument,
dataFromFileF
) =>
scribe.Logger.root
.clearHandlers()
.clearModifiers()
.withHandler(formatter = LogFormatter.formatter, minimumLevel = Some(logLevel))
.replace()
LogFormatter.initLogger(Some(logLevel))
for {
inputV <- inputF
impsV <- importF

View File

@ -1,16 +1,82 @@
package aqua.run
import aqua.backend.FunctionDef
import aqua.backend.air.FuncAirGen
import aqua.builder.{ArgumentGetter, Console, Finisher}
import aqua.io.OutputPrinter
import aqua.model.{ValueModel, VarModel}
import aqua.model.func.{Call, FuncCallable}
import aqua.model.func.raw.{CallArrowTag, FuncOp, FuncOps}
import aqua.model.transform.{Transform, TransformConfig}
import aqua.types.{ArrowType, BoxType, NilType, Type}
import cats.data.{Validated, ValidatedNec}
import cats.effect.kernel.Async
import cats.syntax.show.*
import scala.concurrent.ExecutionContext
import scala.scalajs.js
// Wraps function to run with service calls to run it with variables and output printing
object RunWrapper {
class Runner(
funcName: String,
funcCallable: FuncCallable,
multiaddr: String,
args: List[ValueModel],
config: RunConfig,
transformConfig: TransformConfig
) {
def resultVariableNames(funcCallable: FuncCallable, name: String): List[String] = {
funcCallable.arrowType.codomain.toList.zipWithIndex.map { case (t, idx) =>
name + idx
}
}
// Wraps function with necessary services, registers services and calls wrapped function with FluenceJS
def run[F[_]: Async]()(implicit ec: ExecutionContext): ValidatedNec[String, F[Unit]] = {
val resultNames = resultVariableNames(funcCallable, config.resultName)
val consoleService =
new Console(config.consoleServiceId, config.printFunctionName, resultNames)
val promiseFinisherService =
Finisher(config.finisherServiceId, config.finisherFnName)
// call an input function from a generated function
val callResult: ValidatedNec[String, F[Unit]] = wrapCall(
consoleService,
promiseFinisherService
).map { wrapped =>
genAirAndMakeCall[F](
wrapped,
consoleService,
promiseFinisherService
)
}
callResult
}
// Generates air from function, register all services and make a call through FluenceJS
private def genAirAndMakeCall[F[_]: Async](
wrapped: FuncCallable,
consoleService: Console,
finisherService: Finisher
)(implicit ec: ExecutionContext): F[Unit] = {
val funcRes = Transform.fn(wrapped, transformConfig)
val definitions = FunctionDef(funcRes)
val air = FuncAirGen(funcRes).generate.show
if (config.printAir) {
OutputPrinter.print(air)
}
FuncCaller.funcCall[F](
multiaddr,
air,
definitions,
config,
finisherService,
config.services :+ consoleService
)
}
// Creates getter services for variables. Return an error if there is no variable in services
// and type of this variable couldn't be optional
@ -41,11 +107,7 @@ object RunWrapper {
// res <- funcCallable(args:_*)
// Console.print(res)
// Finisher.finish()
def wrapCall(
funcName: String,
funcCallable: FuncCallable,
args: List[ValueModel],
config: RunConfig,
private def wrapCall(
consoleService: Console,
finisherService: Finisher
): ValidatedNec[String, FuncCallable] = {
@ -72,9 +134,12 @@ object RunWrapper {
val finisherServiceTag = finisherService.callTag()
val vars = args.zip(funcCallable.arrowType.domain.toList).collect {
case (VarModel(n, _, _), argType) => (n, argType)
}
val vars = args
.zip(funcCallable.arrowType.domain.toList)
.collect { case (VarModel(n, _, _), argType) =>
(n, argType)
}
.distinctBy(_._1)
val gettersV = getGettersForVars(vars, config.argumentGetters)
@ -92,4 +157,5 @@ object RunWrapper {
)
}
}
}

View File

@ -68,11 +68,7 @@ object AquaCli extends IOApp with Logging {
scriptOpt
).mapN {
case (inputF, importsF, outputF, toAirOp, toJs, noRelayOp, noXorOp, h, v, logLevel, constants, isDryRun, isScheduled) =>
scribe.Logger.root
.clearHandlers()
.clearModifiers()
.withHandler(formatter = LogFormatter.formatter, minimumLevel = Some(logLevel))
.replace()
LogFormatter.initLogger(Some(logLevel))
val toAir = toAirOp || isScheduled
val noXor = noXorOp || isScheduled

View File

@ -0,0 +1,29 @@
package aqua
import cats.data.{NonEmptyList, Validated}
import com.monovore.decline.Opts
import java.util.Base64
object FluenceOpts {
val multiaddrOpt: Opts[String] =
Opts
.option[String]("addr", "Relay multiaddress", "a")
val secretKeyOpt: Opts[Array[Byte]] =
Opts
.option[String]("sk", "Ed25519 32-byte secret key in base64", "s")
.mapValidated { s =>
val decoder = Base64.getDecoder
Validated.catchNonFatal {
decoder.decode(s)
}.leftMap(t => NonEmptyList.one("secret key isn't a valid base64 string: " + t.getMessage))
}
val printAir: Opts[Boolean] =
Opts
.flag("print-air", "Prints generated AIR code before function execution")
.map(_ => true)
.withDefault(false)
}

View File

@ -1,8 +1,19 @@
package aqua
import scribe.{Level, Logger}
import scribe.format.*
object LogFormatter {
val formatter: Formatter = formatter"$date ${string("[")}$levelColored${string("]")} $message$mdc"
val formatterWithFilename: Formatter = formatter"$date $fileName ${string("[")}$levelColored${string("]")} $message$mdc"
val formatterWithFilename: Formatter =
formatter"$date $fileName ${string("[")}$levelColored${string("]")} $message$mdc"
def initLogger(level: Option[Level]): Logger = {
scribe.Logger.root
.clearHandlers()
.clearModifiers()
.withHandler(formatter = formatter, minimumLevel = level)
.replace()
}
}

23
npm/aqua/ipfs.aqua Normal file
View File

@ -0,0 +1,23 @@
module IpfsExports
import "@fluencelabs/aqua-ipfs/ipfs-api.aqua"
import "@fluencelabs/aqua-ipfs/ipfs.aqua"
export uploadFile
data UploadResult:
error: string
cid: string
size: u64
service IPFS("ipfs"):
uploadFile(path: string, multiaddr: IpfsMultiaddrResult) -> UploadResult
func uploadFile(path: string) -> UploadResult:
on HOST_PEER_ID:
multiaddr <- get_external_api_multiaddr(HOST_PEER_ID)
result <- IPFS.uploadFile(path, multiaddr)
<- result

2371
npm/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -7,7 +7,8 @@
"aqua.js",
"index.js",
"error.js",
"utils.js"
"utils.js",
"dist/*"
],
"bin": {
"aqua": "index.js",
@ -15,11 +16,18 @@
},
"scripts": {
"run": "node index.js",
"from:scalajs": "cp ../cli/.js/target/scala-3.0.2/cli-opt/main.js ./aqua.js && npm run run -- $@"
"from:scalajs": "cp ../cli/.js/target/scala-3.1.0/cli-opt/main.js ./aqua.js && npm run build && npm run run -- $@",
"build": "tsc"
},
"dependencies": {
"@fluencelabs/aqua-ipfs": "0.5.2",
"@fluencelabs/aqua-lib": "0.2.1",
"@fluencelabs/fluence": "0.15.2",
"@fluencelabs/aqua-lib": "0.2.1"
"ipfs-http-client": "50.1.2"
},
"devDependencies": {
"ts-node": "^10.4.0",
"typescript": "^4.5.4"
},
"repository": {
"type": "git",

48
npm/src/ipfs.ts Normal file
View File

@ -0,0 +1,48 @@
import {create, globSource} from "ipfs-http-client";
import { Multiaddr, protocols } from "multiaddr";
import * as util from "util";
type UploadResult = {
cid: string,
size: number
}
export async function uploadFile(
path: string,
multiaddrResult: any,
infoLogger: (s: string) => void,
errorLogger: (s: string) => void
): Promise<UploadResult> {
let rpcAddr;
if (multiaddrResult.success) {
rpcAddr = multiaddrResult.multiaddr;
} else {
errorLogger(
"Failed to retrieve external api multiaddr"
);
throw multiaddrResult.error;
}
let rpcMaddr = new Multiaddr(rpcAddr).decapsulateCode(
protocols.names.p2p.code
);
// HACK: `as any` is needed because ipfs-http-client forgot to add `| Multiaddr` to the `create` types
const ipfs = create(rpcMaddr as any);
infoLogger("created ipfs client to " + rpcMaddr);
await ipfs.id();
infoLogger("connected to ipfs");
const source: any = await globSource(path)
const file = await ipfs.add(source);
infoLogger("file uploaded");
return {
cid: file.cid.toString(),
size: file.size
};
}

26
npm/tsconfig.json Normal file
View File

@ -0,0 +1,26 @@
{
"compilerOptions": {
"target": "esnext",
"module": "ESNext",
"lib": [
"ESNext"
],
"declaration": true,
"outDir": "dist",
"moduleResolution": "node",
"strict": true,
"esModuleInterop": true,
"noImplicitAny": false,
"strictNullChecks": false,
"skipLibCheck": true,
},
"include": ["src/**/*"],
"exclude": [
"node_modules",
"dist",
"bundle",
"src/__test__",
"src/compiled"
]
}