Introduce join behaviour (#11)

This commit is contained in:
vms 2020-10-23 12:41:58 +03:00 committed by GitHub
parent 1db6788382
commit a9ad93f64d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 535 additions and 70 deletions

46
Cargo.lock generated
View File

@ -9,6 +9,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.33"
@ -33,7 +42,7 @@ dependencies = [
[[package]]
name = "aquamarine"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"aqua-test-utils",
"aquamarine-vm",
@ -42,7 +51,9 @@ dependencies = [
"fluence",
"jsonpath_lib",
"log",
"maplit",
"once_cell",
"pretty_assertions",
"serde",
"serde_derive",
"serde_json",
@ -304,6 +315,12 @@ dependencies = [
"syn",
]
[[package]]
name = "difference"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
[[package]]
name = "digest"
version = "0.8.1"
@ -729,6 +746,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
@ -825,6 +848,15 @@ version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad"
[[package]]
name = "output_vt100"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53cdc5b785b7a58c5aad8216b3dfa114df64b0b06ae6e1501cef91df2fbdf8f9"
dependencies = [
"winapi",
]
[[package]]
name = "page_size"
version = "0.4.2"
@ -871,6 +903,18 @@ version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20"
[[package]]
name = "pretty_assertions"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f81e1644e1b54f5a68959a29aa86cde704219254669da328ecfdf6a1f09d427"
dependencies = [
"ansi_term",
"ctor",
"difference",
"output_vt100",
]
[[package]]
name = "proc-macro2"
version = "1.0.24"

View File

@ -1,6 +1,6 @@
[package]
name = "aquamarine"
version = "0.1.0"
version = "0.1.1"
authors = ["Fluence Labs"]
edition = "2018"
@ -33,4 +33,6 @@ aqua-test-utils = { path = "../crates/test-utils" }
aquamarine-vm = { git = "https://github.com/fluencelabs/fce" }
env_logger = "0.7.1"
maplit = "1.0.2"
pretty_assertions = "0.6.1"
serde_json = "1.0.56"

View File

@ -21,6 +21,8 @@ use parsed_call::ParsedCall;
use super::CallEvidenceCtx;
use super::ExecutionCtx;
use crate::AquamarineError::VariableNotFound;
use crate::AquamarineError::VariableNotInJsonPath;
use crate::Result;
use serde_derive::Deserialize;
@ -60,7 +62,27 @@ impl super::ExecutableInstruction for Call {
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!("call {:?} is called with contexts: {:?} {:?}", self, exec_ctx, call_ctx);
let parsed_call = ParsedCall::new(self, exec_ctx)?;
let parsed_call = match ParsedCall::new(self, exec_ctx) {
Ok(parsed_call) => parsed_call,
// to support lazy variable evaluation
Err(VariableNotFound(variable_name)) => {
log::info!(r#"variable with name "{}" not found, waiting"#, variable_name);
exec_ctx.subtree_complete = false;
return Ok(());
}
Err(VariableNotInJsonPath(variable, json_path, json_path_err)) => {
log::info!(
r#"variable not found with json path "{}" in {:?} with error "{:?}", waiting"#,
json_path,
variable,
json_path_err
);
exec_ctx.subtree_complete = false;
return Ok(());
}
Err(err) => return Err(err),
};
parsed_call.execute(exec_ctx, call_ctx)
}
}

View File

@ -15,13 +15,12 @@
*/
use super::utils::is_string_literal;
use super::utils::prepare_evidence_state;
use super::Call;
use super::CURRENT_PEER_ALIAS;
use crate::air::ExecutionCtx;
use crate::air::RESERVED_KEYWORDS;
use crate::call_evidence::CallEvidenceCtx;
use crate::call_evidence::CallResult;
use crate::call_evidence::EvidenceState;
use crate::AquamarineError;
use crate::JValue;
use crate::Result;
@ -50,7 +49,8 @@ impl ParsedCall {
}
pub(super) fn execute(self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
let should_executed = self.prepare_evidence_state(call_ctx, &exec_ctx.current_peer_id)?;
let is_current_peer = self.peer_pk == exec_ctx.current_peer_id;
let should_executed = prepare_evidence_state(is_current_peer, exec_ctx, call_ctx)?;
if !should_executed {
return Ok(());
}
@ -204,47 +204,4 @@ impl ParsedCall {
)),
}
}
fn prepare_evidence_state(&self, call_ctx: &mut CallEvidenceCtx, current_peer_id: &str) -> Result<bool> {
if call_ctx.current_subtree_elements_count == 0 {
log::info!("call evidence: previous state wasn't found");
return Ok(true);
}
call_ctx.current_subtree_elements_count -= 1;
// unwrap is safe here, because current_subtree_elements_count depends on current_path len,
// and it's been checked previously
let prev_state = call_ctx.current_path.pop_front().unwrap();
log::info!("call evidence: previous state found {:?}", prev_state);
match &prev_state {
// this call was failed on one of the previous executions,
// here it's needed to bubble this special error up
EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => {
let err_msg = err_msg.clone();
call_ctx.new_path.push_back(prev_state);
Err(AquamarineError::LocalServiceError(err_msg))
}
EvidenceState::Call(CallResult::RequestSent) => {
// check whether current node can execute this call
if self.peer_pk == current_peer_id {
Ok(true)
} else {
call_ctx.new_path.push_back(prev_state);
Ok(false)
}
}
// this instruction's been already executed
EvidenceState::Call(CallResult::Executed) => {
call_ctx.new_path.push_back(prev_state);
Ok(false)
}
// state has inconsistent order - return a error, call shouldn't be executed
par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState(
par_state.clone(),
String::from("call"),
)),
}
}
}

