Call evidence (#9)

This commit is contained in:
vms 2020-10-15 17:31:56 +03:00 committed by GitHub
parent bdc10fc4e6
commit 4038d098e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1259 additions and 244 deletions

49
Cargo.lock generated
View File

@ -2,9 +2,9 @@
# It is not intended for manual editing.
[[package]]
name = "aho-corasick"
version = "0.7.13"
version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86"
checksum = "b476ce7103678b0c6d3d395dbbae31d48ff910bd28be979ba5d48c6351131d0d"
dependencies = [
"memchr",
]
@ -41,6 +41,7 @@ dependencies = [
"fluence",
"jsonpath_lib",
"log",
"once_cell",
"serde",
"serde_derive",
"serde_json",
@ -51,7 +52,7 @@ dependencies = [
[[package]]
name = "aquamarine-vm"
version = "0.1.0"
source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d"
source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed"
dependencies = [
"fluence-faas",
"serde",
@ -144,9 +145,9 @@ checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
[[package]]
name = "cc"
version = "1.0.60"
version = "1.0.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef611cc68ff783f18535d77ddd080185275713d852c4f5cbb6122c462a7a825c"
checksum = "ed67cbde08356238e75fc4656be4749481eeffb09e19f320a25237d5221c985d"
[[package]]
name = "cfg-if"
@ -371,8 +372,8 @@ dependencies = [
[[package]]
name = "fce"
version = "0.1.7"
source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d"
version = "0.1.8"
source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed"
dependencies = [
"boolinator",
"fce-wit-interfaces",
@ -392,7 +393,7 @@ dependencies = [
[[package]]
name = "fce-wit-interfaces"
version = "0.1.5"
source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d"
source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed"
dependencies = [
"multimap",
"wasmer-interface-types-fl",
@ -401,7 +402,7 @@ dependencies = [
[[package]]
name = "fce-wit-parser"
version = "0.1.7"
source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d"
source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed"
dependencies = [
"anyhow",
"fce-wit-interfaces",
@ -421,8 +422,8 @@ dependencies = [
[[package]]
name = "fluence-faas"
version = "0.1.8"
source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d"
version = "0.1.9"
source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed"
dependencies = [
"cmd_lib",
"fce",
@ -817,6 +818,12 @@ dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad"
[[package]]
name = "page_size"
version = "0.4.2"
@ -983,9 +990,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "regex"
version = "1.3.9"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
checksum = "8963b85b8ce3074fecffde43b4b0dded83ce2f367dc8d363afc56679f3ee820b"
dependencies = [
"aho-corasick",
"memchr",
@ -995,9 +1002,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.6.18"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
checksum = "8cab7a364d15cde1e505267766a2d3c4e22a843e1a601f0fa7564c0f82ced11c"
[[package]]
name = "rustc_version"
@ -1082,9 +1089,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.58"
version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a230ea9107ca2220eea9d46de97eddcb04cd00e92d13dda78e478dd33fa82bd4"
checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95"
dependencies = [
"indexmap",
"itoa",
@ -1122,9 +1129,9 @@ checksum = "343f3f510c2915908f155e94f17220b19ccfacf2a64a2a5d8004f2c3e311e7fd"
[[package]]
name = "syn"
version = "1.0.42"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c51d92969d209b54a98397e1b91c8ae82d8c87a7bb87df0b29aa2ad81454228"
checksum = "e03e57e4fcbfe7749842d53e24ccb9aa12b7252dbe5e91d2acad31834c8b8fdd"
dependencies = [
"proc-macro2",
"quote",
@ -1188,9 +1195,9 @@ dependencies = [
[[package]]
name = "toml"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a"
checksum = "75cf45bb0bef80604d001caaec0d09da99611b3c0fd39d3080468875cdb65645"
dependencies = [
"serde",
]

View File

@ -35,7 +35,10 @@ use aquamarine_vm::IValue;
use std::path::PathBuf;
pub fn create_aqua_vm(call_service: HostExportedFunc) -> AquamarineVM {
pub fn create_aqua_vm(
call_service: HostExportedFunc,
current_peer_id: impl Into<String>,
) -> AquamarineVM {
let call_service_descriptor = HostImportDescriptor {
host_exported_func: call_service,
argument_types: vec![IType::String, IType::String, IType::String],
@ -46,7 +49,7 @@ pub fn create_aqua_vm(call_service: HostExportedFunc) -> AquamarineVM {
let config = AquamarineVMConfig {
aquamarine_wasm_path: PathBuf::from("../target/wasm32-wasi/debug/aquamarine.wasm"),
call_service: call_service_descriptor,
current_peer_id: String::from("test_peer_id"),
current_peer_id: current_peer_id.into(),
};
AquamarineVM::new(config).expect("vm should be created")

View File

@ -23,6 +23,7 @@ serde_sexpr = "0.1.0"
jsonpath_lib = "0.2.5"
log = "0.4.11"
once_cell = "1.4.1"
serde_json = "1.0"
wasm-bindgen = "0.2.68"

View File

@ -14,7 +14,10 @@
* limitations under the License.
*/
use super::ExecutionContext;
use super::CallEvidenceCtx;
use super::CallResult;
use super::EvidenceState;
use super::ExecutionCtx;
use crate::AquamarineError;
use crate::JValue;
use crate::Result;
@ -52,70 +55,115 @@ pub enum FunctionPart {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Call(PeerPart, FunctionPart, Vec<String>, String);
#[derive(Debug, PartialEq, Eq)]
struct ParsedCall {
peer_pk: String,
service_id: String,
function_name: String,
function_arg_paths: Vec<String>,
result_variable_name: String,
}
impl super::ExecutableInstruction for Call {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("call {:?} is called with context {:?}", self, ctx);
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!(
"call {:?} is called with contexts: {:?} {:?}",
self,
exec_ctx,
call_ctx
);
let (peer_pk, service_id, func_name) = self.prepare_peer_fn_parts(ctx)?;
let function_args = self.extract_args_by_paths(ctx)?;
let function_args = serde_json::to_string(&function_args)
.map_err(|e| AquamarineError::FuncArgsSerdeError(function_args, e))?;
if peer_pk == ctx.current_peer_id || peer_pk == CURRENT_PEER_ALIAS {
let result = unsafe {
crate::call_service(service_id.to_string(), func_name.to_string(), function_args)
};
if result.ret_code != crate::CALL_SERVICE_SUCCESS {
return Err(AquamarineError::LocalServiceError(result.result));
}
let result: JValue = serde_json::from_str(&result.result)
.map_err(|e| AquamarineError::CallServiceSerdeError(result, e))?;
self.set_result(ctx, result)?;
} else {
let peer_pk = peer_pk.to_string();
ctx.next_peer_pks.push(peer_pk);
}
Ok(())
let parsed_call = ParsedCall::new(self, exec_ctx)?;
parsed_call.execute(exec_ctx, call_ctx)
}
}
impl Call {
#[rustfmt::skip]
fn prepare_peer_fn_parts<'a>(&'a self, ctx: &'a ExecutionContext) -> Result<(&'a str, &'a str, &'a str)> {
let (peer_pk, service_id, func_name) = match (&self.0, &self.1) {
(PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::ServiceIdWithFuncName(_service_id, func_name)) => {
Ok((peer_pk, peer_service_id, func_name))
},
(PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::FuncName(func_name)) => {
Ok((peer_pk, peer_service_id, func_name))
},
(PeerPart::PeerPk(peer_pk), FunctionPart::ServiceIdWithFuncName(service_id, func_name)) => {
Ok((peer_pk, service_id, func_name))
impl ParsedCall {
pub fn new(call: &Call, exec_ctx: &ExecutionCtx) -> Result<Self> {
let (peer_pk, service_id, func_name) = Self::prepare_peer_fn_parts(call, exec_ctx)?;
let result_variable_name = Self::parse_result_variable_name(call)?;
Ok(Self {
peer_pk: peer_pk.to_string(),
service_id: service_id.to_string(),
function_name: func_name.to_string(),
function_arg_paths: call.2.clone(),
result_variable_name: result_variable_name.to_string(),
})
}
pub fn execute(
self,
exec_ctx: &mut ExecutionCtx,
call_ctx: &mut CallEvidenceCtx,
) -> Result<()> {
let should_executed = self.prepare_evidence_state(call_ctx, &exec_ctx.current_peer_id)?;
if !should_executed {
return Ok(());
}
if self.peer_pk != exec_ctx.current_peer_id && self.peer_pk != CURRENT_PEER_ALIAS {
set_remote_call_result(self.peer_pk, exec_ctx, call_ctx);
return Ok(());
}
let function_args = self.extract_args_by_paths(exec_ctx)?;
let function_args = serde_json::to_string(&function_args)
.map_err(|e| AquamarineError::FuncArgsSerializationError(function_args, e))?;
let result =
unsafe { crate::call_service(self.service_id, self.function_name, function_args) };
if result.ret_code != crate::CALL_SERVICE_SUCCESS {
return Err(AquamarineError::LocalServiceError(result.result));
}
let result: JValue = serde_json::from_str(&result.result)
.map_err(|e| AquamarineError::CallServiceResultDeserializationError(result, e))?;
set_local_call_result(self.result_variable_name, exec_ctx, call_ctx, result)
}
fn prepare_peer_fn_parts<'a>(
raw_call: &'a Call,
exec_ctx: &'a ExecutionCtx,
) -> Result<(&'a str, &'a str, &'a str)> {
let (peer_pk, service_id, func_name) = match (&raw_call.0, &raw_call.1) {
(
PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id),
FunctionPart::ServiceIdWithFuncName(_service_id, func_name),
) => Ok((peer_pk, peer_service_id, func_name)),
(
PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id),
FunctionPart::FuncName(func_name),
) => Ok((peer_pk, peer_service_id, func_name)),
(
PeerPart::PeerPk(peer_pk),
FunctionPart::ServiceIdWithFuncName(service_id, func_name),
) => Ok((peer_pk, service_id, func_name)),
(PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => {
Err(AquamarineError::InstructionError(String::from(
"call should have service id specified by peer part or function part",
)))
}
(PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => Err(AquamarineError::InstructionError(
String::from("call should have service id specified by peer part or function part"),
)),
}?;
let peer_pk = if peer_pk != CURRENT_PEER_ALIAS {
Self::prepare_call_arg(peer_pk, ctx)?
Self::prepare_call_arg(peer_pk, exec_ctx)?
} else {
peer_pk
};
let service_id = Self::prepare_call_arg(service_id, ctx)?;
let func_name = Self::prepare_call_arg(func_name, ctx)?;
let service_id = Self::prepare_call_arg(service_id, exec_ctx)?;
let func_name = Self::prepare_call_arg(func_name, exec_ctx)?;
Ok((peer_pk, service_id, func_name))
}
fn extract_args_by_paths(&self, ctx: &ExecutionContext) -> Result<JValue> {
let mut result = Vec::with_capacity(self.2.len());
fn extract_args_by_paths(&self, ctx: &ExecutionCtx) -> Result<JValue> {
let mut result = Vec::with_capacity(self.function_arg_paths.len());
for arg_path in self.2.iter() {
for arg_path in self.function_arg_paths.iter() {
if is_string_literal(arg_path) {
result.push(JValue::String(arg_path[1..arg_path.len() - 1].to_string()));
} else {
@ -127,8 +175,8 @@ impl Call {
Ok(JValue::Array(result))
}
fn parse_result_variable_name(&self) -> Result<&str> {
let result_variable_name = &self.3;
fn parse_result_variable_name(call: &Call) -> Result<&str> {
let result_variable_name = &call.3;
if result_variable_name.is_empty() {
return Err(AquamarineError::InstructionError(String::from(
@ -136,6 +184,12 @@ impl Call {
)));
}
if super::RESERVED_KEYWORDS.contains(result_variable_name.as_str()) {
return Err(AquamarineError::ReservedKeywordError(
result_variable_name.to_string(),
));
}
if is_string_literal(result_variable_name) {
return Err(AquamarineError::InstructionError(String::from(
"result name of a call instruction must be non string literal",
@ -147,7 +201,7 @@ impl Call {
fn get_args_by_path<'args_path, 'ctx>(
args_path: &'args_path str,
ctx: &'ctx ExecutionContext,
ctx: &'ctx ExecutionCtx,
) -> Result<Vec<&'ctx JValue>> {
let mut split_arg: Vec<&str> = args_path.splitn(2, '.').collect();
let arg_path_head = split_arg.remove(0);
@ -189,7 +243,11 @@ impl Call {
Ok(values)
}
fn prepare_call_arg<'a>(arg_path: &'a str, ctx: &'a ExecutionContext) -> Result<&'a str> {
fn prepare_call_arg<'a>(arg_path: &'a str, ctx: &'a ExecutionCtx) -> Result<&'a str> {
if super::RESERVED_KEYWORDS.contains(arg_path) {
return Err(AquamarineError::ReservedKeywordError(arg_path.to_string()));
}
if is_string_literal(arg_path) {
return Ok(&arg_path[1..arg_path.len() - 1]);
}
@ -214,49 +272,118 @@ impl Call {
}
}
fn set_result(&self, ctx: &mut ExecutionContext, result: JValue) -> Result<()> {
use std::collections::hash_map::Entry;
let result_variable_name = self.parse_result_variable_name()?;
let is_array = result_variable_name.ends_with("[]");
if !is_array {
// if result is not an array, simply insert it into data
if ctx
.data
.insert(result_variable_name.to_string(), result)
.is_some()
{
return Err(AquamarineError::MultipleVariablesFound(
result_variable_name.to_string(),
));
}
return Ok(());
fn prepare_evidence_state(
&self,
call_ctx: &mut CallEvidenceCtx,
current_peer_id: &str,
) -> Result<bool> {
if call_ctx.unused_subtree_elements_count == 0 {
log::info!("call evidence: previous state wasn't found");
return Ok(true);
}
// if result is an array, insert result to the end of the array
match ctx
.data
// unwrap is safe because it's been checked for []
.entry(result_variable_name.strip_suffix("[]").unwrap().to_string())
{
Entry::Occupied(mut entry) => match entry.get_mut() {
JValue::Array(values) => values.push(result),
v => {
return Err(AquamarineError::IncompatibleJValueType(
v.clone(),
String::from("Array"),
))
call_ctx.unused_subtree_elements_count -= 1;
// unwrap is safe here, because current_states length's been checked
let prev_state = call_ctx.current_states.pop_front().unwrap();
log::info!("call evidence: previous state found {:?}", prev_state);
match &prev_state {
// this call was failed on one of the previous executions,
// here it's needed to bubble this special error up
EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => {
let err_msg = err_msg.clone();
call_ctx.new_states.push(prev_state);
Err(AquamarineError::LocalServiceError(err_msg))
}
EvidenceState::Call(CallResult::RequestSent) => {
// check whether current node can execute this call
if self.peer_pk == current_peer_id {
Ok(true)
} else {
call_ctx.new_states.push(prev_state);
Ok(false)
}
},
Entry::Vacant(entry) => {
entry.insert(JValue::Array(vec![result]));
}
// this instruction's been already executed
EvidenceState::Call(CallResult::Executed) => {
call_ctx.new_states.push(prev_state);
Ok(false)
}
// state has inconsistent order - return a error, call shouldn't be executed
par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState(
par_state.clone(),
String::from("call"),
)),
}
}
}
fn set_local_call_result(
result_variable_name: String,
exec_ctx: &mut ExecutionCtx,
call_ctx: &mut CallEvidenceCtx,
result: JValue,
) -> Result<()> {
use std::collections::hash_map::Entry;
let new_evidence_state = EvidenceState::Call(CallResult::Executed);
let is_array = result_variable_name.ends_with("[]");
if !is_array {
// if result is not an array, simply insert it into data
if exec_ctx
.data
.insert(result_variable_name.clone(), result)
.is_some()
{
return Err(AquamarineError::MultipleVariablesFound(
result_variable_name,
));
}
Ok(())
log::info!("call evidence: adding new state {:?}", new_evidence_state);
call_ctx.new_states.push(new_evidence_state);
return Ok(());
}
// if result is an array, insert result to the end of the array
match exec_ctx
.data
// unwrap is safe because it's been checked for []
.entry(result_variable_name.strip_suffix("[]").unwrap().to_string())
{
Entry::Occupied(mut entry) => match entry.get_mut() {
JValue::Array(values) => values.push(result),
v => {
return Err(AquamarineError::IncompatibleJValueType(
v.clone(),
String::from("Array"),
))
}
},
Entry::Vacant(entry) => {
entry.insert(JValue::Array(vec![result]));
}
}
log::info!("call evidence: adding new state {:?}", new_evidence_state);
call_ctx.new_states.push(new_evidence_state);
Ok(())
}
fn set_remote_call_result(
peer_pk: String,
exec_ctx: &mut ExecutionCtx,
call_ctx: &mut CallEvidenceCtx,
) {
exec_ctx.next_peer_pks.push(peer_pk);
let new_evidence_state = EvidenceState::Call(CallResult::RequestSent);
log::info!("call evidence: adding new state {:?}", new_evidence_state);
call_ctx.new_states.push(new_evidence_state);
}
fn is_string_literal(value: &str) -> bool {
@ -277,7 +404,7 @@ mod tests {
#[test]
fn current_peer_id_call() {
let mut vm = create_aqua_vm(echo_string_call_service());
let mut vm = create_aqua_vm(echo_string_call_service(), "test_peer_id");
let script = String::from(
r#"
@ -318,7 +445,7 @@ mod tests {
#[test]
fn remote_peer_id_call() {
let mut vm = create_aqua_vm(echo_string_call_service());
let mut vm = create_aqua_vm(echo_string_call_service(), "");
let remote_peer_id = String::from("some_remote_peer_id");
let script = format!(
@ -339,7 +466,7 @@ mod tests {
#[test]
fn variables() {
let mut vm = create_aqua_vm(echo_string_call_service());
let mut vm = create_aqua_vm(echo_string_call_service(), "");
let script = format!(
r#"(call (remote_peer_id ("some_service_id" "local_fn_name") ("param") result_name))"#,
@ -369,7 +496,7 @@ mod tests {
))
});
let mut vm = create_aqua_vm(call_service);
let mut vm = create_aqua_vm(call_service, "");
let script = format!(
r#"(call (%current_peer_id% ("some_service_id" "local_fn_name") ("arg1" "arg2" arg3) result_name))"#,

View File

@ -0,0 +1,39 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::fold::FoldState;
use crate::AquaData;
use std::collections::HashMap;
#[derive(Clone, Default, Debug)]
pub(crate) struct ExecutionCtx {
pub data: AquaData,
pub next_peer_pks: Vec<String>,
pub current_peer_id: String,
pub folds: HashMap<String, FoldState>,
}
impl ExecutionCtx {
pub(crate) fn new(data: AquaData, current_peer_id: String) -> Self {
Self {
data,
next_peer_pks: vec![],
current_peer_id,
folds: HashMap::new(),
}
}
}

View File

@ -14,7 +14,8 @@
* limitations under the License.
*/
use super::ExecutionContext;
use super::CallEvidenceCtx;
use super::ExecutionCtx;
use super::Instruction;
use crate::AquamarineError;
use crate::JValue;
@ -47,15 +48,21 @@ pub(crate) struct FoldState {
}
impl super::ExecutableInstruction for Fold {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("fold {:?} is called with context {:?}", self, ctx);
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!(
"fold {} {} is called with contexts {:?} {:?}",
self.0,
self.1,
exec_ctx,
call_ctx
);
let iterable_name = &self.0;
let iterator_name = &self.1;
let instr_head = self.2.clone();
// check that value exists and has array type
match ctx.data.get(iterable_name) {
match exec_ctx.data.get(iterable_name) {
Some(JValue::Array(_)) => {}
Some(v) => {
return Err(AquamarineError::IncompatibleJValueType(
@ -76,7 +83,7 @@ impl super::ExecutableInstruction for Fold {
instr_head: instr_head.clone(),
};
if ctx
if exec_ctx
.folds
.insert(iterator_name.clone(), fold_state)
.is_some()
@ -84,23 +91,28 @@ impl super::ExecutableInstruction for Fold {
return Err(AquamarineError::MultipleFoldStates(iterable_name.clone()));
}
instr_head.execute(ctx)?;
ctx.folds.remove(iterator_name);
instr_head.execute(exec_ctx, call_ctx)?;
exec_ctx.folds.remove(iterator_name);
Ok(())
}
}
impl super::ExecutableInstruction for Next {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("next {:?} is called with context {:?}", self, ctx);
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!(
"next {:?} is called with contexts {:?} {:?}",
self,
exec_ctx,
call_ctx
);
let iterator_name = &self.0;
let fold_state = ctx
let fold_state = exec_ctx
.folds
.get_mut(iterator_name)
.ok_or_else(|| AquamarineError::FoldStateNotFound(iterator_name.clone()))?;
let value = ctx
let value = exec_ctx
.data
.get(&fold_state.iterable_name)
.expect("this has been checked on the fold instruction");
@ -117,10 +129,10 @@ impl super::ExecutableInstruction for Next {
}
let next_instr = fold_state.instr_head.clone();
next_instr.execute(ctx)?;
next_instr.execute(exec_ctx, call_ctx)?;
// get the same fold state again because of borrow checker
match ctx.folds.get_mut(iterator_name) {
match exec_ctx.folds.get_mut(iterator_name) {
Some(fold_state) => fold_state.cursor -= 1,
_ => unreachable!("iterator value shouldn't changed inside fold"),
};
@ -142,7 +154,7 @@ mod tests {
#[test]
fn lfold() {
let mut vm = create_aqua_vm(echo_number_call_service());
let mut vm = create_aqua_vm(echo_number_call_service(), "");
let lfold = String::from(
r#"
@ -169,7 +181,7 @@ mod tests {
#[test]
fn rfold() {
let mut vm = create_aqua_vm(echo_number_call_service());
let mut vm = create_aqua_vm(echo_number_call_service(), "");
let rfold = String::from(
r#"
@ -196,7 +208,7 @@ mod tests {
#[test]
fn inner_fold() {
let mut vm = create_aqua_vm(echo_number_call_service());
let mut vm = create_aqua_vm(echo_number_call_service(), "");
let script = String::from(
r#"
@ -231,7 +243,7 @@ mod tests {
#[test]
fn inner_fold_with_same_iterator() {
let mut vm = create_aqua_vm(echo_number_call_service());
let mut vm = create_aqua_vm(echo_number_call_service(), "");
let script = String::from(
r#"

View File

@ -15,17 +15,22 @@
*/
mod call;
mod execution_context;
mod fold;
mod null;
mod par;
mod seq;
mod xor;
use crate::AquaData;
pub(crate) use execution_context::ExecutionCtx;
pub(self) use crate::call_evidence::CallEvidenceCtx;
pub(self) use crate::call_evidence::CallResult;
pub(self) use crate::call_evidence::EvidenceState;
use crate::Result;
use call::Call;
use fold::Fold;
use fold::FoldState;
use fold::Next;
use null::Null;
use par::Par;
@ -34,26 +39,15 @@ use xor::Xor;
use serde_derive::Deserialize;
use serde_derive::Serialize;
use std::collections::HashMap;
#[derive(Clone, Default, Debug)]
pub(super) struct ExecutionContext {
pub data: AquaData,
pub next_peer_pks: Vec<String>,
pub current_peer_id: String,
pub folds: HashMap<String, FoldState>,
}
use once_cell::sync::Lazy;
use std::collections::HashSet;
impl ExecutionContext {
pub(super) fn new(data: AquaData, current_peer_id: String) -> Self {
Self {
data,
next_peer_pks: vec![],
current_peer_id,
folds: HashMap::new(),
}
}
}
pub(self) static RESERVED_KEYWORDS: Lazy<HashSet<&str>> = Lazy::new(|| {
let mut set = HashSet::new();
set.insert("__call");
set
});
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
@ -68,19 +62,19 @@ pub(crate) enum Instruction {
}
pub(crate) trait ExecutableInstruction {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()>;
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()>;
}
impl ExecutableInstruction for Instruction {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
match self {
Instruction::Null(null) => null.execute(ctx),
Instruction::Call(call) => call.execute(ctx),
Instruction::Fold(fold) => fold.execute(ctx),
Instruction::Next(next) => next.execute(ctx),
Instruction::Par(par) => par.execute(ctx),
Instruction::Seq(seq) => seq.execute(ctx),
Instruction::Xor(xor) => xor.execute(ctx),
Instruction::Null(null) => null.execute(exec_ctx, call_ctx),
Instruction::Call(call) => call.execute(exec_ctx, call_ctx),
Instruction::Fold(fold) => fold.execute(exec_ctx, call_ctx),
Instruction::Next(next) => next.execute(exec_ctx, call_ctx),
Instruction::Par(par) => par.execute(exec_ctx, call_ctx),
Instruction::Seq(seq) => seq.execute(exec_ctx, call_ctx),
Instruction::Xor(xor) => xor.execute(exec_ctx, call_ctx),
}
}
}

View File

@ -14,7 +14,8 @@
* limitations under the License.
*/
use super::ExecutionContext;
use super::CallEvidenceCtx;
use super::ExecutionCtx;
use crate::Result;
use serde_derive::Deserialize;
@ -24,8 +25,12 @@ use serde_derive::Serialize;
pub(crate) struct Null {}
impl super::ExecutableInstruction for Null {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("null is called with context: {:?}", ctx);
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!(
"null is called with contexts: {:?} {:?}",
exec_ctx,
call_ctx
);
Ok(())
}

View File

@ -14,8 +14,12 @@
* limitations under the License.
*/
use super::ExecutionContext;
use super::CallEvidenceCtx;
use super::EvidenceState;
use super::ExecutableInstruction;
use super::ExecutionCtx;
use super::Instruction;
use crate::AquamarineError;
use crate::Result;
use serde_derive::Deserialize;
@ -24,28 +28,88 @@ use serde_derive::Serialize;
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub(crate) struct Par(Box<Instruction>, Box<Instruction>);
impl super::ExecutableInstruction for Par {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("par is called with context: {:?}", ctx);
impl ExecutableInstruction for Par {
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!("par is called with context: {:?} {:?}", exec_ctx, call_ctx);
self.0.execute(ctx)?;
self.1.execute(ctx)?;
let (left_subtree_size, right_subtree_size) = extract_subtree_sizes(call_ctx)?;
let pre_states_count = call_ctx.current_states.len();
let pre_unused_elements = call_ctx.unused_subtree_elements_count;
let pre_new_states_count = call_ctx.new_states.len();
call_ctx.new_states.push(EvidenceState::Par(0, 0));
let new_left_subtree_size =
execute_subtree(&self.0, left_subtree_size, exec_ctx, call_ctx)?;
let new_right_subtree_size =
execute_subtree(&self.1, right_subtree_size, exec_ctx, call_ctx)?;
let new_par_evidence_state =
EvidenceState::Par(new_left_subtree_size, new_right_subtree_size);
log::info!(
"call evidence: adding new state {:?}",
new_par_evidence_state
);
call_ctx.new_states[pre_new_states_count] = new_par_evidence_state;
let post_states_count = call_ctx.current_states.len();
call_ctx.unused_subtree_elements_count =
pre_unused_elements - (pre_states_count - post_states_count);
Ok(())
}
}
fn extract_subtree_sizes(call_ctx: &mut CallEvidenceCtx) -> Result<(usize, usize)> {
if call_ctx.unused_subtree_elements_count == 0 {
return Ok((0, 0));
}
call_ctx.unused_subtree_elements_count -= 1;
log::info!(
"call evidence: the previous state was found {:?}",
call_ctx.current_states[0]
);
// unwrap is safe here because of length's been checked
match call_ctx.current_states.pop_front().unwrap() {
EvidenceState::Par(left, right) => Ok((left, right)),
state => Err(AquamarineError::InvalidEvidenceState(
state,
String::from("par"),
)),
}
}
fn execute_subtree(
subtree: &Instruction,
subtree_size: usize,
exec_ctx: &mut ExecutionCtx,
call_ctx: &mut CallEvidenceCtx,
) -> Result<usize> {
call_ctx.unused_subtree_elements_count = subtree_size;
let before_states_count = call_ctx.new_states.len();
// execute subtree
subtree.execute(exec_ctx, call_ctx)?;
Ok(call_ctx.new_states.len() - before_states_count)
}
#[cfg(test)]
mod tests {
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::StepperOutcome;
use serde_json::json;
#[test]
fn par() {
let mut vm = create_aqua_vm(unit_call_service());
fn par_remote_remote() {
use std::collections::HashSet;
let mut vm = create_aqua_vm(unit_call_service(), "");
let script = String::from(
r#"
@ -55,19 +119,37 @@ mod tests {
))"#,
);
let mut res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("call should be successful");
let peers_result: HashSet<_> = res.next_peer_pks.drain(..).collect();
let peers_right: HashSet<_> = vec![
String::from("remote_peer_id_1"),
String::from("remote_peer_id_2"),
]
.drain(..)
.collect();
assert_eq!(peers_result, peers_right);
}
#[test]
fn par_local_remote() {
let mut vm = create_aqua_vm(unit_call_service(), "");
let script = String::from(
r#"
(par (
(call (%current_peer_id% ("local_service_id" "local_fn_name") () result_name))
(call ("remote_peer_id_2" ("service_id" "fn_name") () g))
))"#,
);
let res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("call should be successful");
assert_eq!(
res,
StepperOutcome {
data: String::from("{}"),
next_peer_pks: vec![
String::from("remote_peer_id_1"),
String::from("remote_peer_id_2")
]
}
);
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]);
}
}

View File

@ -14,7 +14,8 @@
* limitations under the License.
*/
use super::ExecutionContext;
use super::CallEvidenceCtx;
use super::ExecutionCtx;
use super::Instruction;
use crate::Result;
@ -25,14 +26,14 @@ use serde_derive::Serialize;
pub(crate) struct Seq(Box<Instruction>, Box<Instruction>);
impl super::ExecutableInstruction for Seq {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("seq is called with context: {:?}", ctx);
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!("seq is called with contexts: {:?} {:?}", exec_ctx, call_ctx);
let pks_count_before_call = ctx.next_peer_pks.len();
self.0.execute(ctx)?;
let pks_count_before_call = exec_ctx.next_peer_pks.len();
self.0.execute(exec_ctx, call_ctx)?;
if pks_count_before_call == ctx.next_peer_pks.len() {
self.1.execute(ctx)?;
if pks_count_before_call == exec_ctx.next_peer_pks.len() {
self.1.execute(exec_ctx, call_ctx)?;
}
Ok(())
@ -43,13 +44,12 @@ impl super::ExecutableInstruction for Seq {
mod tests {
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::StepperOutcome;
use serde_json::json;
#[test]
fn par() {
let mut vm = create_aqua_vm(unit_call_service());
fn seq_remote_remote() {
let mut vm = create_aqua_vm(unit_call_service(), "");
let script = String::from(
r#"
@ -59,16 +59,43 @@ mod tests {
))"#,
);
let res = vm
.call(json!([String::from("asd"), script, String::from("{}")]))
.expect("call should be successful");
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_1")]);
let res = vm
.call(json!([
String::from("asd"),
script,
json!({
"__call": [{"call": "executed"}]
}
)
.to_string(),
]))
.expect("call should be successful");
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]);
}
#[test]
fn seq_local_remote() {
let mut vm = create_aqua_vm(unit_call_service(), "");
let script = String::from(
r#"
(seq (
(call (%current_peer_id% ("local_service_id" "local_fn_name") () result_name))
(call ("remote_peer_id_2" ("service_id" "fn_name") () g))
))"#,
);
let res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("call should be successful");
assert_eq!(
res,
StepperOutcome {
data: String::from("{}"),
next_peer_pks: vec![String::from("remote_peer_id_1")]
}
);
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]);
}
}

View File

@ -14,7 +14,8 @@
* limitations under the License.
*/
use super::ExecutionContext;
use super::CallEvidenceCtx;
use super::ExecutionCtx;
use super::Instruction;
use crate::AquamarineError;
use crate::Result;
@ -26,11 +27,11 @@ use serde_derive::Serialize;
pub(crate) struct Xor(Box<Instruction>, Box<Instruction>);
impl super::ExecutableInstruction for Xor {
fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> {
log::info!("xor is called with context: {:?}", ctx);
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!("xor is called with contexts: {:?} {:?}", exec_ctx, call_ctx);
match self.0.execute(ctx) {
Err(AquamarineError::LocalServiceError(_)) => self.1.execute(ctx),
match self.0.execute(exec_ctx, call_ctx) {
Err(AquamarineError::LocalServiceError(_)) => self.1.execute(exec_ctx, call_ctx),
res => res,
}
}
@ -72,7 +73,7 @@ mod tests {
}
});
let mut vm = create_aqua_vm(call_service);
let mut vm = create_aqua_vm(call_service, "");
let script = String::from(
r#"

View File

@ -0,0 +1,40 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::EvidenceState;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub(crate) struct CallEvidenceCtx {
pub(crate) current_states: VecDeque<EvidenceState>,
pub(crate) unused_subtree_elements_count: usize,
pub(crate) new_states: Vec<EvidenceState>,
}
impl CallEvidenceCtx {
pub fn new(current_states: VecDeque<EvidenceState>) -> Self {
let right = current_states.len();
Self {
current_states,
unused_subtree_elements_count: right,
new_states: vec![],
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod context;
mod state;
pub(crate) use context::CallEvidenceCtx;
pub(crate) use state::CallResult;
pub(crate) use state::EvidenceState;

View File

@ -0,0 +1,38 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use serde::Deserialize;
use serde::Serialize;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CallResult {
/// Request was sent to a target node and it shouldn't be called again.
RequestSent,
/// A corresponding call's been already executed.
Executed,
/// call_service ended with a service error.
CallServiceFailed(String),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum EvidenceState {
Par(usize, usize),
Call(CallResult),
}

View File

@ -14,6 +14,7 @@
* limitations under the License.
*/
use crate::call_evidence::EvidenceState;
use crate::CallServiceResult;
use crate::JValue;
use crate::StepperOutcome;
@ -31,14 +32,17 @@ pub(crate) enum AquamarineError {
/// Errors occurred while parsing aqua script in the form of S expressions.
SExprParseError(SExprError),
/// Errors occurred while parsing aqua data.
DataSerdeError(SerdeJsonError),
/// Errors occurred on aqua data deserialization.
DataDeserializationError(SerdeJsonError),
/// Errors occurred on aqua data serialization.
DataSerializationError(SerdeJsonError),
/// Errors occurred while parsing function arguments of an expression.
FuncArgsSerdeError(JValue, SerdeJsonError),
FuncArgsSerializationError(JValue, SerdeJsonError),
/// Errors occurred while parsing returned by call_service value.
CallServiceSerdeError(CallServiceResult, SerdeJsonError),
CallServiceResultDeserializationError(CallServiceResult, SerdeJsonError),
/// Indicates that environment variable with name CURRENT_PEER_ID isn't set.
CurrentPeerIdEnvError(VarError, String),
@ -69,6 +73,18 @@ pub(crate) enum AquamarineError {
/// Multiple fold states found for such iterator name.
MultipleFoldStates(String),
/// Expected evidence state of different type.
InvalidEvidenceState(EvidenceState, String),
/// Errors occurred on call evidence deserialization.
CallEvidenceDeserializationError(SerdeJsonError),
/// Errors occurred on call evidence serialization.
CallEvidenceSerializationError(SerdeJsonError),
/// Errors occurred when reserved keyword is used for variable name.
ReservedKeywordError(String),
}
impl Error for AquamarineError {}
@ -79,17 +95,22 @@ impl std::fmt::Display for AquamarineError {
AquamarineError::SExprParseError(err) => {
write!(f, "aqua script can't be parsed: {:?}", err)
}
AquamarineError::DataSerdeError(err) => write!(
AquamarineError::DataDeserializationError(err) => write!(
f,
"an error occurred while serializing/deserializing data: {:?}",
"an error occurred while data deserialization: {:?}",
err
),
AquamarineError::FuncArgsSerdeError(args, err) => write!(
AquamarineError::DataSerializationError(err) => write!(
f,
"an error occurred while data serialization: {:?}",
err
),
AquamarineError::FuncArgsSerializationError(args, err) => write!(
f,
"function arguments {} can't be serialized or deserialized with an error: {:?}",
args, err
),
AquamarineError::CallServiceSerdeError(result, err) => write!(
AquamarineError::CallServiceResultDeserializationError(result, err) => write!(
f,
"call_service result \"{:?}\" can't be serialized or deserialized with an error: {:?}",
result, err
@ -138,6 +159,26 @@ impl std::fmt::Display for AquamarineError {
"multiple fold states found for iterable {}",
iterator
),
AquamarineError::InvalidEvidenceState(found_state, expected) => write!(
f,
"invalid evidence state: expected {}, but found {:?}",
expected, found_state
),
AquamarineError::CallEvidenceDeserializationError(err) => write!(
f,
"an error occurred while data deserialization: {:?}",
err
),
AquamarineError::CallEvidenceSerializationError(err) => write!(
f,
"an error occurred while data serialization: {:?}",
err
),
AquamarineError::ReservedKeywordError(variable_name) => write!(
f,
"a variable can't be named as {} because this name is reserved",
variable_name
),
}
}
}
@ -158,19 +199,24 @@ impl Into<StepperOutcome> for AquamarineError {
fn into(self) -> StepperOutcome {
let ret_code = match self {
AquamarineError::SExprParseError(_) => 1,
AquamarineError::DataSerdeError(..) => 2,
AquamarineError::FuncArgsSerdeError(..) => 3,
AquamarineError::CallServiceSerdeError(..) => 4,
AquamarineError::CurrentPeerIdEnvError(..) => 5,
AquamarineError::InstructionError(..) => 6,
AquamarineError::LocalServiceError(..) => 7,
AquamarineError::VariableNotFound(..) => 8,
AquamarineError::MultipleVariablesFound(..) => 9,
AquamarineError::VariableNotInJsonPath(..) => 10,
AquamarineError::IncompatibleJValueType(..) => 11,
AquamarineError::MultipleValuesInJsonPath(..) => 12,
AquamarineError::FoldStateNotFound(..) => 13,
AquamarineError::MultipleFoldStates(..) => 14,
AquamarineError::DataDeserializationError(..) => 2,
AquamarineError::DataSerializationError(..) => 3,
AquamarineError::FuncArgsSerializationError(..) => 4,
AquamarineError::CallServiceResultDeserializationError(..) => 5,
AquamarineError::CurrentPeerIdEnvError(..) => 6,
AquamarineError::InstructionError(..) => 7,
AquamarineError::LocalServiceError(..) => 8,
AquamarineError::VariableNotFound(..) => 9,
AquamarineError::MultipleVariablesFound(..) => 10,
AquamarineError::VariableNotInJsonPath(..) => 11,
AquamarineError::IncompatibleJValueType(..) => 12,
AquamarineError::MultipleValuesInJsonPath(..) => 13,
AquamarineError::FoldStateNotFound(..) => 14,
AquamarineError::MultipleFoldStates(..) => 15,
AquamarineError::InvalidEvidenceState(..) => 16,
AquamarineError::CallEvidenceDeserializationError(..) => 17,
AquamarineError::CallEvidenceSerializationError(..) => 18,
AquamarineError::ReservedKeywordError(..) => 19,
};
StepperOutcome {

View File

@ -16,13 +16,18 @@
use super::StepperOutcome;
use crate::air::ExecutableInstruction;
use crate::air::ExecutionContext;
use crate::air::ExecutionCtx;
use crate::air::Instruction;
use crate::call_evidence::EvidenceState;
use crate::get_current_peer_id;
use crate::AquaData;
use crate::AquamarineError;
use crate::Result;
use crate::call_evidence::CallEvidenceCtx;
use std::collections::VecDeque;
pub(crate) fn execute_aqua(init_user_id: String, aqua: String, data: String) -> StepperOutcome {
log::info!(
"stepper invoked with user_id = {}, aqua = {:?}, data = {:?}",
@ -35,8 +40,8 @@ pub(crate) fn execute_aqua(init_user_id: String, aqua: String, data: String) ->
}
fn execute_aqua_impl(_init_user_id: String, aqua: String, data: String) -> Result<StepperOutcome> {
let parsed_data: AquaData =
serde_json::from_str(&data).map_err(AquamarineError::DataSerdeError)?;
let mut parsed_data: AquaData =
serde_json::from_str(&data).map_err(AquamarineError::DataDeserializationError)?;
let formatted_aqua = format_aqua(aqua);
let parsed_aqua = serde_sexpr::from_str::<Instruction>(&formatted_aqua)?;
@ -49,16 +54,31 @@ fn execute_aqua_impl(_init_user_id: String, aqua: String, data: String) -> Resul
let current_peer_id = get_current_peer_id()
.map_err(|e| AquamarineError::CurrentPeerIdEnvError(e, String::from("CURRENT_PEER_ID")))?;
let mut execution_ctx = ExecutionContext::new(parsed_data, current_peer_id);
parsed_aqua.execute(&mut execution_ctx)?;
let call_evidence_ctx_key: &str = "__call";
let current_states: VecDeque<EvidenceState> = match parsed_data.remove(call_evidence_ctx_key) {
Some(jvalue) => serde_json::from_value(jvalue)
.map_err(AquamarineError::CallEvidenceDeserializationError)?,
None => VecDeque::new(),
};
let data =
serde_json::to_string(&execution_ctx.data).map_err(AquamarineError::DataSerdeError)?;
let mut execution_ctx = ExecutionCtx::new(parsed_data, current_peer_id);
let mut call_evidence_ctx = CallEvidenceCtx::new(current_states);
parsed_aqua.execute(&mut execution_ctx, &mut call_evidence_ctx)?;
let serialized_call_ctx = serde_json::to_value(call_evidence_ctx.new_states)
.map_err(AquamarineError::CallEvidenceSerializationError)?;
execution_ctx
.data
.insert(call_evidence_ctx_key.to_string(), serialized_call_ctx);
let data = serde_json::to_string(&execution_ctx.data)
.map_err(AquamarineError::DataSerializationError)?;
Ok(StepperOutcome {
ret_code: 0,
data,
next_peer_pks: execution_ctx.next_peer_pks,
next_peer_pks: dedup(execution_ctx.next_peer_pks),
})
}
@ -92,6 +112,15 @@ fn format_aqua(aqua: String) -> String {
String::from_iter(formatted_aqua.into_iter())
}
use std::hash::Hash;
fn dedup<T: Eq + Hash>(mut vec: Vec<T>) -> Vec<T> {
use std::collections::HashSet;
let set: HashSet<_> = vec.drain(..).collect(); // dedup
set.into_iter().collect()
}
#[cfg(test)]
mod tests {
#[test]

View File

@ -26,6 +26,7 @@
)]
mod air;
mod call_evidence;
mod defines;
mod errors;
mod execution;

View File

@ -15,6 +15,7 @@
*/
mod air;
mod call_evidence;
mod defines;
mod errors;
mod execution;

View File

@ -19,7 +19,6 @@ use aqua_test_utils::unit_call_service;
use aquamarine_vm::vec1::Vec1;
use aquamarine_vm::HostExportedFunc;
use aquamarine_vm::IValue;
use aquamarine_vm::StepperOutcome;
use serde_json::json;
@ -27,7 +26,7 @@ type JValue = serde_json::Value;
#[test]
fn seq_par_call() {
let mut vm = create_aqua_vm(unit_call_service());
let mut vm = create_aqua_vm(unit_call_service(), "");
let script = String::from(
r#"
@ -43,17 +42,26 @@ fn seq_par_call() {
let res = vm
.call(json!([String::from("asd"), script, String::from("{}"),]))
.expect("should be successful");
let right_outcome = StepperOutcome {
data: String::from("{\"result_1\":\"test\"}"),
next_peer_pks: vec![String::from("remote_peer_id")],
};
assert_eq!(res, right_outcome);
let resulted_json: JValue =
serde_json::from_str(&res.data).expect("stepper should return valid json");
let right_json = json!( {
"result_1" : "test",
"__call": [
{ "par": [1,1] },
{ "call": "executed" },
{ "call": "request_sent" },
]
});
assert_eq!(resulted_json, right_json);
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]);
}
#[test]
fn par_par_call() {
let mut vm = create_aqua_vm(unit_call_service());
let mut vm = create_aqua_vm(unit_call_service(), "");
let script = String::from(
r#"
@ -73,7 +81,17 @@ fn par_par_call() {
let resulted_json: JValue =
serde_json::from_str(&res.data).expect("stepper should return valid json");
let right_json = json!( {"result_1" : "test", "result_2" : "test"} );
let right_json = json!( {
"result_1" : "test",
"result_2" : "test",
"__call": [
{ "par": [3,1] },
{ "par": [1,1] },
{ "call": "executed" },
{ "call": "request_sent" },
{ "call": "executed" },
]
});
assert_eq!(resulted_json, right_json);
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]);
@ -136,7 +154,7 @@ fn create_service() {
))
});
let mut vm = create_aqua_vm(call_service);
let mut vm = create_aqua_vm(call_service, "");
let res = vm
.call(json!([String::from("init_user_pk"), script, data,]))
@ -156,6 +174,10 @@ fn create_service() {
String::from("service_id"),
JValue::String(String::from("create response")),
);
data_value.as_object_mut().unwrap().insert(
String::from("__call"),
json!([{"call": "executed"}, {"call": "executed"}, {"call": "executed"}, {"call": "request_sent"}]),
);
assert_eq!(resulted_data, data_value);
assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]);

View File

@ -0,0 +1,518 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::echo_number_call_service;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::vec1::Vec1;
use aquamarine_vm::HostExportedFunc;
use aquamarine_vm::IValue;
use serde_json::json;
type JValue = serde_json::Value;
#[test]
fn evidence_seq_par_call() {
env_logger::init();
let mut vm = create_aqua_vm(unit_call_service(), "");
let script = String::from(
r#"
(seq (
(par (
(call (%current_peer_id% ("local_service_id" "local_fn_name") () result_1))
(call ("remote_peer_id" ("service_id" "fn_name") () g))
))
(call (%current_peer_id% ("local_service_id" "local_fn_name") () result_2))
))"#,
);
let res = vm
.call(json!([
String::from("asd"),
script,
json!({
"__call": [
{ "par": [1,1] },
{ "call": "executed" },
{ "call": "executed" },
]
})
.to_string(),
]))
.expect("should be successful");
let resulted_json: JValue =
serde_json::from_str(&res.data).expect("stepper should return valid json");
let right_json = json!( {
"result_2": "test",
"__call": [
{ "par": [1,1] },
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(resulted_json, right_json);
assert!(res.next_peer_pks.is_empty());
}
#[test]
fn evidence_par_par_call() {
let mut vm = create_aqua_vm(unit_call_service(), "some_peer_id");
let script = String::from(
r#"
(par (
(par (
(call ("some_peer_id" ("local_service_id" "local_fn_name") () result_1))
(call ("remote_peer_id" ("service_id" "fn_name") () g))
))
(call (%current_peer_id% ("local_service_id" "local_fn_name") () result_2))
))"#,
);
let res = vm
.call(json!([
String::from("asd"),
script,
json!({
"__call": [
{ "par": [3,0] },
{ "par": [1,1] },
{ "call": "request_sent" },
{ "call": "executed" },
]
})
.to_string(),
]))
.expect("should be successful");
let resulted_json: JValue =
serde_json::from_str(&res.data).expect("stepper should return valid json");
let right_json = json!( {
"result_1" : "test",
"result_2" : "test",
"__call": [
{ "par": [3,1] },
{ "par": [1,1] },
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(resulted_json, right_json);
assert!(res.next_peer_pks.is_empty());
}
#[test]
fn evidence_seq_seq() {
let peer_id_1 = String::from("12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er");
let peer_id_2 = String::from("12D3KooWAzJcYitiZrerycVB4Wryrx22CFKdDGx7c4u31PFdfTbR");
let mut vm1 = create_aqua_vm(unit_call_service(), peer_id_1.clone());
let mut vm2 = create_aqua_vm(unit_call_service(), peer_id_2.clone());
let script = format!(
r#"
(seq (
(call ("{}" ("identity" "") () void0))
(seq (
(call ("{}" ("add_blueprint" "") () blueprint_id))
(call ("{}" ("addBlueprint-14d8488e-d10d-474d-96b2-878f6a7d74c8" "") () void1))
))
))
"#,
peer_id_1, peer_id_1, peer_id_2
);
let res1 = vm2
.call(json!([String::from("asd"), script, String::from("{}")]))
.expect("should be successful");
assert_eq!(res1.next_peer_pks, vec![peer_id_1.clone()]);
let res2 = vm1
.call(json!([String::from("asd"), script, res1.data]))
.expect("should be successful");
assert_eq!(res2.next_peer_pks, vec![peer_id_2.clone()]);
let res3 = vm2
.call(json!([String::from("asd"), script, res2.data]))
.expect("should be successful");
let resulted_json: JValue =
serde_json::from_str(&res3.data).expect("stepper should return valid json");
let right_json = json!( {
"void0": "test",
"void1": "test",
"blueprint_id": "test",
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(resulted_json, right_json);
assert!(res3.next_peer_pks.is_empty());
}
#[test]
fn evidence_create_service() {
let module = "greeting";
let config = json!(
{
"name": module,
"mem_pages_count": 100,
"logger_enabled": true,
"wasi": {
"envs": json!({}),
"preopened_files": vec!["/tmp"],
"mapped_dirs": json!({}),
}
}
);
let mut data_value = json!({
"module_bytes": vec![1,2],
"module_config": config,
"blueprint": { "name": "blueprint", "dependencies": [module] },
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
]
});
let data = data_value.to_string();
let script = String::from(
r#"
(seq (
(call (%current_peer_id% ("add_module" "") (module_bytes module_config) module))
(seq (
(call (%current_peer_id% ("add_blueprint" "") (blueprint) blueprint_id))
(seq (
(call (%current_peer_id% ("create" "") (blueprint_id) service_id))
(call ("remote_peer_id" ("" "") (service_id) client_result))
))
))
))"#,
);
let call_service: HostExportedFunc = Box::new(|_, args| -> Option<IValue> {
let builtin_service = match &args[0] {
IValue::String(str) => str,
_ => unreachable!(),
};
let response = match builtin_service.as_str() {
"add_module" => String::from("add_module response"),
"add_blueprint" => String::from("add_blueprint response"),
"create" => String::from("create response"),
_ => String::from("unknown response"),
};
Some(IValue::Record(
Vec1::new(vec![
IValue::S32(0),
IValue::String(format!("\"{}\"", response)),
])
.unwrap(),
))
});
let mut vm = create_aqua_vm(call_service, "");
let res = vm
.call(json!([String::from("init_user_pk"), script, data,]))
.expect("should be successful");
let resulted_data: JValue = serde_json::from_str(&res.data).expect("should be correct json");
data_value.as_object_mut().unwrap().insert(
String::from("__call"),
json!([{"call": "executed"}, {"call": "executed"}, {"call": "executed"}, {"call": "executed"}]),
);
assert_eq!(resulted_data, data_value);
assert!(res.next_peer_pks.is_empty());
}
#[test]
fn evidence_par_seq_fold_call() {
let return_numbers_call_service: HostExportedFunc = Box::new(|_, args| -> Option<IValue> {
Some(IValue::Record(
Vec1::new(vec![
IValue::S32(0),
IValue::String(String::from(
"[\"1\", \"2\", \"3\", \"4\", \"5\", \"6\", \"7\", \"8\", \"9\", \"10\"]",
)),
])
.unwrap(),
))
});
let mut vm1 = create_aqua_vm(return_numbers_call_service, "some_peer_id_1");
let mut vm2 = create_aqua_vm(echo_number_call_service(), "some_peer_id_2");
let mut vm3 = create_aqua_vm(unit_call_service(), "some_peer_id_3");
let script = String::from(
r#"
(par (
(seq (
(call ("some_peer_id_1" ("local_service_id" "local_fn_name") () IterableResultPeer1))
(fold (IterableResultPeer1 i
(par (
(call ("some_peer_id_2" ("local_service_id" "local_fn_name") (i) acc[]))
(next i)
))
))
))
(call ("some_peer_id_3" ("local_service_id" "local_fn_name") () result_2))
))"#,
);
let res1 = vm2
.call(json!([
String::from("asd"),
script,
json!({
"__call": []
})
.to_string(),
]))
.expect("should be successful");
let res2 = vm1
.call(json!([String::from("asd"), script, res1.data]))
.expect("should be successful");
let mut data = res2.data;
for _ in 0..100 {
let res3 = vm2
.call(json!([String::from("asd"), script, data]))
.expect("should be successful");
data = res3.data;
}
let res4 = vm3
.call(json!([String::from("asd"), script, data]))
.expect("should be successful");
let resulted_json: JValue =
serde_json::from_str(&res4.data).expect("stepper should return valid json");
let right_json = json!( {
"result_2": "test",
"IterableResultPeer1": ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"],
"acc": [1,2,3,4,5,6,7,8,9,10],
"__call": [
{ "par": [21,1] },
{ "call": "executed" },
{ "par": [1,18] },
{ "call": "executed" },
{ "par": [1,16] },
{ "call": "executed" },
{ "par": [1,14] },
{ "call": "executed" },
{ "par": [1,12] },
{ "call": "executed" },
{ "par": [1,10] },
{ "call": "executed" },
{ "par": [1,8] },
{ "call": "executed" },
{ "par": [1,6] },
{ "call": "executed" },
{ "par": [1,4] },
{ "call": "executed" },
{ "par": [1,2] },
{ "call": "executed" },
{ "par": [1,0] },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(resulted_json, right_json);
assert!(res4.next_peer_pks.is_empty());
}
#[test]
fn evidence_par_seq_fold_in_cycle_call() {
let return_numbers_call_service: HostExportedFunc = Box::new(|_, args| -> Option<IValue> {
Some(IValue::Record(
Vec1::new(vec![
IValue::S32(0),
IValue::String(String::from(
"[\"1\", \"2\", \"3\", \"4\", \"5\", \"6\", \"7\", \"8\", \"9\", \"10\"]",
)),
])
.unwrap(),
))
});
let mut vm1 = create_aqua_vm(return_numbers_call_service, "some_peer_id_1");
let mut vm2 = create_aqua_vm(echo_number_call_service(), "some_peer_id_2");
let mut vm3 = create_aqua_vm(unit_call_service(), "some_peer_id_3");
let script = String::from(
r#"
(par (
(seq (
(call ("some_peer_id_1" ("local_service_id" "local_fn_name") () IterableResultPeer1))
(fold (IterableResultPeer1 i
(par (
(call ("some_peer_id_2" ("local_service_id" "local_fn_name") (i) acc[]))
(next i)
))
))
))
(call ("some_peer_id_3" ("local_service_id" "local_fn_name") () result_2))
))"#,
);
let mut data = String::from("{}");
for _ in 0..100 {
let res1 = vm1
.call(json!([String::from("asd"), script, data]))
.expect("should be successful");
data = res1.data;
let res2 = vm2
.call(json!([String::from("asd"), script, data]))
.expect("should be successful");
data = res2.data;
let res3 = vm3
.call(json!([String::from("asd"), script, data]))
.expect("should be successful");
data = res3.data;
}
let resulted_json: JValue =
serde_json::from_str(&data).expect("stepper should return valid json");
let right_json = json!( {
"result_2": "test",
"IterableResultPeer1": ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"],
"acc": [1,2,3,4,5,6,7,8,9,10],
"__call": [
{ "par": [21,1] },
{ "call": "executed" },
{ "par": [1,18] },
{ "call": "executed" },
{ "par": [1,16] },
{ "call": "executed" },
{ "par": [1,14] },
{ "call": "executed" },
{ "par": [1,12] },
{ "call": "executed" },
{ "par": [1,10] },
{ "call": "executed" },
{ "par": [1,8] },
{ "call": "executed" },
{ "par": [1,6] },
{ "call": "executed" },
{ "par": [1,4] },
{ "call": "executed" },
{ "par": [1,2] },
{ "call": "executed" },
{ "par": [1,0] },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(resulted_json, right_json);
}
#[test]
fn evidence_seq_par_seq_seq() {
let peer_id_1 = String::from("12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er");
let peer_id_2 = String::from("12D3KooWAzJcYitiZrerycVB4Wryrx22CFKdDGx7c4u31PFdfTbR");
let mut vm1 = create_aqua_vm(unit_call_service(), peer_id_1.clone());
let mut vm2 = create_aqua_vm(unit_call_service(), peer_id_2.clone());
let script = format!(
r#"
(seq (
(par (
(seq (
(call ("{}" ("" "") () result_1))
(call ("{}" ("" "") () result_2))
))
(seq (
(call ("{}" ("" "") () result_3))
(call ("{}" ("" "") () result_4))
))
))
(call ("{}" ("" "") () result_5))
))
"#,
peer_id_1, peer_id_2, peer_id_2, peer_id_1, peer_id_2
);
let res1 = vm2
.call(json!([String::from("asd"), script, String::from("{}")]))
.expect("should be successful");
assert_eq!(res1.next_peer_pks, vec![peer_id_1.clone()]);
let res2 = vm1
.call(json!([String::from("asd"), script, res1.data]))
.expect("should be successful");
assert_eq!(res2.next_peer_pks, vec![peer_id_2.clone()]);
let res3 = vm2
.call(json!([String::from("asd"), script, res2.data]))
.expect("should be successful");
let resulted_json: JValue =
serde_json::from_str(&res3.data).expect("stepper should return valid json");
let right_json = json!( {
"result_1": "test",
"result_2": "test",
"result_3": "test",
"result_4": "test",
"result_5": "test",
"__call": [
{ "par": [2,2] },
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(resulted_json, right_json);
assert!(res3.next_peer_pks.is_empty());
}