Fix bug in merging; add dashboard test (#36)

This commit is contained in:
vms 2020-12-07 20:28:26 +03:00 committed by GitHub
parent 42d2b825e4
commit fcdca0af73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 600 additions and 87 deletions

107
Cargo.lock generated
View File

@ -34,9 +34,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.34"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf8dcb5b4bbaa28653b647d8c77bd4ed40183b48882e130c1f1ffb73de069fd7"
checksum = "2c0df63cb2955042487fad3aefd2c6e3ae7389ac5dc1beb28921de0b69f779d4"
[[package]]
name = "aqua-test-module"
@ -68,7 +68,7 @@ dependencies = [
[[package]]
name = "aquamarine-vm"
version = "0.1.2"
source = "git+https://github.com/fluencelabs/fce#afa612a5639f900f5730f005f61651edb0c97569"
source = "git+https://github.com/fluencelabs/fce#1f2355e6ef0337eb3ee179fa7c18dd1f7fd9d9e8"
dependencies = [
"fluence-faas",
"maplit",
@ -147,9 +147,9 @@ dependencies = [
[[package]]
name = "bit-vec"
version = "0.6.2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0dc55f2d8a1a85650ac47858bb001b4c0dd73d79e3c455a842925e68d29cd3"
checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
[[package]]
name = "bitflags"
@ -251,9 +251,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.65"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95752358c8f7552394baf48cd82695b345628ad3f170d607de3ca03b8dacca15"
checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48"
[[package]]
name = "cfg-if"
@ -443,9 +443,9 @@ dependencies = [
[[package]]
name = "crossbeam-epoch"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0f606a85340376eef0d6d8fec399e6d4a544d648386c6645eb6d0653b27d9f"
checksum = "a1aaa739f95311c2c7887a76863f500026092fb1dce0161dab577e559ef3569d"
dependencies = [
"cfg-if 1.0.0",
"const_fn",
@ -457,13 +457,12 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec91540d98355f690a86367e566ecad2e9e579f230230eb7c21398372be73ea5"
checksum = "02d96d1e189ef58269ebe5b97953da3274d83a93af647c2ddd6f9dab28cedb8d"
dependencies = [
"autocfg",
"cfg-if 1.0.0",
"const_fn",
"lazy_static",
]
@ -479,9 +478,9 @@ dependencies = [
[[package]]
name = "csv"
version = "1.1.4"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4666154fd004af3fd6f1da2e81a96fd5a81927fe8ddb6ecc79e2aa6e138b54"
checksum = "f9d58633299b24b515ac72a3f869f8b91306a3cec616a602843a383acd6f9e97"
dependencies = [
"bstr",
"csv-core",
@ -628,8 +627,8 @@ checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fce"
version = "0.1.11"
source = "git+https://github.com/fluencelabs/fce#afa612a5639f900f5730f005f61651edb0c97569"
version = "0.1.12"
source = "git+https://github.com/fluencelabs/fce#1f2355e6ef0337eb3ee179fa7c18dd1f7fd9d9e8"
dependencies = [
"boolinator",
"fce-utils",
@ -651,12 +650,12 @@ dependencies = [
[[package]]
name = "fce-utils"
version = "0.1.0"
source = "git+https://github.com/fluencelabs/fce#afa612a5639f900f5730f005f61651edb0c97569"
source = "git+https://github.com/fluencelabs/fce#1f2355e6ef0337eb3ee179fa7c18dd1f7fd9d9e8"
[[package]]
name = "fce-wit-interfaces"
version = "0.1.8"
source = "git+https://github.com/fluencelabs/fce#afa612a5639f900f5730f005f61651edb0c97569"
source = "git+https://github.com/fluencelabs/fce#1f2355e6ef0337eb3ee179fa7c18dd1f7fd9d9e8"
dependencies = [
"multimap",
"wasmer-interface-types-fl",
@ -665,10 +664,11 @@ dependencies = [
[[package]]
name = "fce-wit-parser"
version = "0.1.10"
source = "git+https://github.com/fluencelabs/fce#afa612a5639f900f5730f005f61651edb0c97569"
source = "git+https://github.com/fluencelabs/fce#1f2355e6ef0337eb3ee179fa7c18dd1f7fd9d9e8"
dependencies = [
"anyhow",
"fce-wit-interfaces",
"serde",
"walrus",
"wasmer-interface-types-fl",
"wasmer-runtime-core-fl",
@ -683,21 +683,21 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
[[package]]
name = "fluence"
version = "0.2.9"
source = "git+https://github.com/fluencelabs/rust-sdk#fd9672636e8d7a91275e5e0b8b88a34494336e5a"
source = "git+https://github.com/fluencelabs/rust-sdk#ebf3e63aef8d4aafda41180eeb299c4138628174"
dependencies = [
"fluence-sdk-macro 0.2.9 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-main 0.2.9 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-macro 0.2.10",
"fluence-sdk-main 0.2.10",
]
[[package]]
name = "fluence-faas"
version = "0.1.12"
source = "git+https://github.com/fluencelabs/fce#afa612a5639f900f5730f005f61651edb0c97569"
version = "0.1.13"
source = "git+https://github.com/fluencelabs/fce#1f2355e6ef0337eb3ee179fa7c18dd1f7fd9d9e8"
dependencies = [
"cmd_lib",
"fce",
"fce-utils",
"fluence-sdk-main 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
"fluence-sdk-main 0.2.9",
"itertools",
"log",
"safe-transmute",
@ -717,15 +717,15 @@ version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca5ffdf0ccf817b1e4e8438f6da7e8fa024679c706a69bde7aa8cad8b43e90ee"
dependencies = [
"fluence-sdk-wit 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
"fluence-sdk-wit 0.2.9",
]
[[package]]
name = "fluence-sdk-macro"
version = "0.2.9"
source = "git+https://github.com/fluencelabs/rust-sdk#fd9672636e8d7a91275e5e0b8b88a34494336e5a"
version = "0.2.10"
source = "git+https://github.com/fluencelabs/rust-sdk#ebf3e63aef8d4aafda41180eeb299c4138628174"
dependencies = [
"fluence-sdk-wit 0.2.9 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-wit 0.2.10",
]
[[package]]
@ -734,17 +734,17 @@ version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4f81c3778c18d372fec6d96049f25e29fc4ff7ba4ab65ef4c2285f971e8670a"
dependencies = [
"fluence-sdk-macro 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
"fluence-sdk-macro 0.2.9",
"log",
"serde",
]
[[package]]
name = "fluence-sdk-main"
version = "0.2.9"
source = "git+https://github.com/fluencelabs/rust-sdk#fd9672636e8d7a91275e5e0b8b88a34494336e5a"
version = "0.2.10"
source = "git+https://github.com/fluencelabs/rust-sdk#ebf3e63aef8d4aafda41180eeb299c4138628174"
dependencies = [
"fluence-sdk-macro 0.2.9 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-macro 0.2.10",
"log",
"serde",
]
@ -765,8 +765,8 @@ dependencies = [
[[package]]
name = "fluence-sdk-wit"
version = "0.2.9"
source = "git+https://github.com/fluencelabs/rust-sdk#fd9672636e8d7a91275e5e0b8b88a34494336e5a"
version = "0.2.10"
source = "git+https://github.com/fluencelabs/rust-sdk#ebf3e63aef8d4aafda41180eeb299c4138628174"
dependencies = [
"proc-macro2",
"quote",
@ -1045,9 +1045,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.80"
version = "0.2.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614"
checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb"
[[package]]
name = "lock_api"
@ -1091,9 +1091,9 @@ dependencies = [
[[package]]
name = "memoffset"
version = "0.5.6"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa"
checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87"
dependencies = [
"autocfg",
]
@ -1511,9 +1511,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.117"
version = "1.0.118"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a"
checksum = "06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800"
dependencies = [
"serde_derive",
]
@ -1549,9 +1549,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.117"
version = "1.0.118"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e"
checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df"
dependencies = [
"proc-macro2",
"quote",
@ -1560,9 +1560,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.59"
version = "1.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95"
checksum = "1500e84d27fe482ed1dc791a56eddc2f230046a040fa908c08bda1d9fb615779"
dependencies = [
"indexmap",
"itoa",
@ -1590,9 +1590,9 @@ checksum = "fa8f3741c7372e75519bd9346068370c9cdaabcc1f9599cbcf2a2719352286b7"
[[package]]
name = "smallvec"
version = "1.5.0"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acad6f34eb9e8a259d3283d1e8c1d34d7415943d4895f65cc73813c7396fc85"
checksum = "ae524f056d7d770e174287294f562e95044c68e88dec909a00d2094805db9d75"
[[package]]
name = "static_assertions"
@ -1609,6 +1609,7 @@ dependencies = [
"aquamarine-vm",
"boolinator",
"criterion",
"csv",
"env_logger",
"fluence",
"jsonpath_lib",
@ -1649,9 +1650,9 @@ checksum = "343f3f510c2915908f155e94f17220b19ccfacf2a64a2a5d8004f2c3e311e7fd"
[[package]]
name = "syn"
version = "1.0.51"
version = "1.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b4f34193997d92804d359ed09953e25d5138df6bcc055a71bf68ee89fdf9223"
checksum = "8833e20724c24de12bbaba5ad230ea61c3eafb05b881c7c9d3cfe8638b187e68"
dependencies = [
"proc-macro2",
"quote",
@ -1784,9 +1785,9 @@ dependencies = [
[[package]]
name = "unicode-segmentation"
version = "1.7.0"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db8716a166f290ff49dabc18b44aa407cb7c6dbe1aa0971b44b8a24b0ca35aae"
checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796"
[[package]]
name = "unicode-width"
@ -2064,9 +2065,9 @@ dependencies = [
[[package]]
name = "wasmer-wasi-fl"
version = "0.17.0"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e2493c1ef951a7a1704ecfd39abbbbb2346bf3768d761ef64ed53249a32b181"
checksum = "3e0f331ec6fb16590946f61b2418cd5295c4f3aa772c77eb054bea250a8fdb71"
dependencies = [
"bincode",
"byteorder",

View File

@ -29,6 +29,7 @@ aqua-test-utils = { path = "../crates/test-utils" }
aquamarine-vm = { git = "https://github.com/fluencelabs/fce", features = ["raw-aquamarine-vm-api"], branch = "master" }
criterion = "0.3.3"
csv = "1.1.5"
once_cell = "1.4.1"
env_logger = "0.7.1"
maplit = "1.0.2"

View File

@ -47,9 +47,6 @@ impl<'i> ExecutableInstruction<'i> for Par<'i> {
let (left_subtree_size, right_subtree_size) = extract_subtree_sizes(call_ctx)?;
let before_path_size = call_ctx.current_path.len();
let before_subtree_size = call_ctx.current_subtree_size;
let par_pos = call_ctx.new_path.len();
call_ctx.new_path.push_back(EvidenceState::Par(0, 0));
@ -64,11 +61,6 @@ impl<'i> ExecutableInstruction<'i> for Par<'i> {
// par is completed if at least one of its subtrees is completed
exec_ctx.subtree_complete = left_subtree_complete || right_subtree_complete;
// decrease current subtree size by used elements from current_path
let after_path_size = call_ctx.current_path.len();
let used_path_elements = before_path_size - after_path_size;
call_ctx.current_subtree_size = before_subtree_size - used_path_elements;
Ok(())
}
}
@ -79,6 +71,7 @@ fn extract_subtree_sizes(call_ctx: &mut CallEvidenceCtx) -> Result<(usize, usize
if call_ctx.current_subtree_size == 0 {
return Ok((0, 0));
}
call_ctx.current_subtree_size -= 1;
log::trace!(
@ -105,21 +98,24 @@ fn execute_subtree<'i>(
) -> Result<()> {
use crate::AquamarineError::LocalServiceError;
let before_subtree_size = call_ctx.current_subtree_size;
call_ctx.current_subtree_size = subtree_size;
let before_new_path_len = call_ctx.new_path.len();
exec_ctx.subtree_complete = determine_subtree_complete(&subtree);
// execute subtree
// execute a subtree
match subtree.execute(exec_ctx, call_ctx) {
res @ Ok(_) => {
update_par_state(call_ctx, subtree_type, current_par_pos, before_new_path_len);
call_ctx.current_subtree_size = before_subtree_size - subtree_size;
res
}
// if there is a service error, update already added Par state
// and then bubble the error up
err @ Err(LocalServiceError(_)) => {
update_par_state(call_ctx, subtree_type, current_par_pos, before_new_path_len);
call_ctx.current_subtree_size = before_subtree_size - subtree_size;
err
}
err @ Err(_) => err,

View File

@ -20,7 +20,6 @@ use crate::Result;
use serde::Deserialize;
use serde::Serialize;
use std::cmp::max;
use std::rc::Rc;
pub type CallEvidencePath = std::collections::VecDeque<EvidenceState>;
@ -100,10 +99,20 @@ fn merge_subtree(
result_path.push_back(Call(resulted_call));
}
(Some(Par(prev_left, prev_right)), Some(Par(current_left, current_right))) => {
result_path.push_back(Par(max(prev_left, current_left), max(prev_right, current_right)));
let par_position = result_path.len();
// place temporary Par value to avoid insert in the middle
result_path.push_back(Par(0, 0));
let before_result_len = result_path.len();
merge_subtree(prev_path, prev_left, current_path, current_left, result_path)?;
let left_par_size = result_path.len() - before_result_len;
merge_subtree(prev_path, prev_right, current_path, current_right, result_path)?;
let right_par_size = result_path.len() - left_par_size - before_result_len;
// update temporary Par with final values
result_path[par_position] = Par(left_par_size, right_par_size);
prev_subtree_size -= prev_left + prev_right;
current_subtree_size -= current_left + current_right;
@ -171,6 +180,20 @@ fn merge_call(prev_call_result: CallResult, current_call_result: CallResult) ->
}
}
impl std::fmt::Display for EvidenceState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use CallResult::*;
use EvidenceState::*;
match self {
Par(left, right) => write!(f, "Par({}, {})", left, right),
Call(RequestSent(peer_id)) => write!(f, "RequestSent({})", peer_id),
Call(Executed(result)) => write!(f, "Executed({})", result),
Call(CallServiceFailed(err_msg)) => write!(f, "CallServiceFailed({})", err_msg),
}
}
}
#[cfg(test)]
mod tests {
use crate::call_evidence::CallResult;

View File

@ -80,6 +80,6 @@ pub(super) fn make_contexts(
/// Parse an AIR script to AST
pub fn parse(script: &str) -> Result<Instruction<'_>> {
let ast = air_parser::parse(script).map_err(|msg| AquamarineError::AIRParseError(msg))?;
let ast = air_parser::parse(script).map_err(AquamarineError::AIRParseError)?;
Ok(*ast)
}

View File

@ -0,0 +1,383 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use aqua_test_utils::call_vm;
use aqua_test_utils::create_aqua_vm;
use aquamarine_vm::vec1::Vec1;
use aquamarine_vm::IValue;
use aquamarine_vm::{AquamarineVM, HostExportedFunc};
use std::cell::RefCell;
use std::collections::HashSet;
use std::rc::Rc;
type JValue = serde_json::Value;
fn parse_peers() -> Vec<String> {
use csv::ReaderBuilder;
let data = include_str!("dashboard/peers");
let mut rdr = ReaderBuilder::new()
.delimiter(b',')
.has_headers(false)
.from_reader(data.as_bytes());
let mut result = Vec::new();
while let Some(record) = rdr.records().next() {
let record = record.unwrap();
result.push(record.as_slice().to_string());
}
return result;
}
fn into_hashset(peers: Vec<String>) -> HashSet<String> {
peers.into_iter().collect()
}
fn client_host_function(
known_peers: Vec<String>,
client_id: String,
relay_id: String,
) -> (HostExportedFunc, Rc<RefCell<String>>) {
let all_info = Rc::new(RefCell::new(String::new()));
let known_peers = JValue::Array(known_peers.iter().cloned().map(JValue::String).collect::<Vec<_>>());
let client_id = JValue::String(client_id);
let relay_id = JValue::String(relay_id);
let to_ret_value = Box::new(
move |service_name: &str, function_name: &str, arguments: Vec<&str>| -> JValue {
match (service_name, function_name, arguments.as_slice()) {
("", "load", &["relayId"]) => relay_id.clone(),
("", "load", &["knownPeers"]) => known_peers.clone(),
("", "load", &["clientId"]) => client_id.clone(),
_ => JValue::Null,
}
},
);
let all_info_inner = all_info.clone();
let host_function: HostExportedFunc = Box::new(move |_, args| -> Option<IValue> {
let service_name = match &args[0] {
IValue::String(str) => str,
_ => unreachable!(),
};
let function_name = match &args[1] {
IValue::String(str) => str,
_ => unreachable!(),
};
let function_args = match &args[2] {
IValue::String(str) => str,
_ => unreachable!(),
};
let ret_value = match serde_json::from_str(function_args) {
Ok(args) => to_ret_value(service_name.as_str(), function_name.as_str(), args),
Err(_) => {
*all_info_inner.borrow_mut() = function_args.clone();
JValue::Null
}
};
Some(IValue::Record(
Vec1::new(vec![IValue::S32(0), IValue::String(ret_value.to_string())]).unwrap(),
))
});
(host_function, all_info)
}
fn peer_host_function(
known_peers: Vec<String>,
blueprints: Vec<String>,
modules: Vec<String>,
interfaces: Vec<String>,
ident: String,
) -> HostExportedFunc {
let known_peers = JValue::Array(known_peers.into_iter().map(JValue::String).collect());
let blueprints = JValue::Array(blueprints.into_iter().map(JValue::String).collect());
let modules = JValue::Array(modules.into_iter().map(JValue::String).collect());
let interfaces = JValue::Array(interfaces.into_iter().map(JValue::String).collect());
let identify = JValue::String(ident.clone());
let ident = JValue::String(ident);
let to_ret_value = Box::new(
move |service_name: &str, function_name: &str, arguments: Vec<&str>| -> JValue {
match (service_name, function_name, arguments.as_slice()) {
("op", "identity", _) => ident.clone(),
("op", "identify", _) => identify.clone(),
("dist", "get_blueprints", _) => blueprints.clone(),
("dist", "get_modules", _) => modules.clone(),
("srv", "get_interfaces", _) => interfaces.clone(),
("dht", "neighborhood", _) => known_peers.clone(),
_ => JValue::Null,
}
},
);
Box::new(move |_, args| -> Option<IValue> {
let service_name = match &args[0] {
IValue::String(str) => str,
_ => unreachable!(),
};
let function_name = match &args[1] {
IValue::String(str) => str,
_ => unreachable!(),
};
let function_args = match &args[2] {
IValue::String(str) => str,
_ => unreachable!(),
};
let args: Vec<&str> = serde_json::from_str(function_args).unwrap();
let ret_value = to_ret_value(service_name.as_str(), function_name.as_str(), args);
Some(IValue::Record(
Vec1::new(vec![IValue::S32(0), IValue::String(ret_value.to_string())]).unwrap(),
))
})
}
#[rustfmt::skip]
fn create_peer_host_function(peer_id: String, known_peer_ids: Vec<String>) -> HostExportedFunc {
let relay_blueprints = (0..=2).map(|id| format!("{}_blueprint_{}", peer_id, id)).collect::<Vec<_>>();
let relay_modules = (0..=2).map(|id| format!("{}_module_{}", peer_id, id)).collect::<Vec<_>>();
let relay_interfaces = (0..=2).map(|id| format!("{}_interface_{}", peer_id, id)).collect::<Vec<_>>();
let relay_ident = format!("{}_ident", peer_id);
peer_host_function(
known_peer_ids,
relay_blueprints,
relay_modules,
relay_interfaces,
relay_ident,
)
}
struct AquaVMState {
vm: AquamarineVM,
peer_id: String,
prev_result: String,
}
#[test]
fn dashboard() {
let script = include_str!("dashboard/script.clj");
let known_peer_ids = parse_peers();
let client_id = String::from("client_id");
let relay_id = String::from("relay_id");
let (host_function, all_info) = client_host_function(known_peer_ids.clone(), client_id.clone(), relay_id.clone());
let mut client = create_aqua_vm(host_function, client_id.clone());
let mut relay = create_aqua_vm(
create_peer_host_function(relay_id.clone(), known_peer_ids.clone()),
relay_id.clone(),
);
let mut known_peers = known_peer_ids
.iter()
.cloned()
.map(|peer_id| {
let vm = create_aqua_vm(
create_peer_host_function(peer_id.clone(), known_peer_ids.clone()),
peer_id.clone(),
);
AquaVMState {
vm,
peer_id,
prev_result: String::new(),
}
})
.collect::<Vec<_>>();
// -> client 1
let client_1_res = call_vm!(client, client_id.clone(), script.clone(), "", "");
let next_peer_pks = into_hashset(client_1_res.next_peer_pks);
let mut all_peer_pks = into_hashset(known_peer_ids.clone());
all_peer_pks.insert(relay_id.clone());
assert_eq!(next_peer_pks, all_peer_pks);
// client 1 -> relay 1
let relay_1_res = call_vm!(relay, client_id.clone(), script.clone(), client_1_res.data.clone(), "");
let next_peer_pks = into_hashset(relay_1_res.next_peer_pks.clone());
all_peer_pks.remove(&relay_id);
all_peer_pks.insert(client_id.clone());
assert_eq!(next_peer_pks, all_peer_pks);
// relay 1 -> client 2
let client_2_res = call_vm!(
client,
client_id.clone(),
script.clone(),
client_1_res.data.clone(),
relay_1_res.data.clone()
);
assert!(client_2_res.next_peer_pks.is_empty());
assert_eq!(
*all_info.borrow(),
String::from(
r#"["relay_id","relay_id_ident",["relay_id_interface_0","relay_id_interface_1","relay_id_interface_2"],["relay_id_blueprint_0","relay_id_blueprint_1","relay_id_blueprint_2"],["relay_id_module_0","relay_id_module_1","relay_id_module_2"]]"#
)
);
let mut relay_2_res = relay_1_res.clone();
let mut client_3_res = client_2_res.clone();
// peers 1 -> relay 2 -> client 3
for aqua_vm in known_peers.iter_mut() {
let prev_result = std::mem::replace(&mut aqua_vm.prev_result, String::new());
let known_peer_res = call_vm!(
aqua_vm.vm,
client_id.clone(),
script.clone(),
prev_result,
client_1_res.data.clone()
);
assert_eq!(known_peer_res.next_peer_pks, vec![relay_id.clone()]);
aqua_vm.prev_result = known_peer_res.data;
relay_2_res = call_vm!(
relay,
client_id.clone(),
script.clone(),
relay_2_res.data.clone(),
aqua_vm.prev_result.clone()
);
assert_eq!(relay_2_res.next_peer_pks, vec![client_id.clone()]);
client_3_res = call_vm!(
client,
client_id.clone(),
script.clone(),
client_3_res.data.clone(),
relay_2_res.data.clone()
);
assert!(client_3_res.next_peer_pks.is_empty());
assert_eq!(
*all_info.borrow(),
format!(
r#"["{peer_id}","{peer_id}_ident",["{peer_id}_interface_0","{peer_id}_interface_1","{peer_id}_interface_2"],["{peer_id}_blueprint_0","{peer_id}_blueprint_1","{peer_id}_blueprint_2"],["{peer_id}_module_0","{peer_id}_module_1","{peer_id}_module_2"]]"#,
peer_id = aqua_vm.peer_id
)
)
}
all_peer_pks.remove(&client_id);
all_peer_pks.insert(relay_id.clone());
let mut relay_3_res = relay_2_res.clone();
let mut client_4_res = client_3_res.clone();
// peers 2 -> relay 3 -> client 4
for aqua_vm in known_peers.iter_mut() {
let prev_result = std::mem::replace(&mut aqua_vm.prev_result, String::new());
let known_peer_res = call_vm!(
aqua_vm.vm,
client_id.clone(),
script.clone(),
prev_result,
relay_1_res.data.clone()
);
all_peer_pks.remove(&aqua_vm.peer_id);
let next_peer_pks = into_hashset(known_peer_res.next_peer_pks.clone());
assert_eq!(next_peer_pks, all_peer_pks);
all_peer_pks.insert(aqua_vm.peer_id.clone());
aqua_vm.prev_result = known_peer_res.data;
relay_3_res = call_vm!(
relay,
client_id.clone(),
script.clone(),
relay_3_res.data.clone(),
aqua_vm.prev_result.clone()
);
assert_eq!(relay_3_res.next_peer_pks, vec![client_id.clone()]);
// client -> peers -> relay -> client
client_4_res = call_vm!(
client,
client_id.clone(),
script.clone(),
client_4_res.data.clone(),
relay_3_res.data.clone()
);
assert!(client_4_res.next_peer_pks.is_empty());
assert_eq!(
*all_info.borrow(),
format!(
r#"["{peer_id}","{peer_id}_ident",["{peer_id}_interface_0","{peer_id}_interface_1","{peer_id}_interface_2"],["{peer_id}_blueprint_0","{peer_id}_blueprint_1","{peer_id}_blueprint_2"],["{peer_id}_module_0","{peer_id}_module_1","{peer_id}_module_2"]]"#,
peer_id = aqua_vm.peer_id
)
)
}
let mut relay_4_res = relay_3_res.clone();
let mut client_5_res = client_4_res.clone();
// peers 2 -> peers 3 -> relay 4 -> client 5
for i in 0..known_peers.len() {
for j in 0..known_peers.len() {
if known_peers[i].peer_id == known_peers[j].peer_id {
continue;
}
let prev_data = known_peers[j].prev_result.clone();
let data = known_peers[i].prev_result.clone();
let known_peer_i_j_res = call_vm!(known_peers[j].vm, client_id.clone(), script.clone(), prev_data, data);
assert_eq!(known_peer_i_j_res.next_peer_pks, vec![relay_id.clone()]);
known_peers[j].prev_result = known_peer_i_j_res.data;
relay_4_res = call_vm!(
relay,
client_id.clone(),
script.clone(),
relay_4_res.data.clone(),
known_peers[j].prev_result.clone()
);
assert_eq!(relay_4_res.next_peer_pks, vec![client_id.clone()]);
// client -> peers -> relay -> client
client_5_res = call_vm!(
client,
client_id.clone(),
script.clone(),
client_5_res.data.clone(),
relay_4_res.data.clone()
);
assert!(client_5_res.next_peer_pks.is_empty());
assert_eq!(
*all_info.borrow(),
format!(
r#"["{peer_id}","{peer_id}_ident",["{peer_id}_interface_0","{peer_id}_interface_1","{peer_id}_interface_2"],["{peer_id}_blueprint_0","{peer_id}_blueprint_1","{peer_id}_blueprint_2"],["{peer_id}_module_0","{peer_id}_module_1","{peer_id}_module_2"]]"#,
peer_id = known_peers[j].peer_id
)
);
}
}
}

View File

@ -0,0 +1,7 @@
12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9,
12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE1,
12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb,
12D3KooWJbJFaZ3k5sNd8DjQgg3aERoKtBAnirEvPV8yp76kEXHB,
12D3KooWCKCeqLPSgMnDjyFsJuWqREDtKNHx1JEBiwaMXhCLNTRb,
12D3KooWMhVpgfQxBLkQkJed8VFNvgN4iE6MD7xCybb1ZYWW2Gtz,
12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM,

View File

@ -0,0 +1,95 @@
(seq
(call %init_peer_id% ("" "load") ["relayId"] relayId)
(seq
(call %init_peer_id% ("" "load") ["knownPeers"] knownPeers)
(seq
(call %init_peer_id% ("" "load") ["clientId"] clientId)
; get info from relay
(par
(seq
(call relayId ("op" "identity") [])
(seq
(call relayId ("op" "identify") [] ident)
(seq
(call relayId ("dist" "get_blueprints") [] blueprints)
(seq
(call relayId ("dist" "get_modules") [] modules)
(seq
(call relayId ("srv" "get_interfaces") [] interfaces)
(seq
(call relayId ("op" "identity") [])
(call %init_peer_id% ("event" "all_info") [relayId ident interfaces blueprints modules])
)
)
)
)
)
)
(par
; iterate over known peers and get their info
(fold knownPeers p
(par
(seq
(call p ("op" "identity") [])
(seq
(call p ("op" "identify") [] ident)
(seq
(call p ("dist" "get_blueprints") [] blueprints)
(seq
(call p ("dist" "get_modules") [] modules)
(seq
(call p ("srv" "get_interfaces") [] interfaces)
(seq
(call relayId ("op" "identity") [])
(call %init_peer_id% ("event" "all_info") [p ident interfaces blueprints modules])
)
)
)
)
)
)
(next p)
)
)
; call on relay neighborhood
(seq
(call relayId ("op" "identity") [])
(seq
(call relayId ("dht" "neighborhood") [clientId] neigh)
(fold neigh n
; call neighborhood on every relays' neighbours
(par
(seq
(call n ("dht" "neighborhood") [clientId] moreNeigh)
(fold moreNeigh mp
(par
(seq
(call mp ("op" "identify") [] ident)
(seq
(call mp ("dist" "get_blueprints") [] blueprints)
(seq
(call mp ("dist" "get_modules") [] modules)
(seq
(call mp ("srv" "get_interfaces") [] interfaces)
(seq
(call relayId ("op" "identity") [])
(call %init_peer_id% ("event" "all_info") [mp ident interfaces blueprints modules])
)
)
)
)
)
(next mp)
)
)
)
(next n)
)
)
)
)
)
)
)
)
)

View File

@ -30,15 +30,23 @@ mod ast;
mod logger;
use fluence::fce;
use logger::DEFAULT_LOG_LEVEL;
use stepper_lib::execute_aqua;
use stepper_lib::StepperOutcome;
use log::Level as LogLevel;
const RUST_LOG_ENV_NAME: &str = "RUST_LOG";
pub fn main() {
logger::init_logger();
}
#[fce]
pub fn invoke(init_peer_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome {
let log_level = get_log_level();
log::set_max_level(log_level.to_level_filter());
execute_aqua(init_peer_id, aqua, prev_data, data)
}
@ -46,3 +54,12 @@ pub fn invoke(init_peer_id: String, aqua: String, prev_data: String, data: Strin
pub fn ast(script: String) -> String {
ast::ast(script)
}
fn get_log_level() -> LogLevel {
use std::str::FromStr;
match std::env::var(RUST_LOG_ENV_NAME) {
Ok(log_level_str) => LogLevel::from_str(&log_level_str).unwrap_or(DEFAULT_LOG_LEVEL),
Err(_) => DEFAULT_LOG_LEVEL,
}
}

View File

@ -14,37 +14,20 @@
* limitations under the License.
*/
use log::Level as LogLevel;
use stepper_lib::log_targets::TARGET_MAP;
use log::Level as LogLevel;
use std::collections::HashMap;
const RUST_LOG_ENV_NAME: &str = "RUST_LOG";
const DEFAULT_LOG_LEVEL: &str = "trace";
pub const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Info;
pub fn init_logger() {
use std::iter::FromIterator;
let target_map = HashMap::from_iter(TARGET_MAP.iter().cloned());
let log_level_str = std::env::var(RUST_LOG_ENV_NAME).unwrap_or(String::from(DEFAULT_LOG_LEVEL));
let log_level = to_log_level(&log_level_str);
fluence::WasmLogger::new()
.with_log_level(log_level)
fluence::WasmLoggerBuilder::new()
.with_target_map(target_map)
.build()
.unwrap();
}
fn to_log_level(raw_log_level: &String) -> LogLevel {
use LogLevel::*;
match raw_log_level.to_ascii_lowercase().as_str() {
"error" => Error,
"warn" => Warn,
"info" => Info,
"debug" => Debug,
"trace" => Trace,
_ => Trace,
}
}

View File

@ -29,7 +29,9 @@
mod ast;
mod logger;
use logger::DEFAULT_LOG_LEVEL;
use stepper_lib::execute_aqua;
use wasm_bindgen::prelude::*;
#[wasm_bindgen(start)]
@ -38,7 +40,12 @@ pub fn main() {
}
#[wasm_bindgen]
pub fn invoke(init_peer_id: String, aqua: String, prev_data: String, data: String) -> String {
pub fn invoke(init_peer_id: String, aqua: String, prev_data: String, data: String, log_level: &str) -> String {
use std::str::FromStr;
let log_level = log::Level::from_str(log_level).unwrap_or(DEFAULT_LOG_LEVEL);
log::set_max_level(log_level.to_level_filter());
let outcome = execute_aqua(init_peer_id, aqua, prev_data, data);
serde_json::to_string(&outcome).expect("Cannot parse StepperOutcome")
}