Add support for %init_peer_id% (#24)

This commit is contained in:
vms 2020-11-11 14:31:53 +03:00 committed by GitHub
parent 186eeb204c
commit 0d42ff7fc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 388 additions and 202 deletions

12
Cargo.lock generated
View File

@ -67,7 +67,7 @@ dependencies = [
[[package]]
name = "aquamarine-vm"
version = "0.1.2"
source = "git+https://github.com/fluencelabs/fce?branch=master#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
source = "git+https://github.com/fluencelabs/fce#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
dependencies = [
"fluence-faas",
"maplit",
@ -639,7 +639,7 @@ checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fce"
version = "0.1.11"
source = "git+https://github.com/fluencelabs/fce?branch=master#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
source = "git+https://github.com/fluencelabs/fce#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
dependencies = [
"boolinator",
"fce-utils",
@ -661,12 +661,12 @@ dependencies = [
[[package]]
name = "fce-utils"
version = "0.1.0"
source = "git+https://github.com/fluencelabs/fce?branch=master#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
source = "git+https://github.com/fluencelabs/fce#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
[[package]]
name = "fce-wit-interfaces"
version = "0.1.8"
source = "git+https://github.com/fluencelabs/fce?branch=master#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
source = "git+https://github.com/fluencelabs/fce#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
dependencies = [
"multimap",
"wasmer-interface-types-fl",
@ -675,7 +675,7 @@ dependencies = [
[[package]]
name = "fce-wit-parser"
version = "0.1.10"
source = "git+https://github.com/fluencelabs/fce?branch=master#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
source = "git+https://github.com/fluencelabs/fce#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
dependencies = [
"anyhow",
"fce-wit-interfaces",
@ -702,7 +702,7 @@ dependencies = [
[[package]]
name = "fluence-faas"
version = "0.1.12"
source = "git+https://github.com/fluencelabs/fce?branch=master#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
source = "git+https://github.com/fluencelabs/fce#e24bb8e1ac503cdde9b1397a34ebced0132feaf1"
dependencies = [
"cmd_lib",
"fce",

View File

@ -54,6 +54,7 @@ pub enum Value<'i> {
Literal(&'i str),
JsonPath { variable: &'i str, path: &'i str },
CurrentPeerId,
InitPeerId,
}
#[derive(Debug, PartialEq, Eq, Clone)]

View File

@ -67,6 +67,7 @@ Value: Value<'input> = {
Value::JsonPath { variable, path }
},
CURRENT_PEER_ID => Value::CurrentPeerId,
INIT_PEER_ID => Value::InitPeerId,
}
@ -78,6 +79,7 @@ match {
r"[\w_-]+\[\]" => ACCUMULATOR,
r#"[\w_-]+\.*\$([\w._-]*(\[[\w"]+\])*)+"# => JSON_PATH,
r#"%current_peer_id%"# => CURRENT_PEER_ID,
r#"%init_peer_id%"# => INIT_PEER_ID,
"seq",
"call",
"null",

File diff suppressed because one or more lines are too long

View File

@ -233,6 +233,34 @@ fn parse_fold_with_xor_par_seq() {
}
}
#[test]
fn parse_current_init_peer_id() {
let source_code = r#"
(seq
(call %current_peer_id% ("local_service_id" "local_fn_name") [])
(call %init_peer_id% ("service_id" "fn_name") [])
)"#;
let instruction = parse(&source_code.as_ref());
let expected = seq(
Instruction::Call(Call {
peer_part: PeerPk(CurrentPeerId),
function_part: ServiceIdWithFuncName(
Literal("local_service_id"),
Literal("local_fn_name"),
),
args: vec![],
output: None,
}),
Instruction::Call(Call {
peer_part: PeerPk(InitPeerId),
function_part: ServiceIdWithFuncName(Literal("service_id"), Literal("fn_name")),
args: vec![],
output: None,
}),
);
assert_eq!(instruction, expected);
}
#[test]
fn seq_par_call() {
let source_code = r#"

View File

@ -146,8 +146,8 @@ pub fn set_variables_call_service(ret_mapping: HashMap<String, String>) -> HostE
#[macro_export]
macro_rules! call_vm {
($vm:expr, $init_user_id:expr, $script:expr, $prev_data:expr, $data:expr) => {
match $vm.call_with_prev_data($init_user_id, $script, $prev_data, $data) {
($vm:expr, $init_peer_id:expr, $script:expr, $prev_data:expr, $data:expr) => {
match $vm.call_with_prev_data($init_peer_id, $script, $prev_data, $data) {
Ok(v) => v,
Err(err) => panic!("VM call failed: {}", err),
}

View File

@ -29,9 +29,12 @@ pub(crate) struct ExecutionCtx<'i> {
/// Set of peer public keys that should receive resulted data.
pub next_peer_pks: Vec<String>,
/// PeerId of a peer executing this aqua script.
/// PeerId of a peer executing this aqua script at the moment.
pub current_peer_id: String,
/// PeerId of a peer send this aqua script.
pub init_peer_id: String,
/// Indicates that previous executed subtree is complete.
/// A subtree treats as a complete if all subtree elements satisfy the following rules:
/// - at least one of par subtrees is complete
@ -42,11 +45,12 @@ pub(crate) struct ExecutionCtx<'i> {
}
impl<'i> ExecutionCtx<'i> {
pub(crate) fn new(current_peer_id: String) -> Self {
pub(crate) fn new(current_peer_id: String, init_peer_id: String) -> Self {
Self {
data_cache: HashMap::new(),
next_peer_pks: vec![],
current_peer_id,
init_peer_id,
subtree_complete: true,
}
}

View File

@ -28,6 +28,7 @@ use std::borrow::Cow;
pub(crate) fn resolve_jvalue<'i>(value: &Value<'i>, ctx: &ExecutionCtx<'i>) -> Result<JValue> {
let value = match value {
Value::CurrentPeerId => JValue::String(ctx.current_peer_id.clone()),
Value::InitPeerId => JValue::String(ctx.init_peer_id.clone()),
Value::Literal(value) => JValue::String(value.to_string()),
Value::Variable(name) => resolve_variable(name, ctx)?,
Value::JsonPath { variable, path } => {
@ -69,6 +70,7 @@ pub(crate) fn resolve_variable<'exec_ctx, 'i>(variable: &'i str, ctx: &'exec_ctx
pub(crate) fn resolve_value<'i, 'a: 'i>(value: &'a Value<'i>, ctx: &'a ExecutionCtx<'i>) -> Result<Cow<'i, str>> {
let resolved = match value {
Value::CurrentPeerId => Cow::Borrowed(ctx.current_peer_id.as_str()),
Value::InitPeerId => Cow::Borrowed(ctx.init_peer_id.as_str()),
Value::Literal(value) => Cow::Borrowed(*value),
Value::Variable(name) => {
let resolved = resolve_variable(name, ctx)?;

View File

@ -27,19 +27,19 @@ use crate::Result;
use crate::StepperOutcome;
use crate::STEPPER_SUCCESS;
pub fn execute_aqua(init_user_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome {
pub fn execute_aqua(init_peer_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome {
log::info!(
"aquamarine version is {}, init user id is {}",
env!("CARGO_PKG_VERSION"),
init_user_id
init_peer_id
);
execute_aqua_impl(init_user_id, aqua, prev_data, data).unwrap_or_else(Into::into)
execute_aqua_impl(init_peer_id, aqua, prev_data, data).unwrap_or_else(Into::into)
}
fn execute_aqua_impl(_init_user_id: String, aqua: String, prev_path: String, path: String) -> Result<StepperOutcome> {
fn execute_aqua_impl(init_peer_id: String, aqua: String, prev_path: String, path: String) -> Result<StepperOutcome> {
let (prev_path, path, aqua) = prepare(prev_path, path, aqua.as_str())?;
let (mut exec_ctx, mut call_ctx) = make_contexts(prev_path, path)?;
let (mut exec_ctx, mut call_ctx) = make_contexts(prev_path, path, init_peer_id)?;
aqua.execute(&mut exec_ctx, &mut call_ctx)?;

View File

@ -64,13 +64,14 @@ pub(super) fn prepare<'i>(
pub(super) fn make_contexts(
prev_path: CallEvidencePath,
path: CallEvidencePath,
init_peer_id: String,
) -> Result<(ExecutionCtx<'static>, CallEvidenceCtx)> {
use AquamarineError::CurrentPeerIdEnvError as EnvError;
let current_peer_id = get_current_peer_id().map_err(|e| EnvError(e, String::from("CURRENT_PEER_ID")))?;
log::info!(target: RUN_PARAMS, "current peer id {}", current_peer_id);
let exec_ctx = ExecutionCtx::new(current_peer_id);
let exec_ctx = ExecutionCtx::new(current_peer_id, init_peer_id);
let current_path = merge_call_paths(prev_path, path)?;
let call_evidence_ctx = CallEvidenceCtx::new(current_path);

View File

@ -174,8 +174,8 @@ fn create_service() {
)"#,
);
let res = call_vm!(set_variables_vm, "init_user_id", script.clone(), "[]", "[]");
let res = call_vm!(vm, "init_user_id", script, "[]", res.data);
let res = call_vm!(set_variables_vm, "init_peer_id", script.clone(), "[]", "[]");
let res = call_vm!(vm, "init_peer_id", script, "[]", res.data);
let add_module_response = String::from("add_module response");
let add_blueprint_response = String::from("add_blueprint response");

View File

@ -235,7 +235,7 @@ fn evidence_create_service() {
Call(Executed(Rc::new(JValue::String(String::from("test"))))),
];
let res = call_vm!(vm, "init_user_id", script, "[]", json!(path).to_string());
let res = call_vm!(vm, "init_peer_id", script, "[]", json!(path).to_string());
let resulted_path: Vec<EvidenceState> = serde_json::from_str(&res.data).expect("should be a correct json");

View File

@ -242,3 +242,103 @@ fn join() {
assert_eq!(client_1_res_json, client_1_right_json);
assert_eq!(client_1_res.next_peer_pks, Vec::<String>::new());
}
#[test]
fn init_peer_id() {
let members_call_service1: HostExportedFunc = Box::new(|_, _| -> Option<IValue> {
Some(IValue::Record(
Vec1::new(vec![IValue::S32(0), IValue::String(String::from(r#"[["A"], ["B"]]"#))]).unwrap(),
))
});
let initiator_peer_id = String::from("initiator");
let mut relay_1 = create_aqua_vm(unit_call_service(), "Relay1");
let mut remote = create_aqua_vm(members_call_service1, "Remote");
let mut client_1 = create_aqua_vm(unit_call_service(), "A");
let mut initiator = create_aqua_vm(unit_call_service(), initiator_peer_id.clone());
let script = String::from(
r#"(seq
(seq
(call "Relay1" ("identity" "") [])
(seq
(call "Remote" ("920e3ba3-cbdf-4ae3-8972-0fa2f31fffd9" "get_users") [] members)
(fold members m
(par
(seq
(call "Relay1" ("identity" "") [])
(call "A" ("fgemb3" "add") [m])
)
(next m)
)
)
)
)
(call %init_peer_id% ("identity" "") [])
)
"#,
);
let initiator_1_res = call_vm!(initiator, initiator_peer_id.clone(), script.clone(), "", "");
let client_1_res = call_vm!(
client_1,
initiator_peer_id.clone(),
script.clone(),
initiator_1_res.data,
""
);
let relay_1_res = call_vm!(
relay_1,
initiator_peer_id.clone(),
script.clone(),
client_1_res.data,
""
);
let remote_res = call_vm!(remote, initiator_peer_id.clone(), script.clone(), relay_1_res.data, "");
let relay_1_res = call_vm!(relay_1, initiator_peer_id.clone(), script.clone(), remote_res.data, "");
let client_1_res = call_vm!(
client_1,
initiator_peer_id.clone(),
script.clone(),
relay_1_res.data,
""
);
let client_1_res_json: JValue = serde_json::from_str(&client_1_res.data).expect("stepper should return valid json");
let client_1_right_json = json!( [
{ "call": { "executed" : "test" } },
{ "call": { "executed" : [["A"], ["B"]]} },
{ "par": [2, 3] },
{ "call": { "executed" : "test" } },
{ "call": { "executed" : "test" } },
{ "par": [2, 0] },
{ "call": { "executed" : "test" } },
{ "call": { "executed" : "test" } },
{ "call": { "request_sent" : "A" } },
]);
assert_eq!(client_1_res_json, client_1_right_json);
assert_eq!(client_1_res.next_peer_pks, vec![initiator_peer_id.clone()]);
let initiator_1_res = call_vm!(initiator, initiator_peer_id, script, client_1_res.data, "");
let initiator_1_res_json: JValue =
serde_json::from_str(&initiator_1_res.data).expect("stepper should return valid json");
let initiator_1_right_json = json!( [
{ "call": { "executed" : "test" } },
{ "call": { "executed" : [["A"], ["B"]]} },
{ "par": [2, 3] },
{ "call": { "executed" : "test" } },
{ "call": { "executed" : "test" } },
{ "par": [2, 0] },
{ "call": { "executed" : "test" } },
{ "call": { "executed" : "test" } },
{ "call": { "executed" : "test" } },
]);
assert_eq!(initiator_1_res_json, initiator_1_right_json);
assert_eq!(initiator_1_res.next_peer_pks, Vec::<String>::new());
}

View File

@ -46,6 +46,6 @@ pub fn main() {
}
#[fce]
pub fn invoke(init_user_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome {
execute_aqua(init_user_id, aqua, prev_data, data)
pub fn invoke(init_peer_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome {
execute_aqua(init_peer_id, aqua, prev_data, data)
}

View File

@ -46,7 +46,7 @@ pub fn main() {
}
#[wasm_bindgen]
pub fn invoke(init_user_id: String, aqua: String, prev_data: String, data: String) -> String {
let outcome = execute_aqua(init_user_id, aqua, prev_data, data);
pub fn invoke(init_peer_id: String, aqua: String, prev_data: String, data: String) -> String {
let outcome = execute_aqua(init_peer_id, aqua, prev_data, data);
serde_json::to_string(&outcome).expect("Cannot parse StepperOutcome")
}