From d5527b75a41dfbc1aece4f9a61e0fe07f248a2d2 Mon Sep 17 00:00:00 2001 From: vms Date: Wed, 30 Sep 2020 20:23:32 +0300 Subject: [PATCH] the first prototype of request-response stepper --- Cargo.lock | 167 +++++++++++++++++++++++++++++++++ Cargo.toml | 4 + src/air.rs | 11 ++- src/defines.rs | 34 +++++++ src/errors.rs | 18 +++- src/fce.rs | 12 +-- src/instructions/call.rs | 107 ++++++++++++++++++--- src/instructions/mod.rs | 15 +-- src/instructions/null.rs | 31 ++++++ src/stepper/execution.rs | 4 +- src/stepper/mod.rs | 1 + src/stepper/stepper.rs | 10 +- src/stepper/stepper_outcome.rs | 4 +- src/wasm_bindgen.rs | 13 ++- 14 files changed, 389 insertions(+), 42 deletions(-) create mode 100644 src/defines.rs create mode 100644 src/instructions/null.rs diff --git a/Cargo.lock b/Cargo.lock index 5c1fe17f..88882086 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,10 +1,20 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "aho-corasick" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" +dependencies = [ + "memchr", +] + [[package]] name = "aquamarine" version = "0.1.0" dependencies = [ "fluence", + "jsonpath_lib", "log", "serde", "serde_derive", @@ -13,6 +23,29 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "array_tool" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f8cb5d814eb646a863c4f24978cff2880c4be96ad8cde2c0f0678732902e271" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + [[package]] name = "bumpalo" version = "3.4.0" @@ -25,6 +58,19 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "fluence" version = "0.2.7" @@ -76,12 +122,59 @@ dependencies = [ "wasi", ] +[[package]] +name = "hashbrown" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" + +[[package]] +name = "hermit-abi" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c30f6d0bc6b00693347368a67d41b58f2fb851215ff1da49e90fe2c5c667151" +dependencies = [ + "libc", +] + +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + +[[package]] +name = "indexmap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e2e4c765aa53a0424761bf9f41aa7a6ac1efa87238f59560640e27fca028f2" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "itoa" version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" +[[package]] +name = "jsonpath_lib" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8727f6987896c010ec9add275f59de2ae418b672fafa77bc3673b4cee1f09ca" +dependencies = [ + "array_tool", + "env_logger", + "log", + "serde", + "serde_json", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -134,6 +227,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quote" version = "1.0.7" @@ -184,6 +283,24 @@ dependencies = [ "rand_core", ] +[[package]] +name = "regex" +version = "1.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-syntax" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" + [[package]] name = "ryu" version = "1.0.5" @@ -216,6 +333,7 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "164eacbdb13512ec2745fb09d51fd5b22b0d65ed294a1dcf7285a360c80a675c" dependencies = [ + "indexmap", "itoa", "ryu", "serde", @@ -242,6 +360,24 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "termcolor" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + [[package]] name = "unicode-xid" version = "0.2.1" @@ -322,3 +458,34 @@ name = "wasm-bindgen-shared" version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml index 450c0852..4054a328 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,9 +19,13 @@ path = "test_module/main.rs" [dependencies] fluence = { git = "https://github.com/fluencelabs/rust-sdk", features = ["logger"] } + serde = { version = "1.0.116", features = ["derive"] } serde_derive = "1.0.116" serde_sexpr = "0.1.0" + +jsonpath_lib = "0.2.5" + log = "0.4.11" serde_json = "1.0" wasm-bindgen = "0.2.68" diff --git a/src/air.rs b/src/air.rs index 6fb92ac7..1975460d 100644 --- a/src/air.rs +++ b/src/air.rs @@ -63,6 +63,15 @@ a -> B (call fn [x.0]) ) +data - HashMap +x.h.a.v -> (x - key in data, h.a.v - json path) + +0 +1 + +(call 1 (counter inc) [x.a] b) +(call 0 (response inc) [b] _) + -- any (fastest) -- does not work (seq (seq @@ -71,7 +80,7 @@ a -> B y <- Q ) (xor - (call fn [x] z) + (call fn [x.h.a.v, x.a.b.n] z) (call fn [y] z) ) ) diff --git a/src/defines.rs b/src/defines.rs new file mode 100644 index 00000000..b02e2146 --- /dev/null +++ b/src/defines.rs @@ -0,0 +1,34 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// This file contains defines similar for both FCE and browser targets. + +pub(crate) type Result = std::result::Result; +pub(crate) type AquaData = std::collections::HashMap; +pub(crate) use crate::errors::AquamarineError; +pub(crate) use crate::stepper::StepperOutcome; + +pub(crate) const CALL_SERVICE_SUCCESS: i32 = 0; + +use serde_derive::Deserialize; +use serde_derive::Serialize; + +#[fluence::fce] +#[derive(Serialize, Deserialize)] +pub struct CallServiceResult { + pub ret_code: i32, + pub result: String, +} diff --git a/src/errors.rs b/src/errors.rs index 40bc7d56..949579a2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -20,6 +20,7 @@ use serde_json::Error as SerdeJsonError; use serde_sexpr::Error as SExprError; use std::convert::Into; +use std::env::VarError; use std::error::Error; #[derive(Debug)] @@ -30,6 +31,15 @@ pub enum AquamarineError { /// Errors occurred while parsing supplied data. DataParseError(SerdeJsonError), + /// Indicates that environment variable with name CURRENT_PEER_ID isn't set. + CurrentPeerIdNotSet(VarError), + + /// Semantic errors in instructions. + InstructionError(String), + + /// Semantic errors in instructions. + LocalServiceError(String), + /// Aquamarine result deserialization errors. ExecutionError(String), } @@ -41,6 +51,9 @@ impl std::fmt::Display for AquamarineError { match self { AquamarineError::SExprParseError(err) => write!(f, "{}", err), AquamarineError::DataParseError(err) => write!(f, "{}", err), + AquamarineError::CurrentPeerIdNotSet(err) => write!(f, "{}", err), + AquamarineError::InstructionError(err_msg) => write!(f, "{}", err_msg), + AquamarineError::LocalServiceError(err_msg) => write!(f, "{}", err_msg), AquamarineError::ExecutionError(err_msg) => write!(f, "{}", err_msg), } } @@ -69,7 +82,10 @@ impl Into for AquamarineError { let ret_code = match self { AquamarineError::SExprParseError(_) => 1, AquamarineError::DataParseError(_) => 2, - AquamarineError::ExecutionError(_) => 3, + AquamarineError::CurrentPeerIdNotSet(_) => 3, + AquamarineError::InstructionError(_) => 4, + AquamarineError::LocalServiceError(_) => 5, + AquamarineError::ExecutionError(_) => 6, }; StepperOutcome { diff --git a/src/fce.rs b/src/fce.rs index 7f7a4b27..eb251793 100644 --- a/src/fce.rs +++ b/src/fce.rs @@ -15,14 +15,12 @@ */ mod air; +mod defines; mod errors; mod instructions; mod stepper; -pub(crate) type Result = std::result::Result; -pub(crate) type AquaData = serde_json::Value; -pub(crate) use crate::stepper::StepperOutcome; -pub(crate) use errors::AquamarineError; +pub(crate) use crate::defines::*; use crate::stepper::execute_aqua; use fluence::fce; @@ -36,12 +34,6 @@ pub fn invoke(init_user_id: String, aqua: String, data: String) -> StepperOutcom execute_aqua(init_user_id, aqua, data) } -#[fce] -pub struct CallServiceResult { - pub ret_code: i32, - pub result: String, -} - #[fce] #[link(wasm_import_module = "aqua_test_module")] extern "C" { diff --git a/src/instructions/call.rs b/src/instructions/call.rs index 306c8e3e..01ab86c2 100644 --- a/src/instructions/call.rs +++ b/src/instructions/call.rs @@ -15,29 +15,110 @@ */ use crate::AquaData; +use crate::AquamarineError; use crate::Result; use serde_derive::Deserialize; use serde_derive::Serialize; +const CURRENT_PEER_ALIAS: &str = "%current%"; +const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID"; + +/* + (current) + (pk $pk) + (pk $pk $srv_id) + PEER_PART: resolves to (peer_pk) \/ (peer_pk, pk_srv_id) + + (fn $name) + (fn $name $srv_id) + FN_PART: resolves to (fn_name) \/ (fn_srv_id, fn_name) +*/ + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] -pub(crate) struct Call { - pub peer_part: (String, Option), - pub fn_part: (Option, String), - pub args: String, - pub result_name: String, +#[serde(untagged)] +pub enum PeerPart { + PeerPk(String), + PeerPkWithPkServiceId(String, String), } -impl super::ExecutableInstruction for Call { - fn execute(self, _data: &mut AquaData) -> Result<()> { - let service_id = match (self.peer_part.1, self.fn_part.0) { - (Some(service_id), None) => service_id, - (None, Some(service_id)) => service_id, - _ => unimplemented!(), - }; +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(untagged)] +pub enum FunctionPart { + FuncName(String), + ServiceIdWithFuncName(String, String), +} - let _result = unsafe { crate::call_service(service_id, self.fn_part.1, self.args) }; +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct Call(PeerPart, FunctionPart, Vec, String); + +impl super::ExecutableInstruction for Call { + fn execute(self, data: &mut AquaData, next_peer_pks: &mut Vec) -> Result<()> { + let (peer_pk, service_id, func_name) = parse_peer_fn_parts(self.0, self.1)?; + let function_args = parse_args(self.2, data)?; + let function_args = serde_json::to_string(&function_args)?; + let result_name = parse_result_name(self.3)?; + + let current_peer_id = std::env::var(CURRENT_PEER_ID_ENV_NAME) + .map_err(|e| AquamarineError::CurrentPeerIdNotSet(e))?; + + if peer_pk == current_peer_id || peer_pk == CURRENT_PEER_ALIAS { + let result = unsafe { crate::call_service(service_id, func_name, function_args) }; + if result.ret_code != crate::CALL_SERVICE_SUCCESS { + return Err(AquamarineError::LocalServiceError(result.result)); + } + + let result: serde_json::Value = serde_json::from_str(&result.result)?; + data.insert(result_name, result); + } else { + next_peer_pks.push(peer_pk); + } Ok(()) } } + +#[rustfmt::skip] +fn parse_peer_fn_parts( + peer_part: PeerPart, + fn_part: FunctionPart, +) -> Result<(String, String, String)> { + match (peer_part, fn_part) { + (PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::ServiceIdWithFuncName(_service_id, func_name)) => { + Ok((peer_pk, peer_service_id, func_name)) + }, + (PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), FunctionPart::FuncName(func_name)) => { + Ok((peer_pk, peer_service_id, func_name)) + }, + (PeerPart::PeerPk(peer_pk), FunctionPart::ServiceIdWithFuncName(service_id, func_name)) => { + Ok((peer_pk, service_id, func_name)) + } + (PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => Err(AquamarineError::InstructionError( + String::from("call should have service id specified by peer part or function part"), + )), + } +} + +fn parse_args(args: Vec, data: &AquaData) -> Result { + let result = Vec::with_capacity(args.len()); + + for arg in args { + let first_dot_pos = match arg.find('.') { + Some(pos) => {} + None => {} + }; + } + + Ok(serde_json::Value::Array(result)) +} + +fn parse_result_name(result_name: String) -> Result { + if !result_name.is_empty() { + Ok(result_name) + } else { + Err(AquamarineError::InstructionError(String::from( + "result name of a call instruction must be non empty", + ))) + } +} diff --git a/src/instructions/mod.rs b/src/instructions/mod.rs index d3ecf1f6..3f802fb0 100644 --- a/src/instructions/mod.rs +++ b/src/instructions/mod.rs @@ -15,19 +15,22 @@ */ mod call; +mod null; pub(self) use crate::stepper::ExecutableInstruction; + use crate::AquaData; use crate::Result; -pub(crate) use call::Call; +use call::Call; +use null::Null; use serde_derive::Deserialize; use serde_derive::Serialize; #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] -#[serde(rename_all = "kebab-case")] +#[serde(untagged)] pub(crate) enum Instruction { - Null, + Null(Null), Call(Call), /* Par(Box, Box), @@ -37,10 +40,10 @@ pub(crate) enum Instruction { } impl ExecutableInstruction for Instruction { - fn execute(self, data: &mut AquaData) -> Result<()> { + fn execute(self, data: &mut AquaData, next_peer_pks: &mut Vec) -> Result<()> { match self { - Instruction::Null => Ok(()), - Instruction::Call(call) => call.execute(data), + Instruction::Null(null) => null.execute(data, next_peer_pks), + Instruction::Call(call) => call.execute(data, next_peer_pks), } } } diff --git a/src/instructions/null.rs b/src/instructions/null.rs new file mode 100644 index 00000000..f916c220 --- /dev/null +++ b/src/instructions/null.rs @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::AquaData; +use crate::Result; + +use serde_derive::Deserialize; +use serde_derive::Serialize; + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct Null {} + +impl super::ExecutableInstruction for Null { + fn execute(self, _data: &mut AquaData, _next_peer_pks: &mut Vec) -> Result<()> { + Ok(()) + } +} diff --git a/src/stepper/execution.rs b/src/stepper/execution.rs index f4cea71a..33a30232 100644 --- a/src/stepper/execution.rs +++ b/src/stepper/execution.rs @@ -40,8 +40,8 @@ fn execute_aqua_impl(init_user_id: String, aqua: String, data: String) -> Result parsed_data ); - let data = super::stepper::execute(parsed_aqua, &mut parsed_data)?; - let data = serde_json::to_string(&data)?; + super::stepper::execute(parsed_aqua, &mut parsed_data)?; + let data = serde_json::to_string(&parsed_data)?; Ok(StepperOutcome { ret_code: 0, diff --git a/src/stepper/mod.rs b/src/stepper/mod.rs index a1bf9d3e..a562f9b8 100644 --- a/src/stepper/mod.rs +++ b/src/stepper/mod.rs @@ -19,6 +19,7 @@ mod stepper; mod stepper_outcome; pub use stepper_outcome::StepperOutcome; +pub use stepper_outcome::SUCCESS_ERROR_CODE; pub(crate) use execution::execute_aqua; pub(crate) use stepper::ExecutableInstruction; diff --git a/src/stepper/stepper.rs b/src/stepper/stepper.rs index 958c6c1a..49197f73 100644 --- a/src/stepper/stepper.rs +++ b/src/stepper/stepper.rs @@ -19,13 +19,15 @@ use crate::AquaData; use crate::Result; pub(crate) trait ExecutableInstruction { - fn execute(self, data: &mut AquaData) -> Result<()>; + fn execute(self, data: &mut AquaData, next_peer_pks: &mut Vec) -> Result<()>; } -pub(crate) fn execute(instructions: Vec, data: &mut AquaData) -> Result<()> { +pub(crate) fn execute(instructions: Vec, data: &mut AquaData) -> Result> { + let mut next_peer_pks = Vec::new(); + for instruction in instructions { - instruction.execute(data)?; + instruction.execute(data, &mut next_peer_pks)?; } - Ok(()) + Ok(next_peer_pks) } diff --git a/src/stepper/stepper_outcome.rs b/src/stepper/stepper_outcome.rs index fe2a4c2b..4efd5199 100644 --- a/src/stepper/stepper_outcome.rs +++ b/src/stepper/stepper_outcome.rs @@ -17,10 +17,12 @@ use fluence::fce; use serde::{Deserialize, Serialize}; +pub const SUCCESS_ERROR_CODE: i32 = 0; + #[fce] #[derive(Serialize, Deserialize)] pub struct StepperOutcome { - /// A return code, where 0 means success. + /// A return code, where SUCCESS_ERROR_CODE means success. pub ret_code: i32, /// Contains data if ret_code == 0, otherwise error message (that could be empty string). diff --git a/src/wasm_bindgen.rs b/src/wasm_bindgen.rs index 0ddcc943..86588683 100644 --- a/src/wasm_bindgen.rs +++ b/src/wasm_bindgen.rs @@ -15,13 +15,12 @@ */ mod air; +mod defines; mod errors; mod instructions; mod stepper; -pub(crate) type Result = std::result::Result; -pub(crate) type AquaData = std::collections::HashMap>; -pub(crate) use crate::stepper::StepperOutcome; +pub(crate) use crate::defines::*; use crate::stepper::execute_aqua; @@ -39,7 +38,13 @@ extern "C" { fn log(s: &str); } +pub fn call_service(service_id: String, fn_name: String, args: String) -> CallServiceResult { + let result = call_service_impl(service_id, fn_name, args); + serde_json::from_str(&result).unwrap() +} + #[wasm_bindgen(raw_module = "../src/call_service.ts")] extern "C" { - pub fn call_service(service_id: String, fn_name: String, args: String) -> String; + #[link_name = "call_service"] + fn call_service_impl(service_id: String, fn_name: String, args: String) -> String; }