Introduce Seq, Par, Fold instructions (#5)

This commit is contained in:
vms 2020-10-08 12:43:23 +03:00 committed by GitHub
parent 8cec0163b1
commit 6e754f023f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 2681 additions and 696 deletions

43
.circleci/config.yml Normal file
View File

@ -0,0 +1,43 @@
version: 2
jobs:
aqua:
docker:
- image: circleci/rust:latest
resource_class: medium+
environment:
RUST_BACKTRACE: 1
RUST_TEST_THREADS: 1
steps:
- checkout
- restore_cache:
keys:
- aqua01-{{ checksum "Cargo.lock" }}
- run: |
rustup toolchain install nightly-2020-07-12-x86_64-unknown-linux-gnu
rustup default nightly-2020-07-12-x86_64-unknown-linux-gnu
rustup target add wasm32-wasi
rustup component add rustfmt
rustup component add clippy
cargo install fcli
cd stepper
fce build
cd ..
cargo fmt --all -- --check --color always
cargo build --release --all-features
cargo test --release --all-features
cargo clippy -v
- save_cache:
paths:
- ~/.cargo
- ~/.rustup
key: aqua01-{{ checksum "Cargo.lock" }}
workflows:
version: 2
arqada:
jobs:
- aqua

3
.gitignore vendored
View File

@ -2,3 +2,6 @@
/target /target
.DS_Store .DS_Store
.repl_history .repl_history
*.wasm
!./artifacts/*.wasm

1140
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,31 +1,14 @@
[package] [workspace]
name = "aquamarine" members = [
version = "0.1.0" "crates/test-module",
authors = ["Fluence Labs"] "crates/test-utils",
edition = "2018" "stepper",
]
[lib] [profile.release]
name = "aquamarine_client" opt-level = 3
crate-type = ["cdylib"] debug = false
path = "src/wasm_bindgen.rs" lto = true
debug-assertions = false
[[bin]] overflow-checks = false
name = "aquamarine" panic = "abort"
path = "src/fce.rs"
[[bin]]
name = "aqua_test_module"
path = "test_module/main.rs"
[dependencies]
fluence = { git = "https://github.com/fluencelabs/rust-sdk", features = ["logger"] }
serde = { version = "1.0.116", features = ["derive"] }
serde_derive = "1.0.116"
serde_sexpr = "0.1.0"
jsonpath_lib = "0.2.5"
log = "0.4.11"
serde_json = "1.0"
wasm-bindgen = "0.2.68"

View File

@ -11,4 +11,4 @@ name = "aquamarine"
logger_enabled = true logger_enabled = true
[module.wasi] [module.wasi]
envs = ["CURRENT_PEER_ID=asd"] envs = { "CURRENT_PEER_ID" = "some_peer_id" }

View File

@ -2,11 +2,36 @@
Aquamarine is a distributed choreography language & platform Aquamarine is a distributed choreography language & platform
## AIR
The current version supports the following instructions:
- call
- par
- seq
- fold
- next
- null
## Examples ## Examples
```lisp ```lisp
((call (%current% (local_service_id local_fn_name) () result_name)) (call (remote_peer_id (service_id fn_name) () g))) (seq (
(call (%current_peer_id1% (local_service_id local_fn_name) () result_name_1))
(call (remote_peer_id (service_id fn_name) () result_name_2))
)),
``` ```
This instruction sequence contains two call instructions:
This instruction sequence contains two call instructions in the sequential order:
1. call a function with `local_fn_name` name of a local service with `local_service_id` id and bind result to `result_name` 1. call a function with `local_fn_name` name of a local service with `local_service_id` id and bind result to `result_name`
2. call a remote peer with `remote_peer_id` id 2. call a remote peer with `remote_peer_id` id
```lisp
(fold (Iterable i
(seq (
(call (%current_peer_id% (local_service_id local_fn_name) (i) acc[]))
(next i)
)
)))
```
This example is an analog of left fold. It iterates over `Iterable` and on each iteration calls `local_service_id` and puts result to `acc`.

Binary file not shown.

View File

@ -0,0 +1,12 @@
[package]
name = "aqua-test-module"
version = "0.1.0"
authors = ["Fluence Labs"]
edition = "2018"
[[bin]]
name = "aqua_test_module"
path = "src/main.rs"
[dependencies]
fluence = { git = "https://github.com/fluencelabs/rust-sdk", features = ["logger"] }

View File

@ -0,0 +1,49 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#![warn(rust_2018_idioms)]
#![deny(
dead_code,
nonstandard_style,
unused_imports,
unused_mut,
unused_variables,
unused_unsafe,
unreachable_patterns
)]
use fluence::fce;
fn main() {}
#[fce]
pub struct CallServiceResult {
pub ret_code: i32,
pub result: String,
}
#[fce]
pub fn call_service(service_id: String, fn_name: String, args: String) -> CallServiceResult {
println!(
"call service invoked with:\n service_id: {}\n fn_name: {}\n args: {:?}",
service_id, fn_name, args
);
CallServiceResult {
ret_code: 0,
result: String::from("[\"result string\"]"),
}
}

View File

@ -0,0 +1,14 @@
[package]
name = "aqua-test-utils"
version = "0.1.0"
authors = ["Fluence Labs"]
edition = "2018"
[lib]
name = "aqua_test_utils"
path = "src/lib.rs"
[dependencies]
fluence = { git = "https://github.com/fluencelabs/rust-sdk", features = ["logger"] }
aquamarine-vm = { git = "https://github.com/fluencelabs/fce" }
serde_json = "1.0.56"

View File

@ -0,0 +1,99 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#![warn(rust_2018_idioms)]
#![deny(
dead_code,
nonstandard_style,
unused_imports,
unused_mut,
unused_variables,
unused_unsafe,
unreachable_patterns
)]
use aquamarine_vm::vec1::Vec1;
use aquamarine_vm::AquamarineVM;
use aquamarine_vm::AquamarineVMConfig;
use aquamarine_vm::HostExportedFunc;
use aquamarine_vm::HostImportDescriptor;
use aquamarine_vm::IType;
use aquamarine_vm::IValue;
use std::path::PathBuf;
pub fn create_aqua_vm(call_service: HostExportedFunc) -> AquamarineVM {
let call_service_descriptor = HostImportDescriptor {
host_exported_func: call_service,
argument_types: vec![IType::String, IType::String, IType::String],
output_type: Some(IType::Record(0)),
error_handler: None,
};
let config = AquamarineVMConfig {
aquamarine_wasm_path: PathBuf::from("../target/wasm32-wasi/debug/aquamarine.wasm"),
call_service: call_service_descriptor,
current_peer_id: String::from("test_peer_id"),
};
AquamarineVM::new(config).expect("vm should be created")
}
pub fn unit_call_service() -> HostExportedFunc {
Box::new(|_, _| -> Option<IValue> {
Some(IValue::Record(
Vec1::new(vec![
IValue::S32(0),
IValue::String(String::from("\"test\"")),
])
.unwrap(),
))
})
}
pub fn echo_string_call_service() -> HostExportedFunc {
Box::new(|_, args| -> Option<IValue> {
let arg = match &args[2] {
IValue::String(str) => str,
_ => unreachable!(),
};
let arg: Vec<String> = serde_json::from_str(arg).unwrap();
Some(IValue::Record(
Vec1::new(vec![
IValue::S32(0),
IValue::String(format!("\"{}\"", arg[0])),
])
.unwrap(),
))
})
}
pub fn echo_number_call_service() -> HostExportedFunc {
Box::new(|_, args| -> Option<IValue> {
let arg = match &args[2] {
IValue::String(str) => str,
_ => unreachable!(),
};
let arg: Vec<String> = serde_json::from_str(arg).unwrap();
Some(IValue::Record(
Vec1::new(vec![IValue::S32(0), IValue::String(arg[0].clone())]).unwrap(),
))
})
}

View File

@ -1,170 +0,0 @@
/*
null
out: a!b . P
inp: a(b). P <- barrier
par: P | Q
new: nx . P <- restriction
rep: !P
sum: P + Q
(seq --- arrow
(proc -> new x)
(proc that uses x)
)
Scope:
a <- A
a -> B
(seq
(par
x <- P
y <- Q
)
(call fn [x y] z)
)
(seq
(call fn1 [] x)
(call fn2 [x] y)
)
(seq P Q) <- scope
(par P Q) <- independent
(xor P Q) <- lazy
-- fire and forget (any)
(seq
(par
x <- P
y <- Q
)
(call fn [] z)
)
-- join
(seq
(par
x <- P
y <- Q
)
(call noop [x y])
)
-- any (fastest)
(seq
(par
x[] <- P
x[] <- Q
)
(call fn [x.0])
)
data - HashMap<String, serde_json::Value>
x.h.a.v -> (x - key in data, h.a.v - json path)
0
1
(call 1 (counter inc) [x.a] b)
(call 0 (response inc) [b] _)
-- any (fastest) -- does not work
(seq
(seq
(par
x <- P
y <- Q
)
(xor
(call fn [x.h.a.v, x.a.b.n] z)
(call fn [y] z)
)
)
(call fn11 [z])
)
ITERATORS
(seq
(fold Iterable i
(par
(call fn [i] acc[])
(next i)
)
)
(match acc.length 3
(call fnAgg [acc] y)
)
)
(par
(call fn [i.0] acc[])
(par
(call fn [i.1] acc[])
(par
(call fn [i.2 acc[])
(fold Iterable[3,..] i
(par
(call fn [i] acc[])
(next i)
)
)
)
)
)
(seq
(fold Iterable i
(seq
(call fn [i acc] acc[])
(next i)
)
)
(call fnAgg [acc] y)
)
(seq
(fold Iterable i
(xor
(call fn [i] res)
(next i)
)
)
(call fnAgg [res] y)
)
*/
/*
Addressing:
To address a code we need to declare:
(peer_pk, srv_id, fn_name)
(call PEER_PART FN_PART [args] res_name)
(current)
(pk $pk)
(pk $pk $srv_id)
PEER_PART: resolves to (peer_pk) \/ (peer_pk, pk_srv_id)
(fn $name)
(fn $name $srv_id)
FN_PART: resolves to (fn_name) \/ (fn_srv_id, fn_name)
Call resolves to:
(peer_pk, fn_srv_id || pk_srv_id, fn_name)
If !fn_srv_id && !pk_srv_id <- error
(call (current) (fn "resolve" "by_pk") [pk])
*/

View File

@ -1,120 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::StepperOutcome;
use jsonpath_lib::JsonPathError;
use serde_json::Error as SerdeJsonError;
use serde_sexpr::Error as SExprError;
use std::convert::Into;
use std::env::VarError;
use std::error::Error;
#[derive(Debug)]
pub enum AquamarineError {
/// Errors occurred while parsing aqua script in the form of S expressions.
SExprParseError(SExprError),
/// Errors occurred while parsing supplied data.
DataParseError(SerdeJsonError),
/// Indicates that environment variable with name CURRENT_PEER_ID isn't set.
CurrentPeerIdNotSet(VarError),
/// Semantic errors in instructions.
InstructionError(String),
/// Semantic errors in instructions.
LocalServiceError(String),
/// Value with such name isn't presence in data.
VariableNotFound(String),
/// Value with such path isn't found in data with such error.
VariableNotInJsonPath(String, JsonPathError),
/// Multiple values found for such json path.
MultipleValuesInJsonPath(String),
}
impl Error for AquamarineError {}
impl std::fmt::Display for AquamarineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
AquamarineError::SExprParseError(err) => write!(f, "{}", err),
AquamarineError::DataParseError(err) => write!(f, "{}", err),
AquamarineError::CurrentPeerIdNotSet(err) => write!(f, "{}", err),
AquamarineError::InstructionError(err_msg) => write!(f, "{}", err_msg),
AquamarineError::LocalServiceError(err_msg) => write!(f, "{}", err_msg),
AquamarineError::VariableNotFound(variable_name) => write!(
f,
"variable with name {} isn't present in data",
variable_name
),
AquamarineError::VariableNotInJsonPath(json_path, json_path_err) => write!(
f,
"variable with path {} not found with error: {}",
json_path, json_path_err
),
AquamarineError::MultipleValuesInJsonPath(json_path) => write!(
f,
"multiple variables found for this json path {}",
json_path
),
}
}
}
impl From<SExprError> for AquamarineError {
fn from(err: SExprError) -> Self {
AquamarineError::SExprParseError(err)
}
}
impl From<SerdeJsonError> for AquamarineError {
fn from(err: SerdeJsonError) -> Self {
AquamarineError::DataParseError(err)
}
}
impl From<std::convert::Infallible> for AquamarineError {
fn from(_: std::convert::Infallible) -> Self {
unreachable!()
}
}
impl Into<StepperOutcome> for AquamarineError {
fn into(self) -> StepperOutcome {
let ret_code = match self {
AquamarineError::SExprParseError(_) => 1,
AquamarineError::DataParseError(..) => 2,
AquamarineError::CurrentPeerIdNotSet(..) => 3,
AquamarineError::InstructionError(..) => 4,
AquamarineError::LocalServiceError(..) => 5,
AquamarineError::VariableNotFound(..) => 6,
AquamarineError::VariableNotInJsonPath(..) => 7,
AquamarineError::MultipleValuesInJsonPath(..) => 8,
};
StepperOutcome {
ret_code,
data: format!("{}", self),
next_peer_pks: vec![],
}
}
}

