diff --git a/Cargo.lock b/Cargo.lock index 57f85c8f..a5ff4b77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,9 +2,9 @@ # It is not intended for manual editing. [[package]] name = "aho-corasick" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" +checksum = "b476ce7103678b0c6d3d395dbbae31d48ff910bd28be979ba5d48c6351131d0d" dependencies = [ "memchr", ] @@ -41,6 +41,7 @@ dependencies = [ "fluence", "jsonpath_lib", "log", + "once_cell", "serde", "serde_derive", "serde_json", @@ -51,7 +52,7 @@ dependencies = [ [[package]] name = "aquamarine-vm" version = "0.1.0" -source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d" +source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed" dependencies = [ "fluence-faas", "serde", @@ -144,9 +145,9 @@ checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" [[package]] name = "cc" -version = "1.0.60" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef611cc68ff783f18535d77ddd080185275713d852c4f5cbb6122c462a7a825c" +checksum = "ed67cbde08356238e75fc4656be4749481eeffb09e19f320a25237d5221c985d" [[package]] name = "cfg-if" @@ -371,8 +372,8 @@ dependencies = [ [[package]] name = "fce" -version = "0.1.7" -source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d" +version = "0.1.8" +source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed" dependencies = [ "boolinator", "fce-wit-interfaces", @@ -392,7 +393,7 @@ dependencies = [ [[package]] name = "fce-wit-interfaces" version = "0.1.5" -source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d" +source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed" dependencies = [ "multimap", "wasmer-interface-types-fl", @@ -401,7 +402,7 @@ dependencies = [ [[package]] name = "fce-wit-parser" version = "0.1.7" -source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d" +source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed" dependencies = [ "anyhow", "fce-wit-interfaces", @@ -421,8 +422,8 @@ dependencies = [ [[package]] name = "fluence-faas" -version = "0.1.8" -source = "git+https://github.com/fluencelabs/fce#1c2578ede462430d6d15e6c750a515600342f69d" +version = "0.1.9" +source = "git+https://github.com/fluencelabs/fce#05473672a15d368802cc525c6290eee11b7907ed" dependencies = [ "cmd_lib", "fce", @@ -817,6 +818,12 @@ dependencies = [ "libc", ] +[[package]] +name = "once_cell" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad" + [[package]] name = "page_size" version = "0.4.2" @@ -983,9 +990,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "regex" -version = "1.3.9" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" +checksum = "8963b85b8ce3074fecffde43b4b0dded83ce2f367dc8d363afc56679f3ee820b" dependencies = [ "aho-corasick", "memchr", @@ -995,9 +1002,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.18" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" +checksum = "8cab7a364d15cde1e505267766a2d3c4e22a843e1a601f0fa7564c0f82ced11c" [[package]] name = "rustc_version" @@ -1082,9 +1089,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.58" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a230ea9107ca2220eea9d46de97eddcb04cd00e92d13dda78e478dd33fa82bd4" +checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95" dependencies = [ "indexmap", "itoa", @@ -1122,9 +1129,9 @@ checksum = "343f3f510c2915908f155e94f17220b19ccfacf2a64a2a5d8004f2c3e311e7fd" [[package]] name = "syn" -version = "1.0.42" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c51d92969d209b54a98397e1b91c8ae82d8c87a7bb87df0b29aa2ad81454228" +checksum = "e03e57e4fcbfe7749842d53e24ccb9aa12b7252dbe5e91d2acad31834c8b8fdd" dependencies = [ "proc-macro2", "quote", @@ -1188,9 +1195,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" +checksum = "75cf45bb0bef80604d001caaec0d09da99611b3c0fd39d3080468875cdb65645" dependencies = [ "serde", ] diff --git a/crates/test-utils/src/lib.rs b/crates/test-utils/src/lib.rs index 702c0265..6b0bbeef 100644 --- a/crates/test-utils/src/lib.rs +++ b/crates/test-utils/src/lib.rs @@ -35,7 +35,10 @@ use aquamarine_vm::IValue; use std::path::PathBuf; -pub fn create_aqua_vm(call_service: HostExportedFunc) -> AquamarineVM { +pub fn create_aqua_vm( + call_service: HostExportedFunc, + current_peer_id: impl Into, +) -> AquamarineVM { let call_service_descriptor = HostImportDescriptor { host_exported_func: call_service, argument_types: vec![IType::String, IType::String, IType::String], @@ -46,7 +49,7 @@ pub fn create_aqua_vm(call_service: HostExportedFunc) -> AquamarineVM { let config = AquamarineVMConfig { aquamarine_wasm_path: PathBuf::from("../target/wasm32-wasi/debug/aquamarine.wasm"), call_service: call_service_descriptor, - current_peer_id: String::from("test_peer_id"), + current_peer_id: current_peer_id.into(), }; AquamarineVM::new(config).expect("vm should be created") diff --git a/stepper/Cargo.toml b/stepper/Cargo.toml index c7e0e97a..189c611a 100644 --- a/stepper/Cargo.toml +++ b/stepper/Cargo.toml @@ -23,6 +23,7 @@ serde_sexpr = "0.1.0" jsonpath_lib = "0.2.5" log = "0.4.11" +once_cell = "1.4.1" serde_json = "1.0" wasm-bindgen = "0.2.68" diff --git a/stepper/src/air/call.rs b/stepper/src/air/call.rs index fda751a7..e1b80037 100644 --- a/stepper/src/air/call.rs +++ b/stepper/src/air/call.rs @@ -14,7 +14,10 @@ * limitations under the License. */ -use super::ExecutionContext; +use super::CallEvidenceCtx; +use super::CallResult; +use super::EvidenceState; +use super::ExecutionCtx; use crate::AquamarineError; use crate::JValue; use crate::Result; @@ -52,70 +55,115 @@ pub enum FunctionPart { #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub(crate) struct Call(PeerPart, FunctionPart, Vec, String); +#[derive(Debug, PartialEq, Eq)] +struct ParsedCall { + peer_pk: String, + service_id: String, + function_name: String, + function_arg_paths: Vec, + result_variable_name: String, +} + impl super::ExecutableInstruction for Call { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { - log::info!("call {:?} is called with context {:?}", self, ctx); + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + log::info!( + "call {:?} is called with contexts: {:?} {:?}", + self, + exec_ctx, + call_ctx + ); - let (peer_pk, service_id, func_name) = self.prepare_peer_fn_parts(ctx)?; - - let function_args = self.extract_args_by_paths(ctx)?; - let function_args = serde_json::to_string(&function_args) - .map_err(|e| AquamarineError::FuncArgsSerdeError(function_args, e))?; - - if peer_pk == ctx.current_peer_id || peer_pk == CURRENT_PEER_ALIAS { - let result = unsafe { - crate::call_service(service_id.to_string(), func_name.to_string(), function_args) - }; - if result.ret_code != crate::CALL_SERVICE_SUCCESS { - return Err(AquamarineError::LocalServiceError(result.result)); - } - - let result: JValue = serde_json::from_str(&result.result) - .map_err(|e| AquamarineError::CallServiceSerdeError(result, e))?; - self.set_result(ctx, result)?; - } else { - let peer_pk = peer_pk.to_string(); - ctx.next_peer_pks.push(peer_pk); - } - - Ok(()) + let parsed_call = ParsedCall::new(self, exec_ctx)?; + parsed_call.execute(exec_ctx, call_ctx) } } -impl Call { - #[rustfmt::skip] - fn prepare_peer_fn_parts<'a>(&'a self, ctx: &'a ExecutionContext) -> Result<(&'a str, &'a str, &'a str)> { - let (peer_pk, service_id, func_name) = match (&self.0, &self.1) { - (PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::ServiceIdWithFuncName(_service_id, func_name)) => { - Ok((peer_pk, peer_service_id, func_name)) - }, - (PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::FuncName(func_name)) => { - Ok((peer_pk, peer_service_id, func_name)) - }, - (PeerPart::PeerPk(peer_pk), FunctionPart::ServiceIdWithFuncName(service_id, func_name)) => { - Ok((peer_pk, service_id, func_name)) +impl ParsedCall { + pub fn new(call: &Call, exec_ctx: &ExecutionCtx) -> Result { + let (peer_pk, service_id, func_name) = Self::prepare_peer_fn_parts(call, exec_ctx)?; + let result_variable_name = Self::parse_result_variable_name(call)?; + + Ok(Self { + peer_pk: peer_pk.to_string(), + service_id: service_id.to_string(), + function_name: func_name.to_string(), + function_arg_paths: call.2.clone(), + result_variable_name: result_variable_name.to_string(), + }) + } + + pub fn execute( + self, + exec_ctx: &mut ExecutionCtx, + call_ctx: &mut CallEvidenceCtx, + ) -> Result<()> { + let should_executed = self.prepare_evidence_state(call_ctx, &exec_ctx.current_peer_id)?; + if !should_executed { + return Ok(()); + } + + if self.peer_pk != exec_ctx.current_peer_id && self.peer_pk != CURRENT_PEER_ALIAS { + set_remote_call_result(self.peer_pk, exec_ctx, call_ctx); + + return Ok(()); + } + + let function_args = self.extract_args_by_paths(exec_ctx)?; + let function_args = serde_json::to_string(&function_args) + .map_err(|e| AquamarineError::FuncArgsSerializationError(function_args, e))?; + + let result = + unsafe { crate::call_service(self.service_id, self.function_name, function_args) }; + + if result.ret_code != crate::CALL_SERVICE_SUCCESS { + return Err(AquamarineError::LocalServiceError(result.result)); + } + + let result: JValue = serde_json::from_str(&result.result) + .map_err(|e| AquamarineError::CallServiceResultDeserializationError(result, e))?; + set_local_call_result(self.result_variable_name, exec_ctx, call_ctx, result) + } + + fn prepare_peer_fn_parts<'a>( + raw_call: &'a Call, + exec_ctx: &'a ExecutionCtx, + ) -> Result<(&'a str, &'a str, &'a str)> { + let (peer_pk, service_id, func_name) = match (&raw_call.0, &raw_call.1) { + ( + PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), + FunctionPart::ServiceIdWithFuncName(_service_id, func_name), + ) => Ok((peer_pk, peer_service_id, func_name)), + ( + PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), + FunctionPart::FuncName(func_name), + ) => Ok((peer_pk, peer_service_id, func_name)), + ( + PeerPart::PeerPk(peer_pk), + FunctionPart::ServiceIdWithFuncName(service_id, func_name), + ) => Ok((peer_pk, service_id, func_name)), + (PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => { + Err(AquamarineError::InstructionError(String::from( + "call should have service id specified by peer part or function part", + ))) } - (PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => Err(AquamarineError::InstructionError( - String::from("call should have service id specified by peer part or function part"), - )), }?; let peer_pk = if peer_pk != CURRENT_PEER_ALIAS { - Self::prepare_call_arg(peer_pk, ctx)? + Self::prepare_call_arg(peer_pk, exec_ctx)? } else { peer_pk }; - let service_id = Self::prepare_call_arg(service_id, ctx)?; - let func_name = Self::prepare_call_arg(func_name, ctx)?; + let service_id = Self::prepare_call_arg(service_id, exec_ctx)?; + let func_name = Self::prepare_call_arg(func_name, exec_ctx)?; Ok((peer_pk, service_id, func_name)) } - fn extract_args_by_paths(&self, ctx: &ExecutionContext) -> Result { - let mut result = Vec::with_capacity(self.2.len()); + fn extract_args_by_paths(&self, ctx: &ExecutionCtx) -> Result { + let mut result = Vec::with_capacity(self.function_arg_paths.len()); - for arg_path in self.2.iter() { + for arg_path in self.function_arg_paths.iter() { if is_string_literal(arg_path) { result.push(JValue::String(arg_path[1..arg_path.len() - 1].to_string())); } else { @@ -127,8 +175,8 @@ impl Call { Ok(JValue::Array(result)) } - fn parse_result_variable_name(&self) -> Result<&str> { - let result_variable_name = &self.3; + fn parse_result_variable_name(call: &Call) -> Result<&str> { + let result_variable_name = &call.3; if result_variable_name.is_empty() { return Err(AquamarineError::InstructionError(String::from( @@ -136,6 +184,12 @@ impl Call { ))); } + if super::RESERVED_KEYWORDS.contains(result_variable_name.as_str()) { + return Err(AquamarineError::ReservedKeywordError( + result_variable_name.to_string(), + )); + } + if is_string_literal(result_variable_name) { return Err(AquamarineError::InstructionError(String::from( "result name of a call instruction must be non string literal", @@ -147,7 +201,7 @@ impl Call { fn get_args_by_path<'args_path, 'ctx>( args_path: &'args_path str, - ctx: &'ctx ExecutionContext, + ctx: &'ctx ExecutionCtx, ) -> Result> { let mut split_arg: Vec<&str> = args_path.splitn(2, '.').collect(); let arg_path_head = split_arg.remove(0); @@ -189,7 +243,11 @@ impl Call { Ok(values) } - fn prepare_call_arg<'a>(arg_path: &'a str, ctx: &'a ExecutionContext) -> Result<&'a str> { + fn prepare_call_arg<'a>(arg_path: &'a str, ctx: &'a ExecutionCtx) -> Result<&'a str> { + if super::RESERVED_KEYWORDS.contains(arg_path) { + return Err(AquamarineError::ReservedKeywordError(arg_path.to_string())); + } + if is_string_literal(arg_path) { return Ok(&arg_path[1..arg_path.len() - 1]); } @@ -214,49 +272,118 @@ impl Call { } } - fn set_result(&self, ctx: &mut ExecutionContext, result: JValue) -> Result<()> { - use std::collections::hash_map::Entry; - - let result_variable_name = self.parse_result_variable_name()?; - - let is_array = result_variable_name.ends_with("[]"); - if !is_array { - // if result is not an array, simply insert it into data - if ctx - .data - .insert(result_variable_name.to_string(), result) - .is_some() - { - return Err(AquamarineError::MultipleVariablesFound( - result_variable_name.to_string(), - )); - } - - return Ok(()); + fn prepare_evidence_state( + &self, + call_ctx: &mut CallEvidenceCtx, + current_peer_id: &str, + ) -> Result { + if call_ctx.unused_subtree_elements_count == 0 { + log::info!("call evidence: previous state wasn't found"); + return Ok(true); } - // if result is an array, insert result to the end of the array - match ctx - .data - // unwrap is safe because it's been checked for [] - .entry(result_variable_name.strip_suffix("[]").unwrap().to_string()) - { - Entry::Occupied(mut entry) => match entry.get_mut() { - JValue::Array(values) => values.push(result), - v => { - return Err(AquamarineError::IncompatibleJValueType( - v.clone(), - String::from("Array"), - )) + call_ctx.unused_subtree_elements_count -= 1; + // unwrap is safe here, because current_states length's been checked + let prev_state = call_ctx.current_states.pop_front().unwrap(); + + log::info!("call evidence: previous state found {:?}", prev_state); + + match &prev_state { + // this call was failed on one of the previous executions, + // here it's needed to bubble this special error up + EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => { + let err_msg = err_msg.clone(); + call_ctx.new_states.push(prev_state); + Err(AquamarineError::LocalServiceError(err_msg)) + } + EvidenceState::Call(CallResult::RequestSent) => { + // check whether current node can execute this call + if self.peer_pk == current_peer_id { + Ok(true) + } else { + call_ctx.new_states.push(prev_state); + Ok(false) } - }, - Entry::Vacant(entry) => { - entry.insert(JValue::Array(vec![result])); } + // this instruction's been already executed + EvidenceState::Call(CallResult::Executed) => { + call_ctx.new_states.push(prev_state); + Ok(false) + } + // state has inconsistent order - return a error, call shouldn't be executed + par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState( + par_state.clone(), + String::from("call"), + )), + } + } +} + +fn set_local_call_result( + result_variable_name: String, + exec_ctx: &mut ExecutionCtx, + call_ctx: &mut CallEvidenceCtx, + result: JValue, +) -> Result<()> { + use std::collections::hash_map::Entry; + + let new_evidence_state = EvidenceState::Call(CallResult::Executed); + let is_array = result_variable_name.ends_with("[]"); + + if !is_array { + // if result is not an array, simply insert it into data + if exec_ctx + .data + .insert(result_variable_name.clone(), result) + .is_some() + { + return Err(AquamarineError::MultipleVariablesFound( + result_variable_name, + )); } - Ok(()) + log::info!("call evidence: adding new state {:?}", new_evidence_state); + call_ctx.new_states.push(new_evidence_state); + + return Ok(()); } + + // if result is an array, insert result to the end of the array + match exec_ctx + .data + // unwrap is safe because it's been checked for [] + .entry(result_variable_name.strip_suffix("[]").unwrap().to_string()) + { + Entry::Occupied(mut entry) => match entry.get_mut() { + JValue::Array(values) => values.push(result), + v => { + return Err(AquamarineError::IncompatibleJValueType( + v.clone(), + String::from("Array"), + )) + } + }, + Entry::Vacant(entry) => { + entry.insert(JValue::Array(vec![result])); + } + } + + log::info!("call evidence: adding new state {:?}", new_evidence_state); + call_ctx.new_states.push(new_evidence_state); + + Ok(()) +} + +fn set_remote_call_result( + peer_pk: String, + exec_ctx: &mut ExecutionCtx, + call_ctx: &mut CallEvidenceCtx, +) { + exec_ctx.next_peer_pks.push(peer_pk); + + let new_evidence_state = EvidenceState::Call(CallResult::RequestSent); + log::info!("call evidence: adding new state {:?}", new_evidence_state); + call_ctx.new_states.push(new_evidence_state); } fn is_string_literal(value: &str) -> bool { @@ -277,7 +404,7 @@ mod tests { #[test] fn current_peer_id_call() { - let mut vm = create_aqua_vm(echo_string_call_service()); + let mut vm = create_aqua_vm(echo_string_call_service(), "test_peer_id"); let script = String::from( r#" @@ -318,7 +445,7 @@ mod tests { #[test] fn remote_peer_id_call() { - let mut vm = create_aqua_vm(echo_string_call_service()); + let mut vm = create_aqua_vm(echo_string_call_service(), ""); let remote_peer_id = String::from("some_remote_peer_id"); let script = format!( @@ -339,7 +466,7 @@ mod tests { #[test] fn variables() { - let mut vm = create_aqua_vm(echo_string_call_service()); + let mut vm = create_aqua_vm(echo_string_call_service(), ""); let script = format!( r#"(call (remote_peer_id ("some_service_id" "local_fn_name") ("param") result_name))"#, @@ -369,7 +496,7 @@ mod tests { )) }); - let mut vm = create_aqua_vm(call_service); + let mut vm = create_aqua_vm(call_service, ""); let script = format!( r#"(call (%current_peer_id% ("some_service_id" "local_fn_name") ("arg1" "arg2" arg3) result_name))"#, diff --git a/stepper/src/air/execution_context.rs b/stepper/src/air/execution_context.rs new file mode 100644 index 00000000..399506fd --- /dev/null +++ b/stepper/src/air/execution_context.rs @@ -0,0 +1,39 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::fold::FoldState; +use crate::AquaData; + +use std::collections::HashMap; + +#[derive(Clone, Default, Debug)] +pub(crate) struct ExecutionCtx { + pub data: AquaData, + pub next_peer_pks: Vec, + pub current_peer_id: String, + pub folds: HashMap, +} + +impl ExecutionCtx { + pub(crate) fn new(data: AquaData, current_peer_id: String) -> Self { + Self { + data, + next_peer_pks: vec![], + current_peer_id, + folds: HashMap::new(), + } + } +} diff --git a/stepper/src/air/fold.rs b/stepper/src/air/fold.rs index d045cd81..7ad52de4 100644 --- a/stepper/src/air/fold.rs +++ b/stepper/src/air/fold.rs @@ -14,7 +14,8 @@ * limitations under the License. */ -use super::ExecutionContext; +use super::CallEvidenceCtx; +use super::ExecutionCtx; use super::Instruction; use crate::AquamarineError; use crate::JValue; @@ -47,15 +48,21 @@ pub(crate) struct FoldState { } impl super::ExecutableInstruction for Fold { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { - log::info!("fold {:?} is called with context {:?}", self, ctx); + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + log::info!( + "fold {} {} is called with contexts {:?} {:?}", + self.0, + self.1, + exec_ctx, + call_ctx + ); let iterable_name = &self.0; let iterator_name = &self.1; let instr_head = self.2.clone(); // check that value exists and has array type - match ctx.data.get(iterable_name) { + match exec_ctx.data.get(iterable_name) { Some(JValue::Array(_)) => {} Some(v) => { return Err(AquamarineError::IncompatibleJValueType( @@ -76,7 +83,7 @@ impl super::ExecutableInstruction for Fold { instr_head: instr_head.clone(), }; - if ctx + if exec_ctx .folds .insert(iterator_name.clone(), fold_state) .is_some() @@ -84,23 +91,28 @@ impl super::ExecutableInstruction for Fold { return Err(AquamarineError::MultipleFoldStates(iterable_name.clone())); } - instr_head.execute(ctx)?; - ctx.folds.remove(iterator_name); + instr_head.execute(exec_ctx, call_ctx)?; + exec_ctx.folds.remove(iterator_name); Ok(()) } } impl super::ExecutableInstruction for Next { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { - log::info!("next {:?} is called with context {:?}", self, ctx); + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + log::info!( + "next {:?} is called with contexts {:?} {:?}", + self, + exec_ctx, + call_ctx + ); let iterator_name = &self.0; - let fold_state = ctx + let fold_state = exec_ctx .folds .get_mut(iterator_name) .ok_or_else(|| AquamarineError::FoldStateNotFound(iterator_name.clone()))?; - let value = ctx + let value = exec_ctx .data .get(&fold_state.iterable_name) .expect("this has been checked on the fold instruction"); @@ -117,10 +129,10 @@ impl super::ExecutableInstruction for Next { } let next_instr = fold_state.instr_head.clone(); - next_instr.execute(ctx)?; + next_instr.execute(exec_ctx, call_ctx)?; // get the same fold state again because of borrow checker - match ctx.folds.get_mut(iterator_name) { + match exec_ctx.folds.get_mut(iterator_name) { Some(fold_state) => fold_state.cursor -= 1, _ => unreachable!("iterator value shouldn't changed inside fold"), }; @@ -142,7 +154,7 @@ mod tests { #[test] fn lfold() { - let mut vm = create_aqua_vm(echo_number_call_service()); + let mut vm = create_aqua_vm(echo_number_call_service(), ""); let lfold = String::from( r#" @@ -169,7 +181,7 @@ mod tests { #[test] fn rfold() { - let mut vm = create_aqua_vm(echo_number_call_service()); + let mut vm = create_aqua_vm(echo_number_call_service(), ""); let rfold = String::from( r#" @@ -196,7 +208,7 @@ mod tests { #[test] fn inner_fold() { - let mut vm = create_aqua_vm(echo_number_call_service()); + let mut vm = create_aqua_vm(echo_number_call_service(), ""); let script = String::from( r#" @@ -231,7 +243,7 @@ mod tests { #[test] fn inner_fold_with_same_iterator() { - let mut vm = create_aqua_vm(echo_number_call_service()); + let mut vm = create_aqua_vm(echo_number_call_service(), ""); let script = String::from( r#" diff --git a/stepper/src/air/mod.rs b/stepper/src/air/mod.rs index 2627e437..597a9b56 100644 --- a/stepper/src/air/mod.rs +++ b/stepper/src/air/mod.rs @@ -15,17 +15,22 @@ */ mod call; +mod execution_context; mod fold; mod null; mod par; mod seq; mod xor; -use crate::AquaData; +pub(crate) use execution_context::ExecutionCtx; + +pub(self) use crate::call_evidence::CallEvidenceCtx; +pub(self) use crate::call_evidence::CallResult; +pub(self) use crate::call_evidence::EvidenceState; + use crate::Result; use call::Call; use fold::Fold; -use fold::FoldState; use fold::Next; use null::Null; use par::Par; @@ -34,26 +39,15 @@ use xor::Xor; use serde_derive::Deserialize; use serde_derive::Serialize; -use std::collections::HashMap; -#[derive(Clone, Default, Debug)] -pub(super) struct ExecutionContext { - pub data: AquaData, - pub next_peer_pks: Vec, - pub current_peer_id: String, - pub folds: HashMap, -} +use once_cell::sync::Lazy; +use std::collections::HashSet; -impl ExecutionContext { - pub(super) fn new(data: AquaData, current_peer_id: String) -> Self { - Self { - data, - next_peer_pks: vec![], - current_peer_id, - folds: HashMap::new(), - } - } -} +pub(self) static RESERVED_KEYWORDS: Lazy> = Lazy::new(|| { + let mut set = HashSet::new(); + set.insert("__call"); + set +}); #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] @@ -68,19 +62,19 @@ pub(crate) enum Instruction { } pub(crate) trait ExecutableInstruction { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()>; + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()>; } impl ExecutableInstruction for Instruction { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { match self { - Instruction::Null(null) => null.execute(ctx), - Instruction::Call(call) => call.execute(ctx), - Instruction::Fold(fold) => fold.execute(ctx), - Instruction::Next(next) => next.execute(ctx), - Instruction::Par(par) => par.execute(ctx), - Instruction::Seq(seq) => seq.execute(ctx), - Instruction::Xor(xor) => xor.execute(ctx), + Instruction::Null(null) => null.execute(exec_ctx, call_ctx), + Instruction::Call(call) => call.execute(exec_ctx, call_ctx), + Instruction::Fold(fold) => fold.execute(exec_ctx, call_ctx), + Instruction::Next(next) => next.execute(exec_ctx, call_ctx), + Instruction::Par(par) => par.execute(exec_ctx, call_ctx), + Instruction::Seq(seq) => seq.execute(exec_ctx, call_ctx), + Instruction::Xor(xor) => xor.execute(exec_ctx, call_ctx), } } } diff --git a/stepper/src/air/null.rs b/stepper/src/air/null.rs index a418efc6..50dd0f11 100644 --- a/stepper/src/air/null.rs +++ b/stepper/src/air/null.rs @@ -14,7 +14,8 @@ * limitations under the License. */ -use super::ExecutionContext; +use super::CallEvidenceCtx; +use super::ExecutionCtx; use crate::Result; use serde_derive::Deserialize; @@ -24,8 +25,12 @@ use serde_derive::Serialize; pub(crate) struct Null {} impl super::ExecutableInstruction for Null { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { - log::info!("null is called with context: {:?}", ctx); + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + log::info!( + "null is called with contexts: {:?} {:?}", + exec_ctx, + call_ctx + ); Ok(()) } diff --git a/stepper/src/air/par.rs b/stepper/src/air/par.rs index 3a50790b..8919a37f 100644 --- a/stepper/src/air/par.rs +++ b/stepper/src/air/par.rs @@ -14,8 +14,12 @@ * limitations under the License. */ -use super::ExecutionContext; +use super::CallEvidenceCtx; +use super::EvidenceState; +use super::ExecutableInstruction; +use super::ExecutionCtx; use super::Instruction; +use crate::AquamarineError; use crate::Result; use serde_derive::Deserialize; @@ -24,28 +28,88 @@ use serde_derive::Serialize; #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub(crate) struct Par(Box, Box); -impl super::ExecutableInstruction for Par { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { - log::info!("par is called with context: {:?}", ctx); +impl ExecutableInstruction for Par { + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + log::info!("par is called with context: {:?} {:?}", exec_ctx, call_ctx); - self.0.execute(ctx)?; - self.1.execute(ctx)?; + let (left_subtree_size, right_subtree_size) = extract_subtree_sizes(call_ctx)?; + + let pre_states_count = call_ctx.current_states.len(); + let pre_unused_elements = call_ctx.unused_subtree_elements_count; + + let pre_new_states_count = call_ctx.new_states.len(); + call_ctx.new_states.push(EvidenceState::Par(0, 0)); + + let new_left_subtree_size = + execute_subtree(&self.0, left_subtree_size, exec_ctx, call_ctx)?; + let new_right_subtree_size = + execute_subtree(&self.1, right_subtree_size, exec_ctx, call_ctx)?; + + let new_par_evidence_state = + EvidenceState::Par(new_left_subtree_size, new_right_subtree_size); + log::info!( + "call evidence: adding new state {:?}", + new_par_evidence_state + ); + call_ctx.new_states[pre_new_states_count] = new_par_evidence_state; + + let post_states_count = call_ctx.current_states.len(); + call_ctx.unused_subtree_elements_count = + pre_unused_elements - (pre_states_count - post_states_count); Ok(()) } } +fn extract_subtree_sizes(call_ctx: &mut CallEvidenceCtx) -> Result<(usize, usize)> { + if call_ctx.unused_subtree_elements_count == 0 { + return Ok((0, 0)); + } + + call_ctx.unused_subtree_elements_count -= 1; + + log::info!( + "call evidence: the previous state was found {:?}", + call_ctx.current_states[0] + ); + + // unwrap is safe here because of length's been checked + match call_ctx.current_states.pop_front().unwrap() { + EvidenceState::Par(left, right) => Ok((left, right)), + state => Err(AquamarineError::InvalidEvidenceState( + state, + String::from("par"), + )), + } +} + +fn execute_subtree( + subtree: &Instruction, + subtree_size: usize, + exec_ctx: &mut ExecutionCtx, + call_ctx: &mut CallEvidenceCtx, +) -> Result { + call_ctx.unused_subtree_elements_count = subtree_size; + let before_states_count = call_ctx.new_states.len(); + + // execute subtree + subtree.execute(exec_ctx, call_ctx)?; + + Ok(call_ctx.new_states.len() - before_states_count) +} + #[cfg(test)] mod tests { use aqua_test_utils::create_aqua_vm; use aqua_test_utils::unit_call_service; - use aquamarine_vm::StepperOutcome; use serde_json::json; #[test] - fn par() { - let mut vm = create_aqua_vm(unit_call_service()); + fn par_remote_remote() { + use std::collections::HashSet; + + let mut vm = create_aqua_vm(unit_call_service(), ""); let script = String::from( r#" @@ -55,19 +119,37 @@ mod tests { ))"#, ); + let mut res = vm + .call(json!([String::from("asd"), script, String::from("{}"),])) + .expect("call should be successful"); + + let peers_result: HashSet<_> = res.next_peer_pks.drain(..).collect(); + let peers_right: HashSet<_> = vec![ + String::from("remote_peer_id_1"), + String::from("remote_peer_id_2"), + ] + .drain(..) + .collect(); + + assert_eq!(peers_result, peers_right); + } + + #[test] + fn par_local_remote() { + let mut vm = create_aqua_vm(unit_call_service(), ""); + + let script = String::from( + r#" + (par ( + (call (%current_peer_id% ("local_service_id" "local_fn_name") () result_name)) + (call ("remote_peer_id_2" ("service_id" "fn_name") () g)) + ))"#, + ); + let res = vm .call(json!([String::from("asd"), script, String::from("{}"),])) .expect("call should be successful"); - assert_eq!( - res, - StepperOutcome { - data: String::from("{}"), - next_peer_pks: vec![ - String::from("remote_peer_id_1"), - String::from("remote_peer_id_2") - ] - } - ); + assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]); } } diff --git a/stepper/src/air/seq.rs b/stepper/src/air/seq.rs index e95424ed..23517660 100644 --- a/stepper/src/air/seq.rs +++ b/stepper/src/air/seq.rs @@ -14,7 +14,8 @@ * limitations under the License. */ -use super::ExecutionContext; +use super::CallEvidenceCtx; +use super::ExecutionCtx; use super::Instruction; use crate::Result; @@ -25,14 +26,14 @@ use serde_derive::Serialize; pub(crate) struct Seq(Box, Box); impl super::ExecutableInstruction for Seq { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { - log::info!("seq is called with context: {:?}", ctx); + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + log::info!("seq is called with contexts: {:?} {:?}", exec_ctx, call_ctx); - let pks_count_before_call = ctx.next_peer_pks.len(); - self.0.execute(ctx)?; + let pks_count_before_call = exec_ctx.next_peer_pks.len(); + self.0.execute(exec_ctx, call_ctx)?; - if pks_count_before_call == ctx.next_peer_pks.len() { - self.1.execute(ctx)?; + if pks_count_before_call == exec_ctx.next_peer_pks.len() { + self.1.execute(exec_ctx, call_ctx)?; } Ok(()) @@ -43,13 +44,12 @@ impl super::ExecutableInstruction for Seq { mod tests { use aqua_test_utils::create_aqua_vm; use aqua_test_utils::unit_call_service; - use aquamarine_vm::StepperOutcome; use serde_json::json; #[test] - fn par() { - let mut vm = create_aqua_vm(unit_call_service()); + fn seq_remote_remote() { + let mut vm = create_aqua_vm(unit_call_service(), ""); let script = String::from( r#" @@ -59,16 +59,43 @@ mod tests { ))"#, ); + let res = vm + .call(json!([String::from("asd"), script, String::from("{}")])) + .expect("call should be successful"); + + assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_1")]); + + let res = vm + .call(json!([ + String::from("asd"), + script, + json!({ + "__call": [{"call": "executed"}] + } + ) + .to_string(), + ])) + .expect("call should be successful"); + + assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]); + } + + #[test] + fn seq_local_remote() { + let mut vm = create_aqua_vm(unit_call_service(), ""); + + let script = String::from( + r#" + (seq ( + (call (%current_peer_id% ("local_service_id" "local_fn_name") () result_name)) + (call ("remote_peer_id_2" ("service_id" "fn_name") () g)) + ))"#, + ); + let res = vm .call(json!([String::from("asd"), script, String::from("{}"),])) .expect("call should be successful"); - assert_eq!( - res, - StepperOutcome { - data: String::from("{}"), - next_peer_pks: vec![String::from("remote_peer_id_1")] - } - ); + assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]); } } diff --git a/stepper/src/air/xor.rs b/stepper/src/air/xor.rs index ba635eca..29a56094 100644 --- a/stepper/src/air/xor.rs +++ b/stepper/src/air/xor.rs @@ -14,7 +14,8 @@ * limitations under the License. */ -use super::ExecutionContext; +use super::CallEvidenceCtx; +use super::ExecutionCtx; use super::Instruction; use crate::AquamarineError; use crate::Result; @@ -26,11 +27,11 @@ use serde_derive::Serialize; pub(crate) struct Xor(Box, Box); impl super::ExecutableInstruction for Xor { - fn execute(&self, ctx: &mut ExecutionContext) -> Result<()> { - log::info!("xor is called with context: {:?}", ctx); + fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + log::info!("xor is called with contexts: {:?} {:?}", exec_ctx, call_ctx); - match self.0.execute(ctx) { - Err(AquamarineError::LocalServiceError(_)) => self.1.execute(ctx), + match self.0.execute(exec_ctx, call_ctx) { + Err(AquamarineError::LocalServiceError(_)) => self.1.execute(exec_ctx, call_ctx), res => res, } } @@ -72,7 +73,7 @@ mod tests { } }); - let mut vm = create_aqua_vm(call_service); + let mut vm = create_aqua_vm(call_service, ""); let script = String::from( r#" diff --git a/stepper/src/call_evidence/context.rs b/stepper/src/call_evidence/context.rs new file mode 100644 index 00000000..be4be92d --- /dev/null +++ b/stepper/src/call_evidence/context.rs @@ -0,0 +1,40 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::EvidenceState; + +use serde::Deserialize; +use serde::Serialize; + +use std::collections::VecDeque; + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub(crate) struct CallEvidenceCtx { + pub(crate) current_states: VecDeque, + pub(crate) unused_subtree_elements_count: usize, + pub(crate) new_states: Vec, +} + +impl CallEvidenceCtx { + pub fn new(current_states: VecDeque) -> Self { + let right = current_states.len(); + Self { + current_states, + unused_subtree_elements_count: right, + new_states: vec![], + } + } +} diff --git a/stepper/src/call_evidence/mod.rs b/stepper/src/call_evidence/mod.rs new file mode 100644 index 00000000..fb0038fb --- /dev/null +++ b/stepper/src/call_evidence/mod.rs @@ -0,0 +1,22 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod context; +mod state; + +pub(crate) use context::CallEvidenceCtx; +pub(crate) use state::CallResult; +pub(crate) use state::EvidenceState; diff --git a/stepper/src/call_evidence/state.rs b/stepper/src/call_evidence/state.rs new file mode 100644 index 00000000..0487d78e --- /dev/null +++ b/stepper/src/call_evidence/state.rs @@ -0,0 +1,38 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum CallResult { + /// Request was sent to a target node and it shouldn't be called again. + RequestSent, + + /// A corresponding call's been already executed. + Executed, + + /// call_service ended with a service error. + CallServiceFailed(String), +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum EvidenceState { + Par(usize, usize), + Call(CallResult), +} diff --git a/stepper/src/errors.rs b/stepper/src/errors.rs index 2054112b..14ab1ed4 100644 --- a/stepper/src/errors.rs +++ b/stepper/src/errors.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use crate::call_evidence::EvidenceState; use crate::CallServiceResult; use crate::JValue; use crate::StepperOutcome; @@ -31,14 +32,17 @@ pub(crate) enum AquamarineError { /// Errors occurred while parsing aqua script in the form of S expressions. SExprParseError(SExprError), - /// Errors occurred while parsing aqua data. - DataSerdeError(SerdeJsonError), + /// Errors occurred on aqua data deserialization. + DataDeserializationError(SerdeJsonError), + + /// Errors occurred on aqua data serialization. + DataSerializationError(SerdeJsonError), /// Errors occurred while parsing function arguments of an expression. - FuncArgsSerdeError(JValue, SerdeJsonError), + FuncArgsSerializationError(JValue, SerdeJsonError), /// Errors occurred while parsing returned by call_service value. - CallServiceSerdeError(CallServiceResult, SerdeJsonError), + CallServiceResultDeserializationError(CallServiceResult, SerdeJsonError), /// Indicates that environment variable with name CURRENT_PEER_ID isn't set. CurrentPeerIdEnvError(VarError, String), @@ -69,6 +73,18 @@ pub(crate) enum AquamarineError { /// Multiple fold states found for such iterator name. MultipleFoldStates(String), + + /// Expected evidence state of different type. + InvalidEvidenceState(EvidenceState, String), + + /// Errors occurred on call evidence deserialization. + CallEvidenceDeserializationError(SerdeJsonError), + + /// Errors occurred on call evidence serialization. + CallEvidenceSerializationError(SerdeJsonError), + + /// Errors occurred when reserved keyword is used for variable name. + ReservedKeywordError(String), } impl Error for AquamarineError {} @@ -79,17 +95,22 @@ impl std::fmt::Display for AquamarineError { AquamarineError::SExprParseError(err) => { write!(f, "aqua script can't be parsed: {:?}", err) } - AquamarineError::DataSerdeError(err) => write!( + AquamarineError::DataDeserializationError(err) => write!( f, - "an error occurred while serializing/deserializing data: {:?}", + "an error occurred while data deserialization: {:?}", err ), - AquamarineError::FuncArgsSerdeError(args, err) => write!( + AquamarineError::DataSerializationError(err) => write!( + f, + "an error occurred while data serialization: {:?}", + err + ), + AquamarineError::FuncArgsSerializationError(args, err) => write!( f, "function arguments {} can't be serialized or deserialized with an error: {:?}", args, err ), - AquamarineError::CallServiceSerdeError(result, err) => write!( + AquamarineError::CallServiceResultDeserializationError(result, err) => write!( f, "call_service result \"{:?}\" can't be serialized or deserialized with an error: {:?}", result, err @@ -138,6 +159,26 @@ impl std::fmt::Display for AquamarineError { "multiple fold states found for iterable {}", iterator ), + AquamarineError::InvalidEvidenceState(found_state, expected) => write!( + f, + "invalid evidence state: expected {}, but found {:?}", + expected, found_state + ), + AquamarineError::CallEvidenceDeserializationError(err) => write!( + f, + "an error occurred while data deserialization: {:?}", + err + ), + AquamarineError::CallEvidenceSerializationError(err) => write!( + f, + "an error occurred while data serialization: {:?}", + err + ), + AquamarineError::ReservedKeywordError(variable_name) => write!( + f, + "a variable can't be named as {} because this name is reserved", + variable_name + ), } } } @@ -158,19 +199,24 @@ impl Into for AquamarineError { fn into(self) -> StepperOutcome { let ret_code = match self { AquamarineError::SExprParseError(_) => 1, - AquamarineError::DataSerdeError(..) => 2, - AquamarineError::FuncArgsSerdeError(..) => 3, - AquamarineError::CallServiceSerdeError(..) => 4, - AquamarineError::CurrentPeerIdEnvError(..) => 5, - AquamarineError::InstructionError(..) => 6, - AquamarineError::LocalServiceError(..) => 7, - AquamarineError::VariableNotFound(..) => 8, - AquamarineError::MultipleVariablesFound(..) => 9, - AquamarineError::VariableNotInJsonPath(..) => 10, - AquamarineError::IncompatibleJValueType(..) => 11, - AquamarineError::MultipleValuesInJsonPath(..) => 12, - AquamarineError::FoldStateNotFound(..) => 13, - AquamarineError::MultipleFoldStates(..) => 14, + AquamarineError::DataDeserializationError(..) => 2, + AquamarineError::DataSerializationError(..) => 3, + AquamarineError::FuncArgsSerializationError(..) => 4, + AquamarineError::CallServiceResultDeserializationError(..) => 5, + AquamarineError::CurrentPeerIdEnvError(..) => 6, + AquamarineError::InstructionError(..) => 7, + AquamarineError::LocalServiceError(..) => 8, + AquamarineError::VariableNotFound(..) => 9, + AquamarineError::MultipleVariablesFound(..) => 10, + AquamarineError::VariableNotInJsonPath(..) => 11, + AquamarineError::IncompatibleJValueType(..) => 12, + AquamarineError::MultipleValuesInJsonPath(..) => 13, + AquamarineError::FoldStateNotFound(..) => 14, + AquamarineError::MultipleFoldStates(..) => 15, + AquamarineError::InvalidEvidenceState(..) => 16, + AquamarineError::CallEvidenceDeserializationError(..) => 17, + AquamarineError::CallEvidenceSerializationError(..) => 18, + AquamarineError::ReservedKeywordError(..) => 19, }; StepperOutcome { diff --git a/stepper/src/execution.rs b/stepper/src/execution.rs index b41129d5..b8cc795e 100644 --- a/stepper/src/execution.rs +++ b/stepper/src/execution.rs @@ -16,13 +16,18 @@ use super::StepperOutcome; use crate::air::ExecutableInstruction; -use crate::air::ExecutionContext; +use crate::air::ExecutionCtx; use crate::air::Instruction; +use crate::call_evidence::EvidenceState; use crate::get_current_peer_id; use crate::AquaData; use crate::AquamarineError; use crate::Result; +use crate::call_evidence::CallEvidenceCtx; + +use std::collections::VecDeque; + pub(crate) fn execute_aqua(init_user_id: String, aqua: String, data: String) -> StepperOutcome { log::info!( "stepper invoked with user_id = {}, aqua = {:?}, data = {:?}", @@ -35,8 +40,8 @@ pub(crate) fn execute_aqua(init_user_id: String, aqua: String, data: String) -> } fn execute_aqua_impl(_init_user_id: String, aqua: String, data: String) -> Result { - let parsed_data: AquaData = - serde_json::from_str(&data).map_err(AquamarineError::DataSerdeError)?; + let mut parsed_data: AquaData = + serde_json::from_str(&data).map_err(AquamarineError::DataDeserializationError)?; let formatted_aqua = format_aqua(aqua); let parsed_aqua = serde_sexpr::from_str::(&formatted_aqua)?; @@ -49,16 +54,31 @@ fn execute_aqua_impl(_init_user_id: String, aqua: String, data: String) -> Resul let current_peer_id = get_current_peer_id() .map_err(|e| AquamarineError::CurrentPeerIdEnvError(e, String::from("CURRENT_PEER_ID")))?; - let mut execution_ctx = ExecutionContext::new(parsed_data, current_peer_id); - parsed_aqua.execute(&mut execution_ctx)?; + let call_evidence_ctx_key: &str = "__call"; + let current_states: VecDeque = match parsed_data.remove(call_evidence_ctx_key) { + Some(jvalue) => serde_json::from_value(jvalue) + .map_err(AquamarineError::CallEvidenceDeserializationError)?, + None => VecDeque::new(), + }; - let data = - serde_json::to_string(&execution_ctx.data).map_err(AquamarineError::DataSerdeError)?; + let mut execution_ctx = ExecutionCtx::new(parsed_data, current_peer_id); + let mut call_evidence_ctx = CallEvidenceCtx::new(current_states); + + parsed_aqua.execute(&mut execution_ctx, &mut call_evidence_ctx)?; + + let serialized_call_ctx = serde_json::to_value(call_evidence_ctx.new_states) + .map_err(AquamarineError::CallEvidenceSerializationError)?; + execution_ctx + .data + .insert(call_evidence_ctx_key.to_string(), serialized_call_ctx); + + let data = serde_json::to_string(&execution_ctx.data) + .map_err(AquamarineError::DataSerializationError)?; Ok(StepperOutcome { ret_code: 0, data, - next_peer_pks: execution_ctx.next_peer_pks, + next_peer_pks: dedup(execution_ctx.next_peer_pks), }) } @@ -92,6 +112,15 @@ fn format_aqua(aqua: String) -> String { String::from_iter(formatted_aqua.into_iter()) } +use std::hash::Hash; + +fn dedup(mut vec: Vec) -> Vec { + use std::collections::HashSet; + + let set: HashSet<_> = vec.drain(..).collect(); // dedup + set.into_iter().collect() +} + #[cfg(test)] mod tests { #[test] diff --git a/stepper/src/fce.rs b/stepper/src/fce.rs index be24a63b..f2438bfa 100644 --- a/stepper/src/fce.rs +++ b/stepper/src/fce.rs @@ -26,6 +26,7 @@ )] mod air; +mod call_evidence; mod defines; mod errors; mod execution; diff --git a/stepper/src/wasm_bindgen.rs b/stepper/src/wasm_bindgen.rs index a6edb901..7d533b8f 100644 --- a/stepper/src/wasm_bindgen.rs +++ b/stepper/src/wasm_bindgen.rs @@ -15,6 +15,7 @@ */ mod air; +mod call_evidence; mod defines; mod errors; mod execution; diff --git a/stepper/tests/basic.rs b/stepper/tests/air_basic.rs similarity index 81% rename from stepper/tests/basic.rs rename to stepper/tests/air_basic.rs index b49248a6..123d8134 100644 --- a/stepper/tests/basic.rs +++ b/stepper/tests/air_basic.rs @@ -19,7 +19,6 @@ use aqua_test_utils::unit_call_service; use aquamarine_vm::vec1::Vec1; use aquamarine_vm::HostExportedFunc; use aquamarine_vm::IValue; -use aquamarine_vm::StepperOutcome; use serde_json::json; @@ -27,7 +26,7 @@ type JValue = serde_json::Value; #[test] fn seq_par_call() { - let mut vm = create_aqua_vm(unit_call_service()); + let mut vm = create_aqua_vm(unit_call_service(), ""); let script = String::from( r#" @@ -43,17 +42,26 @@ fn seq_par_call() { let res = vm .call(json!([String::from("asd"), script, String::from("{}"),])) .expect("should be successful"); - let right_outcome = StepperOutcome { - data: String::from("{\"result_1\":\"test\"}"), - next_peer_pks: vec![String::from("remote_peer_id")], - }; - assert_eq!(res, right_outcome); + let resulted_json: JValue = + serde_json::from_str(&res.data).expect("stepper should return valid json"); + + let right_json = json!( { + "result_1" : "test", + "__call": [ + { "par": [1,1] }, + { "call": "executed" }, + { "call": "request_sent" }, + ] + }); + + assert_eq!(resulted_json, right_json); + assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]); } #[test] fn par_par_call() { - let mut vm = create_aqua_vm(unit_call_service()); + let mut vm = create_aqua_vm(unit_call_service(), ""); let script = String::from( r#" @@ -73,7 +81,17 @@ fn par_par_call() { let resulted_json: JValue = serde_json::from_str(&res.data).expect("stepper should return valid json"); - let right_json = json!( {"result_1" : "test", "result_2" : "test"} ); + let right_json = json!( { + "result_1" : "test", + "result_2" : "test", + "__call": [ + { "par": [3,1] }, + { "par": [1,1] }, + { "call": "executed" }, + { "call": "request_sent" }, + { "call": "executed" }, + ] + }); assert_eq!(resulted_json, right_json); assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]); @@ -136,7 +154,7 @@ fn create_service() { )) }); - let mut vm = create_aqua_vm(call_service); + let mut vm = create_aqua_vm(call_service, ""); let res = vm .call(json!([String::from("init_user_pk"), script, data,])) @@ -156,6 +174,10 @@ fn create_service() { String::from("service_id"), JValue::String(String::from("create response")), ); + data_value.as_object_mut().unwrap().insert( + String::from("__call"), + json!([{"call": "executed"}, {"call": "executed"}, {"call": "executed"}, {"call": "request_sent"}]), + ); assert_eq!(resulted_data, data_value); assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id")]); diff --git a/stepper/tests/call_evidence_basic.rs b/stepper/tests/call_evidence_basic.rs new file mode 100644 index 00000000..8f2cc29a --- /dev/null +++ b/stepper/tests/call_evidence_basic.rs @@ -0,0 +1,518 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use aqua_test_utils::create_aqua_vm; +use aqua_test_utils::echo_number_call_service; +use aqua_test_utils::unit_call_service; +use aquamarine_vm::vec1::Vec1; +use aquamarine_vm::HostExportedFunc; +use aquamarine_vm::IValue; + +use serde_json::json; + +type JValue = serde_json::Value; + +#[test] +fn evidence_seq_par_call() { + env_logger::init(); + + let mut vm = create_aqua_vm(unit_call_service(), ""); + + let script = String::from( + r#" + (seq ( + (par ( + (call (%current_peer_id% ("local_service_id" "local_fn_name") () result_1)) + (call ("remote_peer_id" ("service_id" "fn_name") () g)) + )) + (call (%current_peer_id% ("local_service_id" "local_fn_name") () result_2)) + ))"#, + ); + + let res = vm + .call(json!([ + String::from("asd"), + script, + json!({ + "__call": [ + { "par": [1,1] }, + { "call": "executed" }, + { "call": "executed" }, + ] + }) + .to_string(), + ])) + .expect("should be successful"); + + let resulted_json: JValue = + serde_json::from_str(&res.data).expect("stepper should return valid json"); + + let right_json = json!( { + "result_2": "test", + "__call": [ + { "par": [1,1] }, + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(resulted_json, right_json); + assert!(res.next_peer_pks.is_empty()); +} + +#[test] +fn evidence_par_par_call() { + let mut vm = create_aqua_vm(unit_call_service(), "some_peer_id"); + + let script = String::from( + r#" + (par ( + (par ( + (call ("some_peer_id" ("local_service_id" "local_fn_name") () result_1)) + (call ("remote_peer_id" ("service_id" "fn_name") () g)) + )) + (call (%current_peer_id% ("local_service_id" "local_fn_name") () result_2)) + ))"#, + ); + + let res = vm + .call(json!([ + String::from("asd"), + script, + json!({ + "__call": [ + { "par": [3,0] }, + { "par": [1,1] }, + { "call": "request_sent" }, + { "call": "executed" }, + ] + }) + .to_string(), + ])) + .expect("should be successful"); + + let resulted_json: JValue = + serde_json::from_str(&res.data).expect("stepper should return valid json"); + + let right_json = json!( { + "result_1" : "test", + "result_2" : "test", + "__call": [ + { "par": [3,1] }, + { "par": [1,1] }, + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(resulted_json, right_json); + assert!(res.next_peer_pks.is_empty()); +} + +#[test] +fn evidence_seq_seq() { + let peer_id_1 = String::from("12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er"); + let peer_id_2 = String::from("12D3KooWAzJcYitiZrerycVB4Wryrx22CFKdDGx7c4u31PFdfTbR"); + let mut vm1 = create_aqua_vm(unit_call_service(), peer_id_1.clone()); + let mut vm2 = create_aqua_vm(unit_call_service(), peer_id_2.clone()); + + let script = format!( + r#" + (seq ( + (call ("{}" ("identity" "") () void0)) + (seq ( + (call ("{}" ("add_blueprint" "") () blueprint_id)) + (call ("{}" ("addBlueprint-14d8488e-d10d-474d-96b2-878f6a7d74c8" "") () void1)) + )) + )) + "#, + peer_id_1, peer_id_1, peer_id_2 + ); + + let res1 = vm2 + .call(json!([String::from("asd"), script, String::from("{}")])) + .expect("should be successful"); + + assert_eq!(res1.next_peer_pks, vec![peer_id_1.clone()]); + + let res2 = vm1 + .call(json!([String::from("asd"), script, res1.data])) + .expect("should be successful"); + + assert_eq!(res2.next_peer_pks, vec![peer_id_2.clone()]); + + let res3 = vm2 + .call(json!([String::from("asd"), script, res2.data])) + .expect("should be successful"); + + let resulted_json: JValue = + serde_json::from_str(&res3.data).expect("stepper should return valid json"); + + let right_json = json!( { + "void0": "test", + "void1": "test", + "blueprint_id": "test", + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(resulted_json, right_json); + assert!(res3.next_peer_pks.is_empty()); +} + +#[test] +fn evidence_create_service() { + let module = "greeting"; + let config = json!( + { + "name": module, + "mem_pages_count": 100, + "logger_enabled": true, + "wasi": { + "envs": json!({}), + "preopened_files": vec!["/tmp"], + "mapped_dirs": json!({}), + } + } + ); + let mut data_value = json!({ + "module_bytes": vec![1,2], + "module_config": config, + "blueprint": { "name": "blueprint", "dependencies": [module] }, + "__call": [ + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + let data = data_value.to_string(); + + let script = String::from( + r#" + (seq ( + (call (%current_peer_id% ("add_module" "") (module_bytes module_config) module)) + (seq ( + (call (%current_peer_id% ("add_blueprint" "") (blueprint) blueprint_id)) + (seq ( + (call (%current_peer_id% ("create" "") (blueprint_id) service_id)) + (call ("remote_peer_id" ("" "") (service_id) client_result)) + )) + )) + ))"#, + ); + + let call_service: HostExportedFunc = Box::new(|_, args| -> Option { + let builtin_service = match &args[0] { + IValue::String(str) => str, + _ => unreachable!(), + }; + + let response = match builtin_service.as_str() { + "add_module" => String::from("add_module response"), + "add_blueprint" => String::from("add_blueprint response"), + "create" => String::from("create response"), + _ => String::from("unknown response"), + }; + + Some(IValue::Record( + Vec1::new(vec![ + IValue::S32(0), + IValue::String(format!("\"{}\"", response)), + ]) + .unwrap(), + )) + }); + + let mut vm = create_aqua_vm(call_service, ""); + + let res = vm + .call(json!([String::from("init_user_pk"), script, data,])) + .expect("should be successful"); + + let resulted_data: JValue = serde_json::from_str(&res.data).expect("should be correct json"); + + data_value.as_object_mut().unwrap().insert( + String::from("__call"), + json!([{"call": "executed"}, {"call": "executed"}, {"call": "executed"}, {"call": "executed"}]), + ); + + assert_eq!(resulted_data, data_value); + assert!(res.next_peer_pks.is_empty()); +} + +#[test] +fn evidence_par_seq_fold_call() { + let return_numbers_call_service: HostExportedFunc = Box::new(|_, args| -> Option { + Some(IValue::Record( + Vec1::new(vec![ + IValue::S32(0), + IValue::String(String::from( + "[\"1\", \"2\", \"3\", \"4\", \"5\", \"6\", \"7\", \"8\", \"9\", \"10\"]", + )), + ]) + .unwrap(), + )) + }); + + let mut vm1 = create_aqua_vm(return_numbers_call_service, "some_peer_id_1"); + let mut vm2 = create_aqua_vm(echo_number_call_service(), "some_peer_id_2"); + let mut vm3 = create_aqua_vm(unit_call_service(), "some_peer_id_3"); + + let script = String::from( + r#" + (par ( + (seq ( + (call ("some_peer_id_1" ("local_service_id" "local_fn_name") () IterableResultPeer1)) + (fold (IterableResultPeer1 i + (par ( + (call ("some_peer_id_2" ("local_service_id" "local_fn_name") (i) acc[])) + (next i) + )) + )) + )) + (call ("some_peer_id_3" ("local_service_id" "local_fn_name") () result_2)) + ))"#, + ); + + let res1 = vm2 + .call(json!([ + String::from("asd"), + script, + json!({ + "__call": [] + }) + .to_string(), + ])) + .expect("should be successful"); + + let res2 = vm1 + .call(json!([String::from("asd"), script, res1.data])) + .expect("should be successful"); + + let mut data = res2.data; + + for _ in 0..100 { + let res3 = vm2 + .call(json!([String::from("asd"), script, data])) + .expect("should be successful"); + + data = res3.data; + } + + let res4 = vm3 + .call(json!([String::from("asd"), script, data])) + .expect("should be successful"); + + let resulted_json: JValue = + serde_json::from_str(&res4.data).expect("stepper should return valid json"); + + let right_json = json!( { + "result_2": "test", + "IterableResultPeer1": ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"], + "acc": [1,2,3,4,5,6,7,8,9,10], + "__call": [ + { "par": [21,1] }, + { "call": "executed" }, + { "par": [1,18] }, + { "call": "executed" }, + { "par": [1,16] }, + { "call": "executed" }, + { "par": [1,14] }, + { "call": "executed" }, + { "par": [1,12] }, + { "call": "executed" }, + { "par": [1,10] }, + { "call": "executed" }, + { "par": [1,8] }, + { "call": "executed" }, + { "par": [1,6] }, + { "call": "executed" }, + { "par": [1,4] }, + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(resulted_json, right_json); + assert!(res4.next_peer_pks.is_empty()); +} + +#[test] +fn evidence_par_seq_fold_in_cycle_call() { + let return_numbers_call_service: HostExportedFunc = Box::new(|_, args| -> Option { + Some(IValue::Record( + Vec1::new(vec![ + IValue::S32(0), + IValue::String(String::from( + "[\"1\", \"2\", \"3\", \"4\", \"5\", \"6\", \"7\", \"8\", \"9\", \"10\"]", + )), + ]) + .unwrap(), + )) + }); + + let mut vm1 = create_aqua_vm(return_numbers_call_service, "some_peer_id_1"); + let mut vm2 = create_aqua_vm(echo_number_call_service(), "some_peer_id_2"); + let mut vm3 = create_aqua_vm(unit_call_service(), "some_peer_id_3"); + + let script = String::from( + r#" + (par ( + (seq ( + (call ("some_peer_id_1" ("local_service_id" "local_fn_name") () IterableResultPeer1)) + (fold (IterableResultPeer1 i + (par ( + (call ("some_peer_id_2" ("local_service_id" "local_fn_name") (i) acc[])) + (next i) + )) + )) + )) + (call ("some_peer_id_3" ("local_service_id" "local_fn_name") () result_2)) + ))"#, + ); + + let mut data = String::from("{}"); + + for _ in 0..100 { + let res1 = vm1 + .call(json!([String::from("asd"), script, data])) + .expect("should be successful"); + + data = res1.data; + + let res2 = vm2 + .call(json!([String::from("asd"), script, data])) + .expect("should be successful"); + + data = res2.data; + + let res3 = vm3 + .call(json!([String::from("asd"), script, data])) + .expect("should be successful"); + + data = res3.data; + } + + let resulted_json: JValue = + serde_json::from_str(&data).expect("stepper should return valid json"); + + let right_json = json!( { + "result_2": "test", + "IterableResultPeer1": ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"], + "acc": [1,2,3,4,5,6,7,8,9,10], + "__call": [ + { "par": [21,1] }, + { "call": "executed" }, + { "par": [1,18] }, + { "call": "executed" }, + { "par": [1,16] }, + { "call": "executed" }, + { "par": [1,14] }, + { "call": "executed" }, + { "par": [1,12] }, + { "call": "executed" }, + { "par": [1,10] }, + { "call": "executed" }, + { "par": [1,8] }, + { "call": "executed" }, + { "par": [1,6] }, + { "call": "executed" }, + { "par": [1,4] }, + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(resulted_json, right_json); +} + +#[test] +fn evidence_seq_par_seq_seq() { + let peer_id_1 = String::from("12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er"); + let peer_id_2 = String::from("12D3KooWAzJcYitiZrerycVB4Wryrx22CFKdDGx7c4u31PFdfTbR"); + let mut vm1 = create_aqua_vm(unit_call_service(), peer_id_1.clone()); + let mut vm2 = create_aqua_vm(unit_call_service(), peer_id_2.clone()); + let script = format!( + r#" + (seq ( + (par ( + (seq ( + (call ("{}" ("" "") () result_1)) + (call ("{}" ("" "") () result_2)) + )) + (seq ( + (call ("{}" ("" "") () result_3)) + (call ("{}" ("" "") () result_4)) + )) + )) + (call ("{}" ("" "") () result_5)) + )) + "#, + peer_id_1, peer_id_2, peer_id_2, peer_id_1, peer_id_2 + ); + + let res1 = vm2 + .call(json!([String::from("asd"), script, String::from("{}")])) + .expect("should be successful"); + + assert_eq!(res1.next_peer_pks, vec![peer_id_1.clone()]); + + let res2 = vm1 + .call(json!([String::from("asd"), script, res1.data])) + .expect("should be successful"); + + assert_eq!(res2.next_peer_pks, vec![peer_id_2.clone()]); + + let res3 = vm2 + .call(json!([String::from("asd"), script, res2.data])) + .expect("should be successful"); + + let resulted_json: JValue = + serde_json::from_str(&res3.data).expect("stepper should return valid json"); + + let right_json = json!( { + "result_1": "test", + "result_2": "test", + "result_3": "test", + "result_4": "test", + "result_5": "test", + "__call": [ + { "par": [2,2] }, + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + { "call": "executed" }, + ] + }); + + assert_eq!(resulted_json, right_json); + assert!(res3.next_peer_pks.is_empty()); +}