diff --git a/Cargo.lock b/Cargo.lock index 90da3459..d1651722 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,7 +44,9 @@ name = "aquamarine-vm" version = "0.1.6" dependencies = [ "fluence-faas", + "log", "maplit", + "parking_lot 0.11.1", "serde", "serde_json", "stepper-interface", @@ -722,15 +724,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "fluence" -version = "0.2.13" -source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802" -dependencies = [ - "fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", - "fluence-sdk-main 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", -] - [[package]] name = "fluence" version = "0.2.13" @@ -741,6 +734,15 @@ dependencies = [ "fluence-sdk-main 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fluence" +version = "0.2.13" +source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802" +dependencies = [ + "fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", + "fluence-sdk-main 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", +] + [[package]] name = "fluence-app-service" version = "0.1.18" @@ -779,14 +781,6 @@ dependencies = [ "wasmer-wasi-fl", ] -[[package]] -name = "fluence-sdk-macro" -version = "0.2.13" -source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802" -dependencies = [ - "fluence-sdk-wit 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", -] - [[package]] name = "fluence-sdk-macro" version = "0.2.13" @@ -797,13 +791,11 @@ dependencies = [ ] [[package]] -name = "fluence-sdk-main" +name = "fluence-sdk-macro" version = "0.2.13" source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802" dependencies = [ - "fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", - "log", - "serde", + "fluence-sdk-wit 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", ] [[package]] @@ -818,9 +810,20 @@ dependencies = [ ] [[package]] -name = "fluence-sdk-wit" +name = "fluence-sdk-main" version = "0.2.13" source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802" +dependencies = [ + "fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)", + "log", + "serde", +] + +[[package]] +name = "fluence-sdk-wit" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da5f51cddeae52ff5b91d1a5d8be90e54629f4887f89f8d7501b829b374fe6a" dependencies = [ "proc-macro2", "quote", @@ -833,8 +836,7 @@ dependencies = [ [[package]] name = "fluence-sdk-wit" version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da5f51cddeae52ff5b91d1a5d8be90e54629f4887f89f8d7501b829b374fe6a" +source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802" dependencies = [ "proc-macro2", "quote", @@ -1202,6 +1204,15 @@ dependencies = [ "safe-transmute", ] +[[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "inventory" version = "0.1.10" @@ -1337,6 +1348,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lock_api" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.11" @@ -1576,9 +1596,9 @@ checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0" [[package]] name = "openssl" -version = "0.10.31" +version = "0.10.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d008f51b1acffa0d3450a68606e6a51c123012edaacb0f4e1426bd978869187" +checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70" dependencies = [ "bitflags", "cfg-if 1.0.0", @@ -1596,9 +1616,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" [[package]] name = "openssl-sys" -version = "0.9.59" +version = "0.9.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de52d8eabd217311538a39bba130d7dea1f1e118010fee7a033d966845e7d5fe" +checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6" dependencies = [ "autocfg", "cc", @@ -1638,8 +1658,19 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" dependencies = [ - "lock_api", - "parking_lot_core", + "lock_api 0.3.4", + "parking_lot_core 0.7.2", +] + +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api 0.4.2", + "parking_lot_core 0.8.2", ] [[package]] @@ -1656,6 +1687,20 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parking_lot_core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -2205,7 +2250,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "stepper-interface" version = "0.1.0" -source = "git+https://github.com/fluencelabs/aquamarine#724117547205d8ccc742d325b542af8f3df801b8" +source = "git+https://github.com/fluencelabs/aquamarine?branch=master#5cb4cc0fb0c149a4bd3160dd0bac9c3d5ac3db7d" dependencies = [ "fluence 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "serde", @@ -2847,7 +2892,7 @@ dependencies = [ "libc", "nix 0.15.0", "page_size", - "parking_lot", + "parking_lot 0.10.2", "rustc_version", "serde", "serde-bench", @@ -2876,7 +2921,7 @@ dependencies = [ "libc", "nix 0.15.0", "page_size", - "parking_lot", + "parking_lot 0.10.2", "rustc_version", "serde", "serde-bench", diff --git a/aquamarine-vm/Cargo.toml b/aquamarine-vm/Cargo.toml index 58e635c1..972519e0 100644 --- a/aquamarine-vm/Cargo.toml +++ b/aquamarine-vm/Cargo.toml @@ -15,8 +15,10 @@ fluence-faas = { path = "../fluence-faas", version = "0.1.18" } stepper-interface = { git = "https://github.com/fluencelabs/aquamarine", branch = "master" } maplit = "1.0.2" -serde_json = "1.0.57" -serde = "1.0.116" +serde_json = "1.0.60" +serde = "1.0.118" +log = "0.4.11" +parking_lot = "0.11.1" [features] # enable raw AquamarineVM API intended for testing diff --git a/aquamarine-vm/src/aquamarine_stepper_vm.rs b/aquamarine-vm/src/aquamarine_stepper_vm.rs index 1abbc555..87f20ed0 100644 --- a/aquamarine-vm/src/aquamarine_stepper_vm.rs +++ b/aquamarine-vm/src/aquamarine_stepper_vm.rs @@ -14,29 +14,57 @@ * limitations under the License. */ -use crate::Result; +use crate::{Result, IType, CallServiceClosure}; use crate::AquamarineVMError; use crate::config::AquamarineVMConfig; -use fluence_faas::FaaSConfig; +use fluence_faas::{FaaSConfig, HostExportedFunc}; use fluence_faas::FluenceFaaS; use fluence_faas::HostImportDescriptor; use fluence_faas::IValue; use stepper_interface::StepperOutcome; use std::path::PathBuf; -use std::path::Path; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use parking_lot::{Mutex}; const CALL_SERVICE_NAME: &str = "call_service"; const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID"; -unsafe impl Send for AquamarineVM {} +/// A newtype needed to mark it as `unsafe impl Send` +struct SendSafeFaaS(FluenceFaaS); + +/// Mark runtime as Send, so libp2p on the node (use-site) is happy +unsafe impl Send for SendSafeFaaS {} + +impl Deref for SendSafeFaaS { + type Target = FluenceFaaS; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for SendSafeFaaS { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Information about the particle that is being executed by the stepper at the moment +#[derive(Debug, Default, Clone)] +pub struct ParticleParameters { + pub init_user_id: String, + pub particle_id: String, +} pub struct AquamarineVM { - faas: FluenceFaaS, + faas: SendSafeFaaS, particle_data_store: PathBuf, /// file name of the AIR interpreter .wasm wasm_filename: String, + /// information about the particle that is being executed at the moment + current_particle: Arc>, } impl AquamarineVM { @@ -44,12 +72,14 @@ impl AquamarineVM { pub fn new(config: AquamarineVMConfig) -> Result { use AquamarineVMError::InvalidDataStorePath; + let current_particle: Arc> = <_>::default(); + let call_service = call_service_descriptor(current_particle.clone(), config.call_service); let (wasm_dir, wasm_filename) = split_dirname(config.aquamarine_wasm_path)?; let faas_config = make_faas_config( wasm_dir, &wasm_filename, - config.call_service, + call_service, config.current_peer_id, config.logging_mask, ); @@ -60,9 +90,10 @@ impl AquamarineVM { .map_err(|e| InvalidDataStorePath(e, particle_data_store.clone()))?; let aqua_vm = Self { - faas, + faas: SendSafeFaaS(faas), particle_data_store, wasm_filename, + current_particle, }; Ok(aqua_vm) @@ -73,22 +104,23 @@ impl AquamarineVM { init_user_id: impl Into, aqua: impl Into, data: impl Into>, - particle_id: impl AsRef, + particle_id: impl Into, ) -> Result { use AquamarineVMError::PersistDataError; - let prev_data_path = self.particle_data_store.join(particle_id); - // TODO: check for errors related to invalid file content (such as invalid UTF8 string) - let prev_data = std::fs::read_to_string(&prev_data_path).unwrap_or_default(); + let particle_id = particle_id.into(); + let init_user_id = init_user_id.into(); - let prev_data = into_ibytes_array(prev_data.into_bytes()); - let data = into_ibytes_array(data.into()); - let args = vec![ - IValue::String(init_user_id.into()), - IValue::String(aqua.into()), - IValue::Array(prev_data), - IValue::Array(data), - ]; + let prev_data_path = self.particle_data_store.join(&particle_id); + // TODO: check for errors related to invalid file content (such as invalid UTF8 string) + let prev_data = std::fs::read_to_string(&prev_data_path) + .unwrap_or_default() + .into_bytes(); + + let args = prepare_args(prev_data, data, init_user_id.clone(), aqua); + + // Update ParticleParams with the new values so subsequent calls to `call_service` can use them + self.update_current_particle(particle_id, init_user_id); let result = self.faas @@ -103,6 +135,49 @@ impl AquamarineVM { Ok(outcome) } + + fn update_current_particle(&self, particle_id: String, init_user_id: String) { + let mut params = self.current_particle.lock(); + params.particle_id = particle_id; + params.init_user_id = init_user_id; + } +} + +fn prepare_args( + prev_data: Vec, + data: impl Into>, + init_user_id: String, + aqua: impl Into, +) -> Vec { + let prev_data = into_ibytes_array(prev_data); + let data = into_ibytes_array(data.into()); + + vec![ + IValue::String(init_user_id), + IValue::String(aqua.into()), + IValue::Array(prev_data), + IValue::Array(data), + ] +} + +fn call_service_descriptor( + params: Arc>, + call_service: CallServiceClosure, +) -> HostImportDescriptor { + let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec| { + let params = { + let lock = params.lock(); + lock.deref().clone() + }; + call_service(params, ivalues) + }); + + HostImportDescriptor { + host_exported_func: call_service_closure, + argument_types: vec![IType::String, IType::String, IType::String, IType::String], + output_type: Some(IType::Record(0)), + error_handler: None, + } } /// Splits given path into its directory and file stem @@ -186,14 +261,7 @@ impl AquamarineVM { prev_data: impl Into>, data: impl Into>, ) -> Result { - let prev_data = into_ibytes_array(prev_data.into()); - let data = into_ibytes_array(data.into()); - let args = vec![ - IValue::String(init_user_id.into()), - IValue::String(aqua.into()), - IValue::Array(prev_data.into()), - IValue::Array(data.into()), - ]; + let args = prepare_args(prev_data.into(), data, init_user_id.into(), aqua); let result = self.faas diff --git a/aquamarine-vm/src/config.rs b/aquamarine-vm/src/config.rs index cede41d4..292540c7 100644 --- a/aquamarine-vm/src/config.rs +++ b/aquamarine-vm/src/config.rs @@ -14,16 +14,21 @@ * limitations under the License. */ -use fluence_faas::HostImportDescriptor; use std::path::PathBuf; +use crate::aquamarine_stepper_vm::ParticleParameters; +use crate::IValue; + +pub type CallServiceClosure = + Box) -> Option + 'static>; + /// Describes behaviour of the Aquamarine VM stepper. pub struct AquamarineVMConfig { /// Path to a aquamarine stepper Wasm file. pub aquamarine_wasm_path: PathBuf, /// Descriptor of a closure that will be invoked on call_service call from Aquamarine stepper. - pub call_service: HostImportDescriptor, + pub call_service: CallServiceClosure, /// Current peer id. pub current_peer_id: String, diff --git a/aquamarine-vm/src/lib.rs b/aquamarine-vm/src/lib.rs index ad29b843..866b933d 100644 --- a/aquamarine-vm/src/lib.rs +++ b/aquamarine-vm/src/lib.rs @@ -29,6 +29,8 @@ mod config; mod errors; pub use aquamarine_stepper_vm::AquamarineVM; +pub use aquamarine_stepper_vm::ParticleParameters; +pub use config::CallServiceClosure; pub use config::AquamarineVMConfig; pub use errors::AquamarineVMError; diff --git a/examples/call_parameters/artifacts/call_parameters.wasm b/examples/call_parameters/artifacts/call_parameters.wasm index 2a0452eb..a589d140 100755 Binary files a/examples/call_parameters/artifacts/call_parameters.wasm and b/examples/call_parameters/artifacts/call_parameters.wasm differ diff --git a/examples/greeting/artifacts/greeting.wasm b/examples/greeting/artifacts/greeting.wasm index ebdbc773..a3384c7a 100755 Binary files a/examples/greeting/artifacts/greeting.wasm and b/examples/greeting/artifacts/greeting.wasm differ diff --git a/examples/ipfs-node/artifacts/ipfs_effector.wasm b/examples/ipfs-node/artifacts/ipfs_effector.wasm index 202dbb1f..f405e03e 100755 Binary files a/examples/ipfs-node/artifacts/ipfs_effector.wasm and b/examples/ipfs-node/artifacts/ipfs_effector.wasm differ diff --git a/examples/ipfs-node/artifacts/ipfs_pure.wasm b/examples/ipfs-node/artifacts/ipfs_pure.wasm index 167d62f1..53236f46 100755 Binary files a/examples/ipfs-node/artifacts/ipfs_pure.wasm and b/examples/ipfs-node/artifacts/ipfs_pure.wasm differ diff --git a/examples/records/artifacts/records_effector.wasm b/examples/records/artifacts/records_effector.wasm index 7a5d68b1..07cc17a1 100755 Binary files a/examples/records/artifacts/records_effector.wasm and b/examples/records/artifacts/records_effector.wasm differ diff --git a/examples/records/artifacts/records_pure.wasm b/examples/records/artifacts/records_pure.wasm index f5e611ab..a285de60 100755 Binary files a/examples/records/artifacts/records_pure.wasm and b/examples/records/artifacts/records_pure.wasm differ diff --git a/examples/sqlite/artifacts/sqlite_test.wasm b/examples/sqlite/artifacts/sqlite_test.wasm index 1960f1aa..0d96cdcb 100755 Binary files a/examples/sqlite/artifacts/sqlite_test.wasm and b/examples/sqlite/artifacts/sqlite_test.wasm differ diff --git a/examples/url-downloader/artifacts/curl_adapter.wasm b/examples/url-downloader/artifacts/curl_adapter.wasm index a78993c1..7ad2c700 100755 Binary files a/examples/url-downloader/artifacts/curl_adapter.wasm and b/examples/url-downloader/artifacts/curl_adapter.wasm differ diff --git a/examples/url-downloader/artifacts/facade.wasm b/examples/url-downloader/artifacts/facade.wasm index 96b5ed99..98d976ea 100755 Binary files a/examples/url-downloader/artifacts/facade.wasm and b/examples/url-downloader/artifacts/facade.wasm differ diff --git a/examples/url-downloader/artifacts/local_storage.wasm b/examples/url-downloader/artifacts/local_storage.wasm index 25dba77e..988fc228 100755 Binary files a/examples/url-downloader/artifacts/local_storage.wasm and b/examples/url-downloader/artifacts/local_storage.wasm differ