View File

@ -1,144 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::AquaData;
use crate::AquamarineError;
use crate::Result;
use serde_derive::Deserialize;
use serde_derive::Serialize;
const CURRENT_PEER_ALIAS: &str = "%current%";
const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID";
/*
(current)
(pk $pk)
(pk $pk $srv_id)
PEER_PART: resolves to (peer_pk) \/ (peer_pk, pk_srv_id)
(fn $name)
(fn $name $srv_id)
FN_PART: resolves to (fn_name) \/ (fn_srv_id, fn_name)
*/
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum PeerPart {
PeerPk(String),
PeerPkWithPkServiceId(String, String),
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum FunctionPart {
FuncName(String),
ServiceIdWithFuncName(String, String),
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Call(PeerPart, FunctionPart, Vec<String>, String);
impl super::ExecutableInstruction for Call {
fn execute(self, data: &mut AquaData, next_peer_pks: &mut Vec<String>) -> Result<()> {
log::info!("call called with data: {:?} and next_peer_pks: {:?}", data, next_peer_pks);
let (peer_pk, service_id, func_name) = parse_peer_fn_parts(self.0, self.1)?;
let function_args = parse_args(self.2, data)?;
let function_args = serde_json::to_string(&function_args)?;
let result_name = parse_result_name(self.3)?;
let current_peer_id = std::env::var(CURRENT_PEER_ID_ENV_NAME)
.map_err(|e| AquamarineError::CurrentPeerIdNotSet(e))?;
if peer_pk == current_peer_id || peer_pk == CURRENT_PEER_ALIAS {
let result = unsafe { crate::call_service(service_id, func_name, function_args) };
if result.ret_code != crate::CALL_SERVICE_SUCCESS {
return Err(AquamarineError::LocalServiceError(result.result));
}
let result: serde_json::Value = serde_json::from_str(&result.result)?;
data.insert(result_name, result);
} else {
next_peer_pks.push(peer_pk);
}
Ok(())
}
}
#[rustfmt::skip]
fn parse_peer_fn_parts(
peer_part: PeerPart,
fn_part: FunctionPart,
) -> Result<(String, String, String)> {
match (peer_part, fn_part) {
(PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::ServiceIdWithFuncName(_service_id, func_name)) => {
Ok((peer_pk, peer_service_id, func_name))
},
(PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::FuncName(func_name)) => {
Ok((peer_pk, peer_service_id, func_name))
},
(PeerPart::PeerPk(peer_pk), FunctionPart::ServiceIdWithFuncName(service_id, func_name)) => {
Ok((peer_pk, service_id, func_name))
}
(PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => Err(AquamarineError::InstructionError(
String::from("call should have service id specified by peer part or function part"),
)),
}
}
fn parse_args(args: Vec<String>, data: &AquaData) -> Result<serde_json::Value> {
let mut result = Vec::with_capacity(args.len());
for arg in args {
let mut split_arg: Vec<&str> = arg.splitn(1, '.').collect();
let variable_name = split_arg.remove(0);
let value_by_key = data
.get(variable_name)
.ok_or_else(|| AquamarineError::VariableNotFound(String::from(variable_name)))?;
let value = if !split_arg.is_empty() {
let json_path = split_arg.remove(0);
let values = jsonpath_lib::select(&value_by_key, json_path)
.map_err(|e| AquamarineError::VariableNotInJsonPath(String::from(json_path), e))?;
if values.len() != 1 {
return Err(AquamarineError::MultipleValuesInJsonPath(String::from(
json_path,
)));
}
values[0].clone()
} else {
value_by_key.clone()
};
result.push(value);
}
Ok(serde_json::Value::Array(result))
}
fn parse_result_name(result_name: String) -> Result<String> {
if !result_name.is_empty() {
Ok(result_name)
} else {
Err(AquamarineError::InstructionError(String::from(
"result name of a call instruction must be non empty",
)))
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod call;
mod null;
pub(self) use crate::stepper::ExecutableInstruction;
use crate::AquaData;
use crate::Result;
use call::Call;
use null::Null;
use serde_derive::Deserialize;
use serde_derive::Serialize;
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub(crate) enum Instruction {
Null(Null),
Call(Call),
/*
Par(Box<Instruction>, Box<Instruction>),
Seq(Box<Instruction>, Box<Instruction>),
*/
}
impl ExecutableInstruction for Instruction {
fn execute(self, data: &mut AquaData, next_peer_pks: &mut Vec<String>) -> Result<()> {
match self {
Instruction::Null(null) => null.execute(data, next_peer_pks),
Instruction::Call(call) => call.execute(data, next_peer_pks),
}
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::StepperOutcome;
use crate::instructions::Instruction;
use crate::AquaData;
use crate::Result;
pub(crate) fn execute_aqua(init_user_id: String, aqua: String, data: String) -> StepperOutcome {
log::info!(
"stepper invoked with user_id = {}, aqua = {:?}, data = {:?}",
init_user_id,
aqua,
data
);
execute_aqua_impl(init_user_id, aqua, data).unwrap_or_else(Into::into)
}
fn execute_aqua_impl(init_user_id: String, aqua: String, data: String) -> Result<StepperOutcome> {
let mut parsed_data: AquaData = serde_json::from_str(&data)?;
let parsed_aqua = serde_sexpr::from_str::<Vec<Instruction>>(&aqua)?;
log::info!(
"parsed_aqua: {:?}\nparsed_data: {:?}",
parsed_aqua,
parsed_data
);
let next_peer_pks = super::stepper::execute(parsed_aqua, &mut parsed_data)?;
let data = serde_json::to_string(&parsed_data)?;
Ok(StepperOutcome {
ret_code: 0,
data,
next_peer_pks,
})
}

View File

@ -1,25 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod execution;
mod stepper;
mod stepper_outcome;
pub use stepper_outcome::StepperOutcome;
pub use stepper_outcome::SUCCESS_ERROR_CODE;
pub(crate) use execution::execute_aqua;
pub(crate) use stepper::ExecutableInstruction;

View File

@ -1,33 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::instructions::Instruction;
use crate::AquaData;
use crate::Result;
pub(crate) trait ExecutableInstruction {
fn execute(self, data: &mut AquaData, next_peer_pks: &mut Vec<String>) -> Result<()>;
}
pub(crate) fn execute(instructions: Vec<Instruction>, data: &mut AquaData) -> Result<Vec<String>> {
let mut next_peer_pks = Vec::new();
for instruction in instructions {
instruction.execute(data, &mut next_peer_pks)?;
}
Ok(next_peer_pks)
}

34
stepper/Cargo.toml Normal file
View File

@ -0,0 +1,34 @@
[package]
name = "aquamarine"
version = "0.1.0"
authors = ["Fluence Labs"]
edition = "2018"
[lib]
name = "aquamarine_client"
crate-type = ["cdylib"]
path = "src/wasm_bindgen.rs"
[[bin]]
name = "aquamarine"
path = "src/fce.rs"
[dependencies]
fluence = { git = "https://github.com/fluencelabs/rust-sdk", features = ["logger"] }
serde = { version = "1.0.116", features = [ "derive", "rc" ] }
serde_derive = "1.0.116"
serde_sexpr = "0.1.0"
jsonpath_lib = "0.2.5"
log = "0.4.11"
serde_json = "1.0"
wasm-bindgen = "0.2.68"
[dev_dependencies]
aqua-test-utils = { path = "../crates/test-utils" }
aquamarine-vm = { git = "https://github.com/fluencelabs/fce" }
env_logger = "0.7.1"
serde_json = "1.0.56"

274
stepper/src/air/call.rs Normal file
View File

@ -0,0 +1,274 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionContext;
use crate::AquamarineError;
use crate::JValue;
use crate::Result;
use serde_derive::Deserialize;
use serde_derive::Serialize;
const CURRENT_PEER_ALIAS: &str = "%current_peer_id%";
/*
(current)
(pk $pk)
(pk $pk $srv_id)
PEER_PART: resolves to (peer_pk) \/ (peer_pk, pk_srv_id)
(fn $name)
(fn $name $srv_id)
FN_PART: resolves to (fn_name) \/ (fn_srv_id, fn_name)
*/
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum PeerPart {
PeerPk(String),
PeerPkWithPkServiceId(String, String),
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum FunctionPart {
FuncName(String),
ServiceIdWithFuncName(String, String),
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Call(PeerPart, FunctionPart, Vec<String>, String);
impl super::ExecutableInstruction for Call {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("call {:?} is called with context {:?}", self, ctx);
let peer_part = &self.0;
let function_part = &self.1;
let arguments = &self.2;
let result_variable_name = &self.3;
let (peer_pk, service_id, func_name) = parse_peer_fn_parts(peer_part, function_part)?;
let function_args = parse_args(arguments, ctx)?;
let function_args = serde_json::to_string(&function_args)
.map_err(|e| AquamarineError::FuncArgsSerdeError(function_args, e))?;
let result_variable_name = parse_result_variable_name(result_variable_name)?;
if peer_pk == ctx.current_peer_id || peer_pk == CURRENT_PEER_ALIAS {
let result = unsafe {
crate::call_service(service_id.to_string(), func_name.to_string(), function_args)
};
if result.ret_code != crate::CALL_SERVICE_SUCCESS {
return Err(AquamarineError::LocalServiceError(result.result));
}
let result: JValue = serde_json::from_str(&result.result)
.map_err(|e| AquamarineError::CallServiceSerdeError(result, e))?;
set_result(ctx, result_variable_name, result)?;
} else {
ctx.next_peer_pks.push(peer_pk.to_string());
}
Ok(())
}
}
#[rustfmt::skip]
fn parse_peer_fn_parts<'a>(
peer_part: &'a PeerPart,
fn_part: &'a FunctionPart,
) -> Result<(&'a str, &'a str, &'a str)> {
match (peer_part, fn_part) {
(PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::ServiceIdWithFuncName(_service_id, func_name)) => {
Ok((peer_pk, peer_service_id, func_name))
},
(PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::FuncName(func_name)) => {
Ok((peer_pk, peer_service_id, func_name))
},
(PeerPart::PeerPk(peer_pk), FunctionPart::ServiceIdWithFuncName(service_id, func_name)) => {
Ok((peer_pk, service_id, func_name))
}
(PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => Err(AquamarineError::InstructionError(
String::from("call should have service id specified by peer part or function part"),
)),
}
}
#[rustfmt::skip]
fn parse_args(args: &[String], ctx: &ExecutionContext) -> Result<JValue> {
let mut result = Vec::with_capacity(args.len());
for arg in args {
let mut split_arg: Vec<&str> = arg.splitn(2, '.').collect();
let variable_name = split_arg.remove(0);
let value_by_key = match (ctx.data.get(variable_name), ctx.folds.get(variable_name)) {
(_, Some(fold_state)) => match ctx.data.get(&fold_state.iterable_name) {
Some(JValue::Array(values)) => &values[fold_state.cursor],
Some(v) => return Err(AquamarineError::IncompatibleJValueType(v.clone(), String::from("array"))),
None => return Err(AquamarineError::VariableNotFound(fold_state.iterable_name.clone())),
},
(Some(value), None) => value,
(None, None) => return Err(AquamarineError::VariableNotFound(variable_name.to_string())),
};
let value = if !split_arg.is_empty() {
let json_path = split_arg.remove(0);
let values = jsonpath_lib::select(value_by_key, json_path)
.map_err(|e| AquamarineError::VariableNotInJsonPath(String::from(json_path), e))?;
if values.len() != 1 {
return Err(AquamarineError::MultipleValuesInJsonPath(String::from(
json_path,
)));
}
values[0].clone()
} else {
value_by_key.clone()
};
result.push(value);
}
Ok(JValue::Array(result))
}
fn parse_result_variable_name(result_name: &str) -> Result<&str> {
if !result_name.is_empty() {
Ok(result_name)
} else {
Err(AquamarineError::InstructionError(String::from(
"result name of a call instruction must be non empty",
)))
}
}
fn set_result(
ctx: &mut ExecutionContext,
result_variable_name: &str,
result: JValue,
) -> Result<()> {
use std::collections::hash_map::Entry;
let is_array = result_variable_name.ends_with("[]");
if !is_array {
if ctx
.data
.insert(result_variable_name.to_string(), result)
.is_some()
{
return Err(AquamarineError::MultipleVariablesFound(
result_variable_name.to_string(),
));
}
return Ok(());
}
match ctx
.data
// unwrap is safe because it's been checked for []
.entry(result_variable_name.strip_suffix("[]").unwrap().to_string())
{
Entry::Occupied(mut entry) => match entry.get_mut() {
JValue::Array(values) => values.push(result),
v => {
return Err(AquamarineError::IncompatibleJValueType(
v.clone(),
String::from("Array"),
))
}
},
Entry::Vacant(entry) => {
entry.insert(JValue::Array(vec![result]));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use crate::JValue;
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::echo_string_call_service;
use serde_json::json;
#[test]
fn current_peer_id_call() {
let mut vm = create_aqua_vm(echo_string_call_service());
let script = String::from(
r#"
(call (%current_peer_id% (local_service_id local_fn_name) (value) result_name))
"#,
);
let res = vm
.call(json!([
String::from("asd"),
script,
String::from("{\"value\": \"test\"}"),
]))
.expect("call should be successful");
let res: JValue = serde_json::from_str(&res.data).unwrap();
assert_eq!(res.get("result_name").unwrap(), &json!("test"));
let script = String::from(
r#"
(call (test_peer_id (local_service_id local_fn_name) (value) result_name))
"#,
);
let res = vm
.call(json!([
String::from("asd"),
script,
String::from("{\"value\": \"test\"}"),
]))
.expect("call should be successful");
let res: JValue = serde_json::from_str(&res.data).unwrap();
assert_eq!(res.get("result_name").unwrap(), &json!("test"));
}
#[test]
fn remote_peer_id_call() {
let mut vm = create_aqua_vm(echo_string_call_service());
let remote_peer_id = String::from("some_remote_peer_id");
let script = format!(
"(call ({} (local_service_id local_fn_name) (value) result_name))",
remote_peer_id
);
let res = vm
.call(json!([
String::from("asd"),
script,
String::from("{\"value\": \"test\"}"),
]))
.expect("call should be successful");
assert_eq!(res.next_peer_pks, vec![remote_peer_id]);
}
}

272
stepper/src/air/fold.rs Normal file
View File

@ -0,0 +1,272 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionContext;
use super::Instruction;
use crate::AquamarineError;
use crate::JValue;
use crate::Result;
use serde_derive::Deserialize;
use serde_derive::Serialize;
use std::rc::Rc;
/*
(fold Iterable i
(par
(call fn [i] acc[])
(next i)
)
)
*/
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Fold(String, String, Rc<Instruction>);
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Next(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct FoldState {
pub(crate) cursor: usize,
pub(crate) iterable_name: String,
pub(crate) instr_head: Rc<Instruction>,
}
impl super::ExecutableInstruction for Fold {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("fold {:?} is called with context {:?}", self, ctx);
let iterable_name = &self.0;
let iterator_name = &self.1;
let instr_head = self.2.clone();
// check that value exists and has array type
match ctx.data.get(iterable_name) {
Some(JValue::Array(_)) => {}
Some(v) => {
return Err(AquamarineError::IncompatibleJValueType(
v.clone(),
String::from("Array"),
))
}
None => {
return Err(AquamarineError::VariableNotFound(String::from(
iterable_name,
)))
}
};
let fold_state = FoldState {
cursor: 0,
iterable_name: iterable_name.clone(),
instr_head: instr_head.clone(),
};
if ctx
.folds
.insert(iterator_name.clone(), fold_state)
.is_some()
{
return Err(AquamarineError::MultipleFoldStates(iterable_name.clone()));
}
instr_head.execute(ctx)?;
ctx.folds.remove(iterator_name);
Ok(())
}
}
impl super::ExecutableInstruction for Next {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("next {:?} is called with context {:?}", self, ctx);
let iterator_name = &self.0;
let fold_state = ctx
.folds
.get_mut(iterator_name)
.ok_or_else(|| AquamarineError::FoldStateNotFound(iterator_name.clone()))?;
let value = ctx
.data
.get(&fold_state.iterable_name)
.expect("this has been checked on the fold instruction");
let value_len = match value {
JValue::Array(array) => array.len(),
_ => unreachable!(),
};
fold_state.cursor += 1;
if value_len == 0 || value_len <= fold_state.cursor {
fold_state.cursor -= 1;
// just do nothing to exit
return Ok(());
}
let next_instr = fold_state.instr_head.clone();
next_instr.execute(ctx)?;
// get the same fold state again because of borrow checker
match ctx.folds.get_mut(iterator_name) {
Some(fold_state) => fold_state.cursor -= 1,
_ => unreachable!("iterator value shouldn't changed inside fold"),
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::JValue;
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::echo_number_call_service;
use aquamarine_vm::AquamarineVMError;
use aquamarine_vm::StepperError;
use serde_json::json;
#[test]
fn lfold() {
let mut vm = create_aqua_vm(echo_number_call_service());
let lfold = String::from(
r#"
(fold (Iterable i
(seq (
(call (%current_peer_id% (local_service_id local_fn_name) (i) acc[]))
(next i)
)
)))"#,
);
let res = vm
.call(json!([
String::from("asd"),
lfold,
String::from("{\"Iterable\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"),
]))
.expect("call should be successful");
let res: JValue = serde_json::from_str(&res.data).unwrap();
assert_eq!(res.get("acc").unwrap(), &json!([1, 2, 3, 4, 5]));
}
#[test]
fn rfold() {
let mut vm = create_aqua_vm(echo_number_call_service());
let rfold = String::from(
r#"
(fold (Iterable i
(seq (
(next i)
(call (%current_peer_id% (local_service_id local_fn_name) (i) acc[]))
)
)))"#,
);
let res = vm
.call(json!([
String::from("asd"),
rfold,
String::from("{\"Iterable\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"),
]))
.expect("call should be successful");
let res: JValue = serde_json::from_str(&res.data).unwrap();
assert_eq!(res.get("acc").unwrap(), &json!([5, 4, 3, 2, 1]));
}
#[test]
fn inner_fold() {
let mut vm = create_aqua_vm(echo_number_call_service());
let script = String::from(
r#"
(fold (Iterable1 i
(seq (
(fold (Iterable2 j
(seq (
(call (%current_peer_id% (local_service_id local_fn_name) (i) acc[]))
(next j)
))
))
(next i)
))
))"#,
);
let res = vm
.call(json!([
String::from("asd"),
script,
String::from("{\"Iterable1\": [\"1\",\"2\",\"3\",\"4\",\"5\"], \"Iterable2\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"),
]))
.expect("call should be successful");
let res: JValue = serde_json::from_str(&res.data).unwrap();
assert_eq!(
res.get("acc").unwrap(),
&json!([1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5])
);
}
#[test]
fn inner_fold_with_same_iterator() {
let mut vm = create_aqua_vm(echo_number_call_service());
let script = String::from(
r#"
(fold (Iterable1 i
(seq (
(fold (Iterable2 i
(seq (
(call (%current_peer_id% (local_service_id local_fn_name) (i) acc[]))
(next i)
))
))
(next i)
))
))"#,
);
let res = vm
.call(json!([
String::from("asd"),
script,
String::from("{\"Iterable1\": [\"1\",\"2\",\"3\",\"4\",\"5\"], \"Iterable2\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"),
]));
assert!(res.is_err());
let error = res.err().unwrap();
let error = match error {
AquamarineVMError::StepperError(error) => error,
_ => unreachable!(),
};
assert_eq!(
error,
StepperError::MultipleFoldStates(String::from(
"multiple fold states found for iterable Iterable2"
))
);
}
}

82
stepper/src/air/mod.rs Normal file
View File

@ -0,0 +1,82 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod call;
mod fold;
mod null;
mod par;
mod seq;
use crate::AquaData;
use crate::Result;
use call::Call;
use fold::Fold;
use fold::FoldState;
use fold::Next;
use null::Null;
use par::Par;
use seq::Seq;
use serde_derive::Deserialize;
use serde_derive::Serialize;
use std::collections::HashMap;
#[derive(Clone, Default, Debug)]
pub(super) struct ExecutionContext {
pub data: AquaData,
pub next_peer_pks: Vec<String>,
pub current_peer_id: String,
pub folds: HashMap<String, FoldState>,
}
impl ExecutionContext {
pub(super) fn new(data: AquaData, current_peer_id: String) -> Self {
Self {
data,
next_peer_pks: vec![],
current_peer_id,
folds: HashMap::new(),
}
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub(crate) enum Instruction {
Null(Null),
Call(Call),
Fold(Fold),
Next(Next),
Par(Par),
Seq(Seq),
}
pub(crate) trait ExecutableInstruction {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()>;
}
impl ExecutableInstruction for Instruction {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
match self {
Instruction::Null(null) => null.execute(ctx),
Instruction::Call(call) => call.execute(ctx),
Instruction::Fold(fold) => fold.execute(ctx),
Instruction::Next(next) => next.execute(ctx),
Instruction::Par(par) => par.execute(ctx),
Instruction::Seq(seq) => seq.execute(ctx),
}
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::AquaData; use super::ExecutionContext;
use crate::Result; use crate::Result;
use serde_derive::Deserialize; use serde_derive::Deserialize;
@ -24,8 +24,8 @@ use serde_derive::Serialize;
pub(crate) struct Null {} pub(crate) struct Null {}
impl super::ExecutableInstruction for Null { impl super::ExecutableInstruction for Null {
fn execute(self, data: &mut AquaData, next_peer_pks: &mut Vec<String>) -> Result<()> { fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("null called with data: {:?} and next_peer_pks: {:?}", data, next_peer_pks); log::info!("null is called with context: {:?}", ctx);
Ok(()) Ok(())
} }

73
stepper/src/air/par.rs Normal file
View File

@ -0,0 +1,73 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionContext;
use super::Instruction;
use crate::Result;
use serde_derive::Deserialize;
use serde_derive::Serialize;
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Par(Box<Instruction>, Box<Instruction>);
impl super::ExecutableInstruction for Par {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("par is called with context: {:?}", ctx);
self.0.execute(ctx)?;
self.1.execute(ctx)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::StepperOutcome;
use serde_json::json;
#[test]
fn par() {
let mut vm = create_aqua_vm(unit_call_service());
let script = String::from(
r#"
(par (
(call (remote_peer_id_1 (local_service_id local_fn_name) () result_name))
(call (remote_peer_id_2 (service_id fn_name) () g))
))"#,
);
let res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("call should be successful");
assert_eq!(
res,
StepperOutcome {
data: String::from("{}"),
next_peer_pks: vec![
String::from("remote_peer_id_1"),
String::from("remote_peer_id_2")
]
}
);
}
}

74
stepper/src/air/seq.rs Normal file
View File

@ -0,0 +1,74 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionContext;
use super::Instruction;
use crate::Result;
use serde_derive::Deserialize;
use serde_derive::Serialize;
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Seq(Box<Instruction>, Box<Instruction>);
impl super::ExecutableInstruction for Seq {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("seq is called with context: {:?}", ctx);
let pks_count_before_call = ctx.next_peer_pks.len();
self.0.execute(ctx)?;
if pks_count_before_call == ctx.next_peer_pks.len() {
self.1.execute(ctx)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::StepperOutcome;
use serde_json::json;
#[test]
fn par() {
let mut vm = create_aqua_vm(unit_call_service());
let script = String::from(
r#"
(seq (
(call (remote_peer_id_1 (local_service_id local_fn_name) () result_name))
(call (remote_peer_id_2 (service_id fn_name) () g))
))"#,
);
let res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("call should be successful");
assert_eq!(
res,
StepperOutcome {
data: String::from("{}"),
next_peer_pks: vec![String::from("remote_peer_id_1")]
}
);
}
}

View File

@ -14,20 +14,21 @@
* limitations under the License. * limitations under the License.
*/ */
/// This file contains defines similar for both FCE and browser targets.
pub(crate) type Result<T> = std::result::Result<T, AquamarineError>;
pub(crate) type AquaData = std::collections::HashMap<String, serde_json::Value>;
pub(crate) use crate::errors::AquamarineError;
pub(crate) use crate::stepper::StepperOutcome;
pub(crate) const CALL_SERVICE_SUCCESS: i32 = 0;
use serde_derive::Deserialize; use serde_derive::Deserialize;
use serde_derive::Serialize; use serde_derive::Serialize;
/// This file contains defines of some things similar both for FCE and browser targets.
pub(crate) type Result<T> = std::result::Result<T, AquamarineError>;
pub(crate) type AquaData = std::collections::HashMap<String, JValue>;
pub(crate) type JValue = serde_json::Value;
pub(crate) use crate::errors::AquamarineError;
pub(crate) use crate::stepper_outcome::StepperOutcome;
pub(crate) const CALL_SERVICE_SUCCESS: i32 = 0;
#[fluence::fce] #[fluence::fce]
#[derive(Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallServiceResult { pub struct CallServiceResult {
pub ret_code: i32, pub ret_code: i32,
pub result: String, pub result: String,

182
stepper/src/errors.rs Normal file
View File

@ -0,0 +1,182 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::CallServiceResult;
use crate::JValue;
use crate::StepperOutcome;
use jsonpath_lib::JsonPathError;
use serde_json::Error as SerdeJsonError;
use serde_sexpr::Error as SExprError;
use std::convert::Into;
use std::env::VarError;
use std::error::Error;
#[derive(Debug)]
pub(crate) enum AquamarineError {
/// Errors occurred while parsing aqua script in the form of S expressions.
SExprParseError(SExprError),
/// Errors occurred while parsing aqua data.
DataSerdeError(SerdeJsonError),
/// Errors occurred while parsing function arguments of an expression.
FuncArgsSerdeError(JValue, SerdeJsonError),
/// Errors occurred while parsing returned by call_service value.
CallServiceSerdeError(CallServiceResult, SerdeJsonError),
/// Indicates that environment variable with name CURRENT_PEER_ID isn't set.
CurrentPeerIdEnvError(VarError, String),
/// Semantic errors in instructions.
InstructionError(String),
/// An error is occurred while calling local service via call_service.
LocalServiceError(String),
/// Value for such name isn't presence in data.
VariableNotFound(String),
/// Multiple values for such name found.
MultipleVariablesFound(String),
/// Value with such path wasn't found in data with such error.
VariableNotInJsonPath(String, JsonPathError),
/// Value for such name isn't presence in data.
IncompatibleJValueType(JValue, String),
/// Multiple values found for such json path.
MultipleValuesInJsonPath(String),
/// Fold state wasn't found for such iterator name.
FoldStateNotFound(String),
/// Multiple fold states found for such iterator name.
MultipleFoldStates(String),
}
impl Error for AquamarineError {}
impl std::fmt::Display for AquamarineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
AquamarineError::SExprParseError(err) => {
write!(f, "aqua script can't be parsed: {:?}", err)
}
AquamarineError::DataSerdeError(err) => write!(
f,
"an error occurred while serializing/deserializing data: {:?}",
err
),
AquamarineError::FuncArgsSerdeError(args, err) => write!(
f,
"function arguments {} can't be serialized or deserialized with an error: {:?}",
args, err
),
AquamarineError::CallServiceSerdeError(result, err) => write!(
f,
"call_service result \"{:?}\" can't be serialized or deserialized with an error: {:?}",
result, err
),
AquamarineError::CurrentPeerIdEnvError(err, env_name) => write!(
f,
"the environment variable \"{}\" can't be obtained: {:?}",
env_name,
err
),
AquamarineError::InstructionError(err_msg) => write!(f, "{}", err_msg),
AquamarineError::LocalServiceError(err_msg) => write!(f, "{}", err_msg),
AquamarineError::VariableNotFound(variable_name) => write!(
f,
"variable with name {} isn't present in data",
variable_name
),
AquamarineError::MultipleVariablesFound(variable_name) => write!(
f,
"multiple variables found for name {} in data",
variable_name
),
AquamarineError::VariableNotInJsonPath(json_path, json_path_err) => write!(
f,
"variable with path {} not found with error: {:?}",
json_path, json_path_err
),
AquamarineError::IncompatibleJValueType(avalue, desired_type) => write!(
f,
"got avalue \"{:?}\", but {} type is needed",
avalue,
desired_type,
),
AquamarineError::MultipleValuesInJsonPath(json_path) => write!(
f,
"multiple variables found for this json path {}",
json_path
),
AquamarineError::FoldStateNotFound(iterator) => write!(
f,
"fold state not found for this iterable {}",
iterator
),
AquamarineError::MultipleFoldStates(iterator) => write!(
f,
"multiple fold states found for iterable {}",
iterator
),
}
}
}
impl From<SExprError> for AquamarineError {
fn from(err: SExprError) -> Self {
AquamarineError::SExprParseError(err)
}
}
impl From<std::convert::Infallible> for AquamarineError {
fn from(_: std::convert::Infallible) -> Self {
unreachable!()
}
}
impl Into<StepperOutcome> for AquamarineError {
fn into(self) -> StepperOutcome {
let ret_code = match self {
AquamarineError::SExprParseError(_) => 1,
AquamarineError::DataSerdeError(..) => 2,
AquamarineError::FuncArgsSerdeError(..) => 3,
AquamarineError::CallServiceSerdeError(..) => 4,
AquamarineError::CurrentPeerIdEnvError(..) => 5,
AquamarineError::InstructionError(..) => 6,
AquamarineError::LocalServiceError(..) => 7,
AquamarineError::VariableNotFound(..) => 8,
AquamarineError::MultipleVariablesFound(..) => 9,
AquamarineError::VariableNotInJsonPath(..) => 10,
AquamarineError::IncompatibleJValueType(..) => 11,
AquamarineError::MultipleValuesInJsonPath(..) => 12,
AquamarineError::FoldStateNotFound(..) => 13,
AquamarineError::MultipleFoldStates(..) => 14,
};
StepperOutcome {
ret_code,
data: format!("{}", self),
next_peer_pks: vec![],
}
}
}

120
stepper/src/execution.rs Normal file
View File

@ -0,0 +1,120 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::StepperOutcome;
use crate::air::ExecutableInstruction;
use crate::air::ExecutionContext;
use crate::air::Instruction;
use crate::AquaData;
use crate::AquamarineError;
use crate::Result;
const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID";
pub(crate) fn execute_aqua(init_user_id: String, aqua: String, data: String) -> StepperOutcome {
log::info!(
"stepper invoked with user_id = {}, aqua = {:?}, data = {:?}",
init_user_id,
aqua,
data
);
execute_aqua_impl(init_user_id, aqua, data).unwrap_or_else(Into::into)
}
fn execute_aqua_impl(_init_user_id: String, aqua: String, data: String) -> Result<StepperOutcome> {
let parsed_data: AquaData =
serde_json::from_str(&data).map_err(AquamarineError::DataSerdeError)?;
let formatted_aqua = format_aqua(aqua);
let parsed_aqua = serde_sexpr::from_str::<Instruction>(&formatted_aqua)?;
log::info!(
"\nparsed_aqua: {:?}\nparsed_data: {:?}",
parsed_aqua,
parsed_data
);
let current_peer_id = std::env::var(CURRENT_PEER_ID_ENV_NAME).map_err(|e| {
AquamarineError::CurrentPeerIdEnvError(e, String::from(CURRENT_PEER_ID_ENV_NAME))
})?;
let mut execution_ctx = ExecutionContext::new(parsed_data, current_peer_id);
parsed_aqua.execute(&mut execution_ctx)?;
let data =
serde_json::to_string(&execution_ctx.data).map_err(AquamarineError::DataSerdeError)?;
Ok(StepperOutcome {
ret_code: 0,
data,
next_peer_pks: execution_ctx.next_peer_pks,
})
}
/// Formats aqua script in a form of S-expressions to a form compatible with the serde_sexpr crate.
fn format_aqua(aqua: String) -> String {
use std::iter::FromIterator;
let mut formatted_aqua = Vec::with_capacity(aqua.len());
// whether to skip the next whitespace
let mut skip_next_whitespace = false;
// whether c was a closing brace
let mut was_cbr = false;
for c in aqua.chars() {
let is_whitespace = c == ' ';
if (skip_next_whitespace && is_whitespace) || c == '\n' {
continue;
}
let is_cbr = c == ')';
skip_next_whitespace = is_whitespace || c == '(' || is_cbr;
if was_cbr && !is_cbr {
formatted_aqua.push(' ');
}
was_cbr = is_cbr;
formatted_aqua.push(c)
}
String::from_iter(formatted_aqua.into_iter())
}
#[cfg(test)]
mod tests {
#[test]
fn format_aqua_test() {
let aqua = format!(
r#"(( (( (seq (
(call (%current_peer_id% (add_module ||) (module) module))
(seq (
(call (%current_peer_id% (add_blueprint ||) (blueprint) blueprint_id))
(seq (
(call (%current_peer_id% (create ||) (blueprint_id) service_id))
(call ({} (|| ||) (service_id) client_result))
) )
) )
))"#,
"abc"
);
let aqua = super::format_aqua(aqua);
let formatted_aqua = String::from("(((((seq ((call (%current_peer_id% (add_module ||) (module) module)) (seq ((call (%current_peer_id% (add_blueprint ||) (blueprint) blueprint_id)) (seq ((call (%current_peer_id% (create ||) (blueprint_id) service_id)) (call (abc (|| ||) (service_id) client_result))))))))");
assert_eq!(aqua, formatted_aqua);
}
}

View File

@ -14,15 +14,26 @@
* limitations under the License. * limitations under the License.
*/ */
#![warn(rust_2018_idioms)]
#![deny(
dead_code,
nonstandard_style,
unused_imports,
unused_mut,
unused_variables,
unused_unsafe,
unreachable_patterns
)]
mod air; mod air;
mod defines; mod defines;
mod errors; mod errors;
mod instructions; mod execution;
mod stepper; mod stepper_outcome;
pub(crate) use crate::defines::*; pub(crate) use crate::defines::*;
use crate::stepper::execute_aqua; use crate::execution::execute_aqua;
use fluence::fce; use fluence::fce;
pub fn main() { pub fn main() {
@ -35,7 +46,7 @@ pub fn invoke(init_user_id: String, aqua: String, data: String) -> StepperOutcom
} }
#[fce] #[fce]
#[link(wasm_import_module = "aqua_test_module")] #[link(wasm_import_module = "host")]
extern "C" { extern "C" {
pub fn call_service(service_id: String, fn_name: String, args: String) -> CallServiceResult; pub fn call_service(service_id: String, fn_name: String, args: String) -> CallServiceResult;
} }

View File

@ -17,10 +17,8 @@
use fluence::fce; use fluence::fce;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub const SUCCESS_ERROR_CODE: i32 = 0;
#[fce] #[fce]
#[derive(Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StepperOutcome { pub struct StepperOutcome {
/// A return code, where SUCCESS_ERROR_CODE means success. /// A return code, where SUCCESS_ERROR_CODE means success.
pub ret_code: i32, pub ret_code: i32,

View File

@ -17,12 +17,12 @@
mod air; mod air;
mod defines; mod defines;
mod errors; mod errors;
mod instructions; mod execution;
mod stepper; mod stepper_outcome;
pub(crate) use crate::defines::*; pub(crate) use crate::defines::*;
use crate::stepper::execute_aqua; use crate::execution::execute_aqua;
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;

162
stepper/tests/basic.rs Normal file
View File

@ -0,0 +1,162 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::vec1::Vec1;
use aquamarine_vm::HostExportedFunc;
use aquamarine_vm::IValue;
use aquamarine_vm::StepperOutcome;
use serde_json::json;
type JValue = serde_json::Value;
#[test]
fn seq_par_call() {
let mut vm = create_aqua_vm(unit_call_service());
let script = String::from(
r#"
(seq (
(par (
(call (%current_peer_id% (local_service_id local_fn_name) () result_1))
(call (remote_peer_id (service_id fn_name) () g))
))
(call (%current_peer_id% (local_service_id local_fn_name) () result_2))
))"#,
);
let res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("should be successful");
let right_outcome = StepperOutcome {
data: String::from("{\"result_1\":\"test\"}"),
next_peer_pks: vec![String::from("remote_peer_id")],
};
assert_eq!(res, right_outcome);
}
#[test]
fn par_par_call() {
let mut vm = create_aqua_vm(unit_call_service());
let script = String::from(
r#"
(par (
(par (
(call (%current_peer_id% (local_service_id local_fn_name) () result_1))
(call (remote_peer_id (service_id fn_name) () g))
))
(call (%current_peer_id% (local_service_id local_fn_name) () result_2))
))"#,
);
let res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("should be successful");
let resulted_json: JValue =
serde_json::from_str(&res.data).expect("stepper should return valid json");
let right_json = json!( {"result_1" : "test", "result_2" : "test"} );
assert_eq!(resulted_json, right_json);
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]);
}
#[test]
fn create_service() {
let module = "greeting";
let config = json!(
{
"name": module,
"mem_pages_count": 100,
"logger_enabled": true,
"wasi": {
"envs": json!({}),
"preopened_files": vec!["/tmp"],
"mapped_dirs": json!({}),
}
}
);
let mut data_value = json!({
"module_bytes": vec![1,2],
"module_config": config,
"blueprint": { "name": "blueprint", "dependencies": [module] },
});
let data = data_value.to_string();
let script = String::from(
r#"(seq (
(call (%current_peer_id% (add_module ||) (module_bytes module_config) module))
(seq (
(call (%current_peer_id% (add_blueprint ||) (blueprint) blueprint_id))
(seq (
(call (%current_peer_id% (create ||) (blueprint_id) service_id))
(call (remote_peer_id (|| ||) (service_id) client_result))
))
))
))"#,
);
let call_service: HostExportedFunc = Box::new(|_, args| -> Option<IValue> {
let builtin_service = match &args[0] {
IValue::String(str) => str,
_ => unreachable!(),
};
let response = match builtin_service.as_str() {
"add_module" => String::from("add_module response"),
"add_blueprint" => String::from("add_blueprint response"),
"create" => String::from("create response"),
_ => String::from("unknown response"),
};
Some(IValue::Record(
Vec1::new(vec![
IValue::S32(0),
IValue::String(format!("\"{}\"", response)),
])
.unwrap(),
))
});
let mut vm = create_aqua_vm(call_service);
let res = vm
.call(json!([String::from("init_user_pk"), script, data,]))
.expect("should be successful");
let resulted_data: JValue = serde_json::from_str(&res.data).expect("should be correct json");
data_value.as_object_mut().unwrap().insert(
String::from("module"),
JValue::String(String::from("add_module response")),
);
data_value.as_object_mut().unwrap().insert(
String::from("blueprint_id"),
JValue::String(String::from("add_blueprint response")),
);
data_value.as_object_mut().unwrap().insert(
String::from("service_id"),
JValue::String(String::from("create response")),
);
assert_eq!(resulted_data, data_value);
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]);
}

View File

@ -1,22 +0,0 @@
use fluence::fce;
fn main() {}
#[fce]
pub struct CallServiceResult {
pub ret_code: i32,
pub result: String,
}
#[fce]
pub fn call_service(service_id: String, fn_name: String, args: String) -> CallServiceResult {
println!(
"call service invoked with:\n service_id: {}\n fn_name: {}\n args: {:?}",
service_id, fn_name, args
);
CallServiceResult {
ret_code: 0,
result: String::from("[\"result string\"]"),
}
}