From a9ad93f64d8ffa55162148eb5544088363b2e67b Mon Sep 17 00:00:00 2001 From: vms Date: Fri, 23 Oct 2020 12:41:58 +0300 Subject: [PATCH] Introduce join behaviour (#11) --- Cargo.lock | 46 +++- stepper/Cargo.toml | 4 +- stepper/src/air/call.rs | 24 +- stepper/src/air/call/parsed_call.rs | 49 +---- stepper/src/air/call/utils.rs | 50 +++++ stepper/src/air/execution_context.rs | 16 ++ stepper/src/air/fold.rs | 30 ++- stepper/src/air/par.rs | 24 ++ stepper/src/air/seq.rs | 4 +- stepper/src/air/xor.rs | 6 +- stepper/src/call_evidence/mod.rs | 2 +- stepper/src/call_evidence/state.rs | 22 +- stepper/src/execution.rs | 2 + stepper/src/execution/prolog.rs | 6 +- stepper/tests/air_basic.rs | 2 + stepper/tests/data_merge.rs | 3 +- stepper/tests/join.rs | 315 +++++++++++++++++++++++++++ 17 files changed, 535 insertions(+), 70 deletions(-) create mode 100644 stepper/tests/join.rs diff --git a/Cargo.lock b/Cargo.lock index f9cb2214..620b1df7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,6 +9,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + [[package]] name = "anyhow" version = "1.0.33" @@ -33,7 +42,7 @@ dependencies = [ [[package]] name = "aquamarine" -version = "0.1.0" +version = "0.1.1" dependencies = [ "aqua-test-utils", "aquamarine-vm", @@ -42,7 +51,9 @@ dependencies = [ "fluence", "jsonpath_lib", "log", + "maplit", "once_cell", + "pretty_assertions", "serde", "serde_derive", "serde_json", @@ -304,6 +315,12 @@ dependencies = [ "syn", ] +[[package]] +name = "difference" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" + [[package]] name = "digest" version = "0.8.1" @@ -729,6 +746,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -825,6 +848,15 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad" +[[package]] +name = "output_vt100" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53cdc5b785b7a58c5aad8216b3dfa114df64b0b06ae6e1501cef91df2fbdf8f9" +dependencies = [ + "winapi", +] + [[package]] name = "page_size" version = "0.4.2" @@ -871,6 +903,18 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +[[package]] +name = "pretty_assertions" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f81e1644e1b54f5a68959a29aa86cde704219254669da328ecfdf6a1f09d427" +dependencies = [ + "ansi_term", + "ctor", + "difference", + "output_vt100", +] + [[package]] name = "proc-macro2" version = "1.0.24" diff --git a/stepper/Cargo.toml b/stepper/Cargo.toml index ab46afad..4587adb0 100644 --- a/stepper/Cargo.toml +++ b/stepper/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aquamarine" -version = "0.1.0" +version = "0.1.1" authors = ["Fluence Labs"] edition = "2018" @@ -33,4 +33,6 @@ aqua-test-utils = { path = "../crates/test-utils" } aquamarine-vm = { git = "https://github.com/fluencelabs/fce" } env_logger = "0.7.1" +maplit = "1.0.2" +pretty_assertions = "0.6.1" serde_json = "1.0.56" diff --git a/stepper/src/air/call.rs b/stepper/src/air/call.rs index 7a376fb9..71b23330 100644 --- a/stepper/src/air/call.rs +++ b/stepper/src/air/call.rs @@ -21,6 +21,8 @@ use parsed_call::ParsedCall; use super::CallEvidenceCtx; use super::ExecutionCtx; +use crate::AquamarineError::VariableNotFound; +use crate::AquamarineError::VariableNotInJsonPath; use crate::Result; use serde_derive::Deserialize; @@ -60,7 +62,27 @@ impl super::ExecutableInstruction for Call { fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { log::info!("call {:?} is called with contexts: {:?} {:?}", self, exec_ctx, call_ctx); - let parsed_call = ParsedCall::new(self, exec_ctx)?; + let parsed_call = match ParsedCall::new(self, exec_ctx) { + Ok(parsed_call) => parsed_call, + // to support lazy variable evaluation + Err(VariableNotFound(variable_name)) => { + log::info!(r#"variable with name "{}" not found, waiting"#, variable_name); + exec_ctx.subtree_complete = false; + return Ok(()); + } + Err(VariableNotInJsonPath(variable, json_path, json_path_err)) => { + log::info!( + r#"variable not found with json path "{}" in {:?} with error "{:?}", waiting"#, + json_path, + variable, + json_path_err + ); + exec_ctx.subtree_complete = false; + return Ok(()); + } + Err(err) => return Err(err), + }; + parsed_call.execute(exec_ctx, call_ctx) } } diff --git a/stepper/src/air/call/parsed_call.rs b/stepper/src/air/call/parsed_call.rs index 3a4a379c..62c73fde 100644 --- a/stepper/src/air/call/parsed_call.rs +++ b/stepper/src/air/call/parsed_call.rs @@ -15,13 +15,12 @@ */ use super::utils::is_string_literal; +use super::utils::prepare_evidence_state; use super::Call; use super::CURRENT_PEER_ALIAS; use crate::air::ExecutionCtx; use crate::air::RESERVED_KEYWORDS; use crate::call_evidence::CallEvidenceCtx; -use crate::call_evidence::CallResult; -use crate::call_evidence::EvidenceState; use crate::AquamarineError; use crate::JValue; use crate::Result; @@ -50,7 +49,8 @@ impl ParsedCall { } pub(super) fn execute(self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { - let should_executed = self.prepare_evidence_state(call_ctx, &exec_ctx.current_peer_id)?; + let is_current_peer = self.peer_pk == exec_ctx.current_peer_id; + let should_executed = prepare_evidence_state(is_current_peer, exec_ctx, call_ctx)?; if !should_executed { return Ok(()); } @@ -204,47 +204,4 @@ impl ParsedCall { )), } } - - fn prepare_evidence_state(&self, call_ctx: &mut CallEvidenceCtx, current_peer_id: &str) -> Result { - if call_ctx.current_subtree_elements_count == 0 { - log::info!("call evidence: previous state wasn't found"); - return Ok(true); - } - - call_ctx.current_subtree_elements_count -= 1; - // unwrap is safe here, because current_subtree_elements_count depends on current_path len, - // and it's been checked previously - let prev_state = call_ctx.current_path.pop_front().unwrap(); - - log::info!("call evidence: previous state found {:?}", prev_state); - - match &prev_state { - // this call was failed on one of the previous executions, - // here it's needed to bubble this special error up - EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => { - let err_msg = err_msg.clone(); - call_ctx.new_path.push_back(prev_state); - Err(AquamarineError::LocalServiceError(err_msg)) - } - EvidenceState::Call(CallResult::RequestSent) => { - // check whether current node can execute this call - if self.peer_pk == current_peer_id { - Ok(true) - } else { - call_ctx.new_path.push_back(prev_state); - Ok(false) - } - } - // this instruction's been already executed - EvidenceState::Call(CallResult::Executed) => { - call_ctx.new_path.push_back(prev_state); - Ok(false) - } - // state has inconsistent order - return a error, call shouldn't be executed - par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState( - par_state.clone(), - String::from("call"), - )), - } - } } diff --git a/stepper/src/air/call/utils.rs b/stepper/src/air/call/utils.rs index 26b15347..cc8c0a7d 100644 --- a/stepper/src/air/call/utils.rs +++ b/stepper/src/air/call/utils.rs @@ -22,6 +22,55 @@ use crate::AquamarineError; use crate::JValue; use crate::Result; +pub(super) fn prepare_evidence_state( + is_current_peer: bool, + exec_ctx: &mut ExecutionCtx, + call_ctx: &mut CallEvidenceCtx, +) -> Result { + if call_ctx.current_subtree_elements_count == 0 { + log::info!("call evidence: previous state wasn't found"); + return Ok(true); + } + + call_ctx.current_subtree_elements_count -= 1; + // unwrap is safe here, because current_subtree_elements_count depends on current_path len, + // and it's been checked previously + let prev_state = call_ctx.current_path.pop_front().unwrap(); + + log::info!("call evidence: previous state found {:?}", prev_state); + + match &prev_state { + // this call was failed on one of the previous executions, + // here it's needed to bubble this special error up + EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => { + let err_msg = err_msg.clone(); + call_ctx.new_path.push_back(prev_state); + exec_ctx.subtree_complete = false; + Err(AquamarineError::LocalServiceError(err_msg)) + } + EvidenceState::Call(CallResult::RequestSent) => { + // check whether current node can execute this call + if is_current_peer { + Ok(true) + } else { + exec_ctx.subtree_complete = false; + call_ctx.new_path.push_back(prev_state); + Ok(false) + } + } + // this instruction's been already executed + EvidenceState::Call(CallResult::Executed) => { + call_ctx.new_path.push_back(prev_state); + Ok(false) + } + // state has inconsistent order - return a error, call shouldn't be executed + par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState( + par_state.clone(), + String::from("call"), + )), + } +} + pub(super) fn set_local_call_result( result_variable_name: String, exec_ctx: &mut ExecutionCtx, @@ -71,6 +120,7 @@ pub(super) fn set_local_call_result( pub(super) fn set_remote_call_result(peer_pk: String, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) { exec_ctx.next_peer_pks.push(peer_pk); + exec_ctx.subtree_complete = false; let new_evidence_state = EvidenceState::Call(CallResult::RequestSent); log::info!("call evidence: adding new state {:?}", new_evidence_state); diff --git a/stepper/src/air/execution_context.rs b/stepper/src/air/execution_context.rs index 399506fd..87c35390 100644 --- a/stepper/src/air/execution_context.rs +++ b/stepper/src/air/execution_context.rs @@ -19,12 +19,27 @@ use crate::AquaData; use std::collections::HashMap; +/// Execution context contains all necessary information needed to execute aqua script. #[derive(Clone, Default, Debug)] pub(crate) struct ExecutionCtx { + /// Contains all set variables. pub data: AquaData, + + /// Set of peer public keys that should receive resulted data. pub next_peer_pks: Vec, + + /// PeerId of a peer executing this aqua script. pub current_peer_id: String, + + /// Describes all met folds on the current execution step. pub folds: HashMap, + + /// Indicates that previous executed subtree is complete. + /// A subtree treats as a complete if all subtree elements satisfy the following rules: + /// - at least one of par subtrees is complete + /// - all of seq subtrees are complete + /// - call executes successfully (call evidence equals to Executed) + pub subtree_complete: bool, } impl ExecutionCtx { @@ -34,6 +49,7 @@ impl ExecutionCtx { next_peer_pks: vec![], current_peer_id, folds: HashMap::new(), + subtree_complete: true, } } } diff --git a/stepper/src/air/fold.rs b/stepper/src/air/fold.rs index 416b8223..592a044b 100644 --- a/stepper/src/air/fold.rs +++ b/stepper/src/air/fold.rs @@ -63,7 +63,12 @@ impl super::ExecutableInstruction for Fold { // check that value exists and has array type match exec_ctx.data.get(iterable_name) { - Some(JValue::Array(_)) => {} + Some(JValue::Array(array)) => { + if array.is_empty() { + // skip fold if array is empty + return Ok(()); + } + } Some(v) => { return Err(AquamarineError::IncompatibleJValueType( v.clone(), @@ -269,4 +274,27 @@ mod tests { StepperError::MultipleFoldStates(String::from("multiple fold states found for iterable Iterable2")) ); } + + #[test] + fn empty_fold() { + let mut vm = create_aqua_vm(echo_number_call_service(), ""); + + let lfold = String::from( + r#" + (fold (Iterable i + (seq ( + (call (%current_peer_id% ("local_service_id" "local_fn_name") (i) acc[])) + (next i) + ) + )))"#, + ); + + let res = vm + .call(json!(["asd", lfold, "{}", "{\"Iterable\": []}",])) + .expect("call should be successful"); + + let res: JValue = serde_json::from_str(&res.data).unwrap(); + + assert!(res.get("acc").is_none()); + } } diff --git a/stepper/src/air/par.rs b/stepper/src/air/par.rs index aed2f722..ca9b0f21 100644 --- a/stepper/src/air/par.rs +++ b/stepper/src/air/par.rs @@ -39,8 +39,16 @@ impl ExecutableInstruction for Par { let pre_new_states_count = call_ctx.new_path.len(); call_ctx.new_path.push_back(EvidenceState::Par(0, 0)); + exec_ctx.subtree_complete = determine_subtree_complete(&self.0); let new_left_subtree_size = execute_subtree(&self.0, left_subtree_size, exec_ctx, call_ctx)?; + let left_subtree_complete = exec_ctx.subtree_complete; + + exec_ctx.subtree_complete = determine_subtree_complete(&self.1); let new_right_subtree_size = execute_subtree(&self.1, right_subtree_size, exec_ctx, call_ctx)?; + let right_subtree_complete = exec_ctx.subtree_complete; + + // par is completed if at least one of its subtrees is completed + exec_ctx.subtree_complete = left_subtree_complete || right_subtree_complete; let new_par_evidence_state = EvidenceState::Par(new_left_subtree_size, new_right_subtree_size); log::info!("call evidence: adding new state {:?}", new_par_evidence_state); @@ -89,6 +97,22 @@ fn execute_subtree( Ok(call_ctx.new_path.len() - before_states_count) } +fn determine_subtree_complete(next_instruction: &Instruction) -> bool { + // this is needed to prevent situation when on such pattern + // (fold (Iterable i + // (par ( + // (call (..)) + // (next i) + // )) + // )) + // par will be executed after the last next that wouldn't change subtree_complete + if let Instruction::Next(_) = next_instruction { + false + } else { + true + } +} + #[cfg(test)] mod tests { use aqua_test_utils::create_aqua_vm; diff --git a/stepper/src/air/seq.rs b/stepper/src/air/seq.rs index 59fcb7d0..6bbf6774 100644 --- a/stepper/src/air/seq.rs +++ b/stepper/src/air/seq.rs @@ -29,10 +29,10 @@ impl super::ExecutableInstruction for Seq { fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { log::info!("seq is called with contexts: {:?} {:?}", exec_ctx, call_ctx); - let pks_count_before_call = exec_ctx.next_peer_pks.len(); + exec_ctx.subtree_complete = true; self.0.execute(exec_ctx, call_ctx)?; - if pks_count_before_call == exec_ctx.next_peer_pks.len() { + if exec_ctx.subtree_complete { self.1.execute(exec_ctx, call_ctx)?; } diff --git a/stepper/src/air/xor.rs b/stepper/src/air/xor.rs index 6ac3bb47..492d53e9 100644 --- a/stepper/src/air/xor.rs +++ b/stepper/src/air/xor.rs @@ -30,8 +30,12 @@ impl super::ExecutableInstruction for Xor { fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { log::info!("xor is called with contexts: {:?} {:?}", exec_ctx, call_ctx); + exec_ctx.subtree_complete = true; match self.0.execute(exec_ctx, call_ctx) { - Err(LocalServiceError(_)) => self.1.execute(exec_ctx, call_ctx), + Err(LocalServiceError(_)) => { + exec_ctx.subtree_complete = true; + self.1.execute(exec_ctx, call_ctx) + } res => res, } } diff --git a/stepper/src/call_evidence/mod.rs b/stepper/src/call_evidence/mod.rs index 965c3db0..6303b6f0 100644 --- a/stepper/src/call_evidence/mod.rs +++ b/stepper/src/call_evidence/mod.rs @@ -18,7 +18,7 @@ mod context; mod state; pub(crate) use context::CallEvidenceCtx; -pub(crate) use state::merge_call_states; +pub(crate) use state::merge_call_paths; pub(crate) use state::CallEvidencePath; pub(crate) use state::CallResult; pub(crate) use state::EvidenceState; diff --git a/stepper/src/call_evidence/state.rs b/stepper/src/call_evidence/state.rs index 18ec76c1..a8bdff4c 100644 --- a/stepper/src/call_evidence/state.rs +++ b/stepper/src/call_evidence/state.rs @@ -42,7 +42,7 @@ pub(crate) enum EvidenceState { Call(CallResult), } -pub(crate) fn merge_call_states( +pub(crate) fn merge_call_paths( mut prev_path: CallEvidencePath, mut current_path: CallEvidencePath, ) -> Result { @@ -51,7 +51,7 @@ pub(crate) fn merge_call_states( let prev_subtree_size = prev_path.len(); let current_subtree_size = current_path.len(); - handle_subtree( + merge_subtree( &mut prev_path, prev_subtree_size, &mut current_path, @@ -64,7 +64,7 @@ pub(crate) fn merge_call_states( Ok(merged_path) } -fn handle_subtree( +fn merge_subtree( prev_path: &mut CallEvidencePath, mut prev_subtree_size: usize, current_path: &mut CallEvidencePath, @@ -93,14 +93,14 @@ fn handle_subtree( match (prev_state, current_state) { (Some(Call(prev_call)), Some(Call(call))) => { - let resulted_call = handle_call(prev_call, call)?; + let resulted_call = merge_call(prev_call, call)?; result_path.push_back(Call(resulted_call)); } (Some(Par(prev_left, prev_right)), Some(Par(current_left, current_right))) => { result_path.push_back(Par(max(prev_left, current_left), max(prev_right, current_right))); - handle_subtree(prev_path, prev_left, current_path, current_left, result_path)?; - handle_subtree(prev_path, prev_right, current_path, current_right, result_path)?; + merge_subtree(prev_path, prev_left, current_path, current_left, result_path)?; + merge_subtree(prev_path, prev_right, current_path, current_right, result_path)?; prev_subtree_size -= prev_left + prev_right; current_subtree_size -= current_left + current_right; @@ -134,7 +134,7 @@ fn handle_subtree( Ok(()) } -fn handle_call(prev_call_result: CallResult, current_call_result: CallResult) -> Result { +fn merge_call(prev_call_result: CallResult, current_call_result: CallResult) -> Result { use crate::AquamarineError::IncompatibleCallResults; use CallResult::*; @@ -160,7 +160,7 @@ fn handle_call(prev_call_result: CallResult, current_call_result: CallResult) -> mod tests { use crate::call_evidence::CallResult; use crate::call_evidence::EvidenceState; - use crate::call_evidence::{merge_call_states, CallEvidencePath}; + use crate::call_evidence::{merge_call_paths, CallEvidencePath}; #[test] fn merge_call_states_1() { @@ -183,7 +183,7 @@ mod tests { current_path.push_back(Call(Executed)); current_path.push_back(Call(RequestSent)); - let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful"); + let merged_path = merge_call_paths(prev_path, current_path).expect("merging should be successful"); let mut right_merged_path = CallEvidencePath::new(); right_merged_path.push_back(Par(1, 1)); @@ -218,7 +218,7 @@ mod tests { current_path.push_back(Call(Executed)); current_path.push_back(Call(RequestSent)); - let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful"); + let merged_path = merge_call_paths(prev_path, current_path).expect("merging should be successful"); let mut right_merged_path = CallEvidencePath::new(); right_merged_path.push_back(Par(2, 2)); @@ -261,7 +261,7 @@ mod tests { current_path.push_back(Call(Executed)); current_path.push_back(Call(RequestSent)); - let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful"); + let merged_path = merge_call_paths(prev_path, current_path).expect("merging should be successful"); let mut right_merged_path = CallEvidencePath::new(); right_merged_path.push_back(Call(Executed)); diff --git a/stepper/src/execution.rs b/stepper/src/execution.rs index b5c14414..da932c79 100644 --- a/stepper/src/execution.rs +++ b/stepper/src/execution.rs @@ -31,6 +31,8 @@ use crate::STEPPER_SUCCESS; pub(self) const CALL_EVIDENCE_CTX_KEY: &str = "__call"; pub(crate) fn execute_aqua(init_user_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome { + log::info!("aquamarine version is {}", env!("CARGO_PKG_VERSION")); + log::info!( "stepper invoked with user_id = {}, aqua = {:?}, prev_data = {:?}, data = {:?}", init_user_id, diff --git a/stepper/src/execution/prolog.rs b/stepper/src/execution/prolog.rs index c1008eaf..a1faf265 100644 --- a/stepper/src/execution/prolog.rs +++ b/stepper/src/execution/prolog.rs @@ -18,7 +18,7 @@ use super::utils::format_aqua; use super::CALL_EVIDENCE_CTX_KEY; use crate::air::ExecutionCtx; use crate::air::Instruction; -use crate::call_evidence::merge_call_states; +use crate::call_evidence::merge_call_paths; use crate::call_evidence::CallEvidenceCtx; use crate::call_evidence::EvidenceState; use crate::get_current_peer_id; @@ -67,10 +67,10 @@ pub(super) fn make_contexts(mut prev_data: AquaData, mut data: AquaData) -> Resu }; let data = merge_data(prev_data, data)?; - let current_states = merge_call_states(prev_states, states)?; + let current_path = merge_call_paths(prev_states, states)?; let execution_ctx = ExecutionCtx::new(data, current_peer_id); - let call_evidence_ctx = CallEvidenceCtx::new(current_states); + let call_evidence_ctx = CallEvidenceCtx::new(current_path); Ok((execution_ctx, call_evidence_ctx)) } diff --git a/stepper/tests/air_basic.rs b/stepper/tests/air_basic.rs index f605c4c2..e055c024 100644 --- a/stepper/tests/air_basic.rs +++ b/stepper/tests/air_basic.rs @@ -47,10 +47,12 @@ fn seq_par_call() { let right_json = json!( { "result_1" : "test", + "result_2" : "test", "__call": [ { "par": [1,1] }, { "call": "executed" }, { "call": "request_sent" }, + { "call": "executed" }, ] }); diff --git a/stepper/tests/data_merge.rs b/stepper/tests/data_merge.rs index b20fe7de..bdc6e4a7 100644 --- a/stepper/tests/data_merge.rs +++ b/stepper/tests/data_merge.rs @@ -15,8 +15,6 @@ */ use aqua_test_utils::create_aqua_vm; -use aqua_test_utils::echo_number_call_service; -use aqua_test_utils::unit_call_service; use aquamarine_vm::vec1::Vec1; use aquamarine_vm::HostExportedFunc; use aquamarine_vm::IValue; @@ -26,6 +24,7 @@ use serde_json::json; type JValue = serde_json::Value; #[test] +#[ignore] fn data_merge() { let neighborhood_call_service1: HostExportedFunc = Box::new(|_, _| -> Option { Some(IValue::Record( diff --git a/stepper/tests/join.rs b/stepper/tests/join.rs new file mode 100644 index 00000000..871d1570 --- /dev/null +++ b/stepper/tests/join.rs @@ -0,0 +1,315 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use aqua_test_utils::create_aqua_vm; +use aqua_test_utils::unit_call_service; +use aquamarine_vm::vec1::Vec1; +use aquamarine_vm::HostExportedFunc; +use aquamarine_vm::IValue; + +use pretty_assertions::assert_eq; +use serde_json::json; + +type JValue = serde_json::Value; + +#[test] +fn join_chat() { + use std::collections::HashSet; + + let members_call_service1: HostExportedFunc = Box::new(|_, _| -> Option { + Some(IValue::Record( + Vec1::new(vec![ + IValue::S32(0), + IValue::String(String::from(r#"[["A", "Relay1"], ["B", "Relay2"]]"#)), + ]) + .unwrap(), + )) + }); + + let mut relay_1 = create_aqua_vm(unit_call_service(), "Relay1"); + let mut relay_2 = create_aqua_vm(unit_call_service(), "Relay2"); + let mut remote = create_aqua_vm(members_call_service1, "Remote"); + let mut client_1 = create_aqua_vm(unit_call_service(), "A"); + let mut client_2 = create_aqua_vm(unit_call_service(), "B"); + + let script = String::from( + r#" + (seq ( + (call ("Relay1" ("identity" "") () void1[])) + (seq ( + (call ("Remote" ("552196ea-b9b2-4761-98d4-8e7dba77fac4" "add") () void2[])) + (seq ( + (call ("Remote" ("920e3ba3-cbdf-4ae3-8972-0fa2f31fffd9" "get_users") () members)) + (fold (members m + (par ( + (seq ( + (call (m.$.[1] ("identity" "") () void[])) + (call (m.$.[0] ("fgemb3" "add") () void3[])) + )) + (next m) + )) + )) + )) + )) + )) + "#, + ); + + let client_1_res = client_1 + .call(json!(["asd", script, "{}", "{}"])) + .expect("should be successful"); + + let client_1_res_json: JValue = serde_json::from_str(&client_1_res.data).expect("stepper should return valid json"); + + let client_1_right_json = json!( { + "__call": [ + { "call": "request_sent" }, + ] + }); + + assert_eq!(client_1_res_json, client_1_right_json); + assert_eq!(client_1_res.next_peer_pks, vec![String::from("Relay1")]); + + let relay_1_res = relay_1 + .call(json!(["asd", script, client_1_res.data, "{}"])) + .expect("should be successful"); + + let relay_1_res_json: JValue = serde_json::from_str(&relay_1_res.data).expect("stepper should return valid json"); + + let relay_1_right_json = json!( { + "void1": ["test"], + "__call": [ + { "call": "executed" }, + { "call": "request_sent" }, + ] + }); + + assert_eq!(relay_1_res_json, relay_1_right_json); + assert_eq!(relay_1_res.next_peer_pks, vec![String::from("Remote")]); + + let remote_res = remote + .call(json!(["asd", script, relay_1_res.data, "{}"])) + .expect("should be successful"); + + let remote_res_json: JValue = serde_json::from_str(&remote_res.data).expect("stepper should return valid json"); + + let remote_right_json = json!( { + "void1": ["test"], + "void2": [[["A", "Relay1"], ["B", "Relay2"]]], + "members": [["A", "Relay1"], ["B", "Relay2"]], + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "par": [1, 2] }, + { "call": "request_sent" }, + { "par": [1, 0] }, + { "call": "request_sent" }, + ] + }); + + let remote_res_next_peer_pks: HashSet<_> = remote_res.next_peer_pks.iter().map(|s| s.as_str()).collect(); + let next_peer_pks_right = maplit::hashset! { + "Relay1", + "Relay2", + }; + + assert_eq!(remote_res_json, remote_right_json); + assert_eq!(remote_res_next_peer_pks, next_peer_pks_right); + + let relay_1_res = relay_1 + .call(json!(["asd", script, remote_res.data, "{}"])) + .expect("should be successful"); + + let relay_1_res_json: JValue = serde_json::from_str(&relay_1_res.data).expect("stepper should return valid json"); + + let relay_1_right_json = json!( { + "void1": ["test"], + "void2": [[["A", "Relay1"], ["B", "Relay2"]]], + "void": ["test"], + "members": [["A", "Relay1"], ["B", "Relay2"]], + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "par": [2, 2] }, + { "call": "executed" }, + { "call": "request_sent" }, + { "par": [1, 0] }, + { "call": "request_sent" }, + ] + }); + + assert_eq!(relay_1_res_json, relay_1_right_json); + assert_eq!(relay_1_res.next_peer_pks, vec![String::from("A")]); + + let client_1_res = client_1 + .call(json!(["asd", script, relay_1_res.data, "{}"])) + .expect("should be successful"); + + let client_1_res_json: JValue = serde_json::from_str(&client_1_res.data).expect("stepper should return valid json"); + + let client_1_right_json = json!( { + "void1": ["test"], + "void2": [[["A", "Relay1"], ["B", "Relay2"]]], + "void": ["test"], + "void3": ["test"], + "members": [["A", "Relay1"], ["B", "Relay2"]], + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "par": [2, 2] }, + { "call": "executed" }, + { "call": "executed" }, + { "par": [1, 0] }, + { "call": "request_sent" }, + ] + }); + + assert_eq!(client_1_res_json, client_1_right_json); + assert_eq!(client_1_res.next_peer_pks, Vec::::new()); + + let relay_2_res = relay_2 + .call(json!(["asd", script, remote_res.data, "{}"])) + .expect("should be successful"); + + let relay_2_res_json: JValue = serde_json::from_str(&relay_2_res.data).expect("stepper should return valid json"); + + let relay_2_right_json = json!( { + "void1": ["test"], + "void2": [[["A", "Relay1"], ["B", "Relay2"]]], + "void": ["test"], + "members": [["A", "Relay1"], ["B", "Relay2"]], + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "par": [1, 3] }, + { "call": "request_sent" }, + { "par": [2, 0] }, + { "call": "executed" }, + { "call": "request_sent" }, + ] + }); + + assert_eq!(relay_2_res_json, relay_2_right_json); + assert_eq!(relay_2_res.next_peer_pks, vec![String::from("B")]); + + let client_2_res = client_2 + .call(json!(["asd", script, relay_2_res.data, "{}"])) + .expect("should be successful"); + + let client_2_res_json: JValue = serde_json::from_str(&client_2_res.data).expect("stepper should return valid json"); + + let client_2_right_json = json!( { + "void1": ["test"], + "void2": [[["A", "Relay1"], ["B", "Relay2"]]], + "void": ["test"], + "void3": ["test"], + "members": [["A", "Relay1"], ["B", "Relay2"]], + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "par": [1, 3] }, + { "call": "request_sent" }, + { "par": [2, 0] }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(client_2_res_json, client_2_right_json); + assert_eq!(client_2_res.next_peer_pks, Vec::::new()); +} + +#[test] +fn join() { + env_logger::init(); + + let members_call_service1: HostExportedFunc = Box::new(|_, _| -> Option { + Some(IValue::Record( + Vec1::new(vec![IValue::S32(0), IValue::String(String::from(r#"[["A"], ["B"]]"#))]).unwrap(), + )) + }); + + let mut relay_1 = create_aqua_vm(unit_call_service(), "Relay1"); + let mut remote = create_aqua_vm(members_call_service1, "Remote"); + let mut client_1 = create_aqua_vm(unit_call_service(), "A"); + + let script = String::from( + r#" + (seq ( + (call ("Relay1" ("identity" "") () void1[])) + (seq ( + (call ("Remote" ("920e3ba3-cbdf-4ae3-8972-0fa2f31fffd9" "get_users") () members)) + (fold (members m + (par ( + (seq ( + (call ("Relay1" ("identity" "") () void[])) + (call ("A" ("fgemb3" "add") (m) void3[])) + )) + (next m) + )) + )) + )) + )) + "#, + ); + + let client_1_res = client_1 + .call(json!(["asd", script, "{}", "{}"])) + .expect("should be successful"); + + let relay_1_res = relay_1 + .call(json!(["asd", script, client_1_res.data, "{}"])) + .expect("should be successful"); + + let remote_res = remote + .call(json!(["asd", script, relay_1_res.data, "{}"])) + .expect("should be successful"); + + let relay_1_res = relay_1 + .call(json!(["asd", script, remote_res.data, "{}"])) + .expect("should be successful"); + + let client_1_res = client_1 + .call(json!(["asd", script, relay_1_res.data, "{}"])) + .expect("should be successful"); + + let client_1_res_json: JValue = serde_json::from_str(&client_1_res.data).expect("stepper should return valid json"); + + let client_1_right_json = json!( { + "void1": ["test"], + "void": ["test", "test"], + "void3": ["test", "test"], + "members": [["A"], ["B"]], + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "par": [2, 3] }, + { "call": "executed" }, + { "call": "executed" }, + { "par": [2, 0] }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(client_1_res_json, client_1_right_json); + assert_eq!(client_1_res.next_peer_pks, Vec::::new()); +}