View File

@ -22,6 +22,55 @@ use crate::AquamarineError;
use crate::JValue;
use crate::Result;
pub(super) fn prepare_evidence_state(
is_current_peer: bool,
exec_ctx: &mut ExecutionCtx,
call_ctx: &mut CallEvidenceCtx,
) -> Result<bool> {
if call_ctx.current_subtree_elements_count == 0 {
log::info!("call evidence: previous state wasn't found");
return Ok(true);
}
call_ctx.current_subtree_elements_count -= 1;
// unwrap is safe here, because current_subtree_elements_count depends on current_path len,
// and it's been checked previously
let prev_state = call_ctx.current_path.pop_front().unwrap();
log::info!("call evidence: previous state found {:?}", prev_state);
match &prev_state {
// this call was failed on one of the previous executions,
// here it's needed to bubble this special error up
EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => {
let err_msg = err_msg.clone();
call_ctx.new_path.push_back(prev_state);
exec_ctx.subtree_complete = false;
Err(AquamarineError::LocalServiceError(err_msg))
}
EvidenceState::Call(CallResult::RequestSent) => {
// check whether current node can execute this call
if is_current_peer {
Ok(true)
} else {
exec_ctx.subtree_complete = false;
call_ctx.new_path.push_back(prev_state);
Ok(false)
}
}
// this instruction's been already executed
EvidenceState::Call(CallResult::Executed) => {
call_ctx.new_path.push_back(prev_state);
Ok(false)
}
// state has inconsistent order - return a error, call shouldn't be executed
par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState(
par_state.clone(),
String::from("call"),
)),
}
}
pub(super) fn set_local_call_result(
result_variable_name: String,
exec_ctx: &mut ExecutionCtx,
@ -71,6 +120,7 @@ pub(super) fn set_local_call_result(
pub(super) fn set_remote_call_result(peer_pk: String, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) {
exec_ctx.next_peer_pks.push(peer_pk);
exec_ctx.subtree_complete = false;
let new_evidence_state = EvidenceState::Call(CallResult::RequestSent);
log::info!("call evidence: adding new state {:?}", new_evidence_state);

View File

@ -19,12 +19,27 @@ use crate::AquaData;
use std::collections::HashMap;
/// Execution context contains all necessary information needed to execute aqua script.
#[derive(Clone, Default, Debug)]
pub(crate) struct ExecutionCtx {
/// Contains all set variables.
pub data: AquaData,
/// Set of peer public keys that should receive resulted data.
pub next_peer_pks: Vec<String>,
/// PeerId of a peer executing this aqua script.
pub current_peer_id: String,
/// Describes all met folds on the current execution step.
pub folds: HashMap<String, FoldState>,
/// Indicates that previous executed subtree is complete.
/// A subtree treats as a complete if all subtree elements satisfy the following rules:
/// - at least one of par subtrees is complete
/// - all of seq subtrees are complete
/// - call executes successfully (call evidence equals to Executed)
pub subtree_complete: bool,
}
impl ExecutionCtx {
@ -34,6 +49,7 @@ impl ExecutionCtx {
next_peer_pks: vec![],
current_peer_id,
folds: HashMap::new(),
subtree_complete: true,
}
}
}

View File

@ -63,7 +63,12 @@ impl super::ExecutableInstruction for Fold {
// check that value exists and has array type
match exec_ctx.data.get(iterable_name) {
Some(JValue::Array(_)) => {}
Some(JValue::Array(array)) => {
if array.is_empty() {
// skip fold if array is empty
return Ok(());
}
}
Some(v) => {
return Err(AquamarineError::IncompatibleJValueType(
v.clone(),
@ -269,4 +274,27 @@ mod tests {
StepperError::MultipleFoldStates(String::from("multiple fold states found for iterable Iterable2"))
);
}
#[test]
fn empty_fold() {
let mut vm = create_aqua_vm(echo_number_call_service(), "");
let lfold = String::from(
r#"
(fold (Iterable i
(seq (
(call (%current_peer_id% ("local_service_id" "local_fn_name") (i) acc[]))
(next i)
)
)))"#,
);
let res = vm
.call(json!(["asd", lfold, "{}", "{\"Iterable\": []}",]))
.expect("call should be successful");
let res: JValue = serde_json::from_str(&res.data).unwrap();
assert!(res.get("acc").is_none());
}
}

View File

@ -39,8 +39,16 @@ impl ExecutableInstruction for Par {
let pre_new_states_count = call_ctx.new_path.len();
call_ctx.new_path.push_back(EvidenceState::Par(0, 0));
exec_ctx.subtree_complete = determine_subtree_complete(&self.0);
let new_left_subtree_size = execute_subtree(&self.0, left_subtree_size, exec_ctx, call_ctx)?;
let left_subtree_complete = exec_ctx.subtree_complete;
exec_ctx.subtree_complete = determine_subtree_complete(&self.1);
let new_right_subtree_size = execute_subtree(&self.1, right_subtree_size, exec_ctx, call_ctx)?;
let right_subtree_complete = exec_ctx.subtree_complete;
// par is completed if at least one of its subtrees is completed
exec_ctx.subtree_complete = left_subtree_complete || right_subtree_complete;
let new_par_evidence_state = EvidenceState::Par(new_left_subtree_size, new_right_subtree_size);
log::info!("call evidence: adding new state {:?}", new_par_evidence_state);
@ -89,6 +97,22 @@ fn execute_subtree(
Ok(call_ctx.new_path.len() - before_states_count)
}
fn determine_subtree_complete(next_instruction: &Instruction) -> bool {
// this is needed to prevent situation when on such pattern
// (fold (Iterable i
// (par (
// (call (..))
// (next i)
// ))
// ))
// par will be executed after the last next that wouldn't change subtree_complete
if let Instruction::Next(_) = next_instruction {
false
} else {
true
}
}
#[cfg(test)]
mod tests {
use aqua_test_utils::create_aqua_vm;

View File

@ -29,10 +29,10 @@ impl super::ExecutableInstruction for Seq {
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!("seq is called with contexts: {:?} {:?}", exec_ctx, call_ctx);
let pks_count_before_call = exec_ctx.next_peer_pks.len();
exec_ctx.subtree_complete = true;
self.0.execute(exec_ctx, call_ctx)?;
if pks_count_before_call == exec_ctx.next_peer_pks.len() {
if exec_ctx.subtree_complete {
self.1.execute(exec_ctx, call_ctx)?;
}

View File

@ -30,8 +30,12 @@ impl super::ExecutableInstruction for Xor {
fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> {
log::info!("xor is called with contexts: {:?} {:?}", exec_ctx, call_ctx);
exec_ctx.subtree_complete = true;
match self.0.execute(exec_ctx, call_ctx) {
Err(LocalServiceError(_)) => self.1.execute(exec_ctx, call_ctx),
Err(LocalServiceError(_)) => {
exec_ctx.subtree_complete = true;
self.1.execute(exec_ctx, call_ctx)
}
res => res,
}
}

View File

@ -18,7 +18,7 @@ mod context;
mod state;
pub(crate) use context::CallEvidenceCtx;
pub(crate) use state::merge_call_states;
pub(crate) use state::merge_call_paths;
pub(crate) use state::CallEvidencePath;
pub(crate) use state::CallResult;
pub(crate) use state::EvidenceState;

View File

@ -42,7 +42,7 @@ pub(crate) enum EvidenceState {
Call(CallResult),
}
pub(crate) fn merge_call_states(
pub(crate) fn merge_call_paths(
mut prev_path: CallEvidencePath,
mut current_path: CallEvidencePath,
) -> Result<CallEvidencePath> {
@ -51,7 +51,7 @@ pub(crate) fn merge_call_states(
let prev_subtree_size = prev_path.len();
let current_subtree_size = current_path.len();
handle_subtree(
merge_subtree(
&mut prev_path,
prev_subtree_size,
&mut current_path,
@ -64,7 +64,7 @@ pub(crate) fn merge_call_states(
Ok(merged_path)
}
fn handle_subtree(
fn merge_subtree(
prev_path: &mut CallEvidencePath,
mut prev_subtree_size: usize,
current_path: &mut CallEvidencePath,
@ -93,14 +93,14 @@ fn handle_subtree(
match (prev_state, current_state) {
(Some(Call(prev_call)), Some(Call(call))) => {
let resulted_call = handle_call(prev_call, call)?;
let resulted_call = merge_call(prev_call, call)?;
result_path.push_back(Call(resulted_call));
}
(Some(Par(prev_left, prev_right)), Some(Par(current_left, current_right))) => {
result_path.push_back(Par(max(prev_left, current_left), max(prev_right, current_right)));
handle_subtree(prev_path, prev_left, current_path, current_left, result_path)?;
handle_subtree(prev_path, prev_right, current_path, current_right, result_path)?;
merge_subtree(prev_path, prev_left, current_path, current_left, result_path)?;
merge_subtree(prev_path, prev_right, current_path, current_right, result_path)?;
prev_subtree_size -= prev_left + prev_right;
current_subtree_size -= current_left + current_right;
@ -134,7 +134,7 @@ fn handle_subtree(
Ok(())
}
fn handle_call(prev_call_result: CallResult, current_call_result: CallResult) -> Result<CallResult> {
fn merge_call(prev_call_result: CallResult, current_call_result: CallResult) -> Result<CallResult> {
use crate::AquamarineError::IncompatibleCallResults;
use CallResult::*;
@ -160,7 +160,7 @@ fn handle_call(prev_call_result: CallResult, current_call_result: CallResult) ->
mod tests {
use crate::call_evidence::CallResult;
use crate::call_evidence::EvidenceState;
use crate::call_evidence::{merge_call_states, CallEvidencePath};
use crate::call_evidence::{merge_call_paths, CallEvidencePath};
#[test]
fn merge_call_states_1() {
@ -183,7 +183,7 @@ mod tests {
current_path.push_back(Call(Executed));
current_path.push_back(Call(RequestSent));
let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful");
let merged_path = merge_call_paths(prev_path, current_path).expect("merging should be successful");
let mut right_merged_path = CallEvidencePath::new();
right_merged_path.push_back(Par(1, 1));
@ -218,7 +218,7 @@ mod tests {
current_path.push_back(Call(Executed));
current_path.push_back(Call(RequestSent));
let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful");
let merged_path = merge_call_paths(prev_path, current_path).expect("merging should be successful");
let mut right_merged_path = CallEvidencePath::new();
right_merged_path.push_back(Par(2, 2));
@ -261,7 +261,7 @@ mod tests {
current_path.push_back(Call(Executed));
current_path.push_back(Call(RequestSent));
let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful");
let merged_path = merge_call_paths(prev_path, current_path).expect("merging should be successful");
let mut right_merged_path = CallEvidencePath::new();
right_merged_path.push_back(Call(Executed));

View File

@ -31,6 +31,8 @@ use crate::STEPPER_SUCCESS;
pub(self) const CALL_EVIDENCE_CTX_KEY: &str = "__call";
pub(crate) fn execute_aqua(init_user_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome {
log::info!("aquamarine version is {}", env!("CARGO_PKG_VERSION"));
log::info!(
"stepper invoked with user_id = {}, aqua = {:?}, prev_data = {:?}, data = {:?}",
init_user_id,

View File

@ -18,7 +18,7 @@ use super::utils::format_aqua;
use super::CALL_EVIDENCE_CTX_KEY;
use crate::air::ExecutionCtx;
use crate::air::Instruction;
use crate::call_evidence::merge_call_states;
use crate::call_evidence::merge_call_paths;
use crate::call_evidence::CallEvidenceCtx;
use crate::call_evidence::EvidenceState;
use crate::get_current_peer_id;
@ -67,10 +67,10 @@ pub(super) fn make_contexts(mut prev_data: AquaData, mut data: AquaData) -> Resu
};
let data = merge_data(prev_data, data)?;
let current_states = merge_call_states(prev_states, states)?;
let current_path = merge_call_paths(prev_states, states)?;
let execution_ctx = ExecutionCtx::new(data, current_peer_id);
let call_evidence_ctx = CallEvidenceCtx::new(current_states);
let call_evidence_ctx = CallEvidenceCtx::new(current_path);
Ok((execution_ctx, call_evidence_ctx))
}

View File

@ -47,10 +47,12 @@ fn seq_par_call() {
let right_json = json!( {
"result_1" : "test",
"result_2" : "test",
"__call": [
{ "par": [1,1] },
{ "call": "executed" },
{ "call": "request_sent" },
{ "call": "executed" },
]
});

View File

@ -15,8 +15,6 @@
*/
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::echo_number_call_service;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::vec1::Vec1;
use aquamarine_vm::HostExportedFunc;
use aquamarine_vm::IValue;
@ -26,6 +24,7 @@ use serde_json::json;
type JValue = serde_json::Value;
#[test]
#[ignore]
fn data_merge() {
let neighborhood_call_service1: HostExportedFunc = Box::new(|_, _| -> Option<IValue> {
Some(IValue::Record(

315
stepper/tests/join.rs Normal file
View File

@ -0,0 +1,315 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use aqua_test_utils::create_aqua_vm;
use aqua_test_utils::unit_call_service;
use aquamarine_vm::vec1::Vec1;
use aquamarine_vm::HostExportedFunc;
use aquamarine_vm::IValue;
use pretty_assertions::assert_eq;
use serde_json::json;
type JValue = serde_json::Value;
#[test]
fn join_chat() {
use std::collections::HashSet;
let members_call_service1: HostExportedFunc = Box::new(|_, _| -> Option<IValue> {
Some(IValue::Record(
Vec1::new(vec![
IValue::S32(0),
IValue::String(String::from(r#"[["A", "Relay1"], ["B", "Relay2"]]"#)),
])
.unwrap(),
))
});
let mut relay_1 = create_aqua_vm(unit_call_service(), "Relay1");
let mut relay_2 = create_aqua_vm(unit_call_service(), "Relay2");
let mut remote = create_aqua_vm(members_call_service1, "Remote");
let mut client_1 = create_aqua_vm(unit_call_service(), "A");
let mut client_2 = create_aqua_vm(unit_call_service(), "B");
let script = String::from(
r#"
(seq (
(call ("Relay1" ("identity" "") () void1[]))
(seq (
(call ("Remote" ("552196ea-b9b2-4761-98d4-8e7dba77fac4" "add") () void2[]))
(seq (
(call ("Remote" ("920e3ba3-cbdf-4ae3-8972-0fa2f31fffd9" "get_users") () members))
(fold (members m
(par (
(seq (
(call (m.$.[1] ("identity" "") () void[]))
(call (m.$.[0] ("fgemb3" "add") () void3[]))
))
(next m)
))
))
))
))
))
"#,
);
let client_1_res = client_1
.call(json!(["asd", script, "{}", "{}"]))
.expect("should be successful");
let client_1_res_json: JValue = serde_json::from_str(&client_1_res.data).expect("stepper should return valid json");
let client_1_right_json = json!( {
"__call": [
{ "call": "request_sent" },
]
});
assert_eq!(client_1_res_json, client_1_right_json);
assert_eq!(client_1_res.next_peer_pks, vec![String::from("Relay1")]);
let relay_1_res = relay_1
.call(json!(["asd", script, client_1_res.data, "{}"]))
.expect("should be successful");
let relay_1_res_json: JValue = serde_json::from_str(&relay_1_res.data).expect("stepper should return valid json");
let relay_1_right_json = json!( {
"void1": ["test"],
"__call": [
{ "call": "executed" },
{ "call": "request_sent" },
]
});
assert_eq!(relay_1_res_json, relay_1_right_json);
assert_eq!(relay_1_res.next_peer_pks, vec![String::from("Remote")]);
let remote_res = remote
.call(json!(["asd", script, relay_1_res.data, "{}"]))
.expect("should be successful");
let remote_res_json: JValue = serde_json::from_str(&remote_res.data).expect("stepper should return valid json");
let remote_right_json = json!( {
"void1": ["test"],
"void2": [[["A", "Relay1"], ["B", "Relay2"]]],
"members": [["A", "Relay1"], ["B", "Relay2"]],
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "par": [1, 2] },
{ "call": "request_sent" },
{ "par": [1, 0] },
{ "call": "request_sent" },
]
});
let remote_res_next_peer_pks: HashSet<_> = remote_res.next_peer_pks.iter().map(|s| s.as_str()).collect();
let next_peer_pks_right = maplit::hashset! {
"Relay1",
"Relay2",
};
assert_eq!(remote_res_json, remote_right_json);
assert_eq!(remote_res_next_peer_pks, next_peer_pks_right);
let relay_1_res = relay_1
.call(json!(["asd", script, remote_res.data, "{}"]))
.expect("should be successful");
let relay_1_res_json: JValue = serde_json::from_str(&relay_1_res.data).expect("stepper should return valid json");
let relay_1_right_json = json!( {
"void1": ["test"],
"void2": [[["A", "Relay1"], ["B", "Relay2"]]],
"void": ["test"],
"members": [["A", "Relay1"], ["B", "Relay2"]],
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "par": [2, 2] },
{ "call": "executed" },
{ "call": "request_sent" },
{ "par": [1, 0] },
{ "call": "request_sent" },
]
});
assert_eq!(relay_1_res_json, relay_1_right_json);
assert_eq!(relay_1_res.next_peer_pks, vec![String::from("A")]);
let client_1_res = client_1
.call(json!(["asd", script, relay_1_res.data, "{}"]))
.expect("should be successful");
let client_1_res_json: JValue = serde_json::from_str(&client_1_res.data).expect("stepper should return valid json");
let client_1_right_json = json!( {
"void1": ["test"],
"void2": [[["A", "Relay1"], ["B", "Relay2"]]],
"void": ["test"],
"void3": ["test"],
"members": [["A", "Relay1"], ["B", "Relay2"]],
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "par": [2, 2] },
{ "call": "executed" },
{ "call": "executed" },
{ "par": [1, 0] },
{ "call": "request_sent" },
]
});
assert_eq!(client_1_res_json, client_1_right_json);
assert_eq!(client_1_res.next_peer_pks, Vec::<String>::new());
let relay_2_res = relay_2
.call(json!(["asd", script, remote_res.data, "{}"]))
.expect("should be successful");
let relay_2_res_json: JValue = serde_json::from_str(&relay_2_res.data).expect("stepper should return valid json");
let relay_2_right_json = json!( {
"void1": ["test"],
"void2": [[["A", "Relay1"], ["B", "Relay2"]]],
"void": ["test"],
"members": [["A", "Relay1"], ["B", "Relay2"]],
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "par": [1, 3] },
{ "call": "request_sent" },
{ "par": [2, 0] },
{ "call": "executed" },
{ "call": "request_sent" },
]
});
assert_eq!(relay_2_res_json, relay_2_right_json);
assert_eq!(relay_2_res.next_peer_pks, vec![String::from("B")]);
let client_2_res = client_2
.call(json!(["asd", script, relay_2_res.data, "{}"]))
.expect("should be successful");
let client_2_res_json: JValue = serde_json::from_str(&client_2_res.data).expect("stepper should return valid json");
let client_2_right_json = json!( {
"void1": ["test"],
"void2": [[["A", "Relay1"], ["B", "Relay2"]]],
"void": ["test"],
"void3": ["test"],
"members": [["A", "Relay1"], ["B", "Relay2"]],
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "call": "executed" },
{ "par": [1, 3] },
{ "call": "request_sent" },
{ "par": [2, 0] },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(client_2_res_json, client_2_right_json);
assert_eq!(client_2_res.next_peer_pks, Vec::<String>::new());
}
#[test]
fn join() {
env_logger::init();
let members_call_service1: HostExportedFunc = Box::new(|_, _| -> Option<IValue> {
Some(IValue::Record(
Vec1::new(vec![IValue::S32(0), IValue::String(String::from(r#"[["A"], ["B"]]"#))]).unwrap(),
))
});
let mut relay_1 = create_aqua_vm(unit_call_service(), "Relay1");
let mut remote = create_aqua_vm(members_call_service1, "Remote");
let mut client_1 = create_aqua_vm(unit_call_service(), "A");
let script = String::from(
r#"
(seq (
(call ("Relay1" ("identity" "") () void1[]))
(seq (
(call ("Remote" ("920e3ba3-cbdf-4ae3-8972-0fa2f31fffd9" "get_users") () members))
(fold (members m
(par (
(seq (
(call ("Relay1" ("identity" "") () void[]))
(call ("A" ("fgemb3" "add") (m) void3[]))
))
(next m)
))
))
))
))
"#,
);
let client_1_res = client_1
.call(json!(["asd", script, "{}", "{}"]))
.expect("should be successful");
let relay_1_res = relay_1
.call(json!(["asd", script, client_1_res.data, "{}"]))
.expect("should be successful");
let remote_res = remote
.call(json!(["asd", script, relay_1_res.data, "{}"]))
.expect("should be successful");
let relay_1_res = relay_1
.call(json!(["asd", script, remote_res.data, "{}"]))
.expect("should be successful");
let client_1_res = client_1
.call(json!(["asd", script, relay_1_res.data, "{}"]))
.expect("should be successful");
let client_1_res_json: JValue = serde_json::from_str(&client_1_res.data).expect("stepper should return valid json");
let client_1_right_json = json!( {
"void1": ["test"],
"void": ["test", "test"],
"void3": ["test", "test"],
"members": [["A"], ["B"]],
"__call": [
{ "call": "executed" },
{ "call": "executed" },
{ "par": [2, 3] },
{ "call": "executed" },
{ "call": "executed" },
{ "par": [2, 0] },
{ "call": "executed" },
{ "call": "executed" },
]
});
assert_eq!(client_1_res_json, client_1_right_json);
assert_eq!(client_1_res.next_peer_pks, Vec::<String>::new());
}