Store ParticleParameters in AquamarineVM, pass them to call_service (#55)

This commit is contained in:
folex 2020-12-25 17:31:31 +03:00 committed by GitHub
parent f022a2dec4
commit d25f224558
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 186 additions and 64 deletions

111
Cargo.lock generated
View File

@ -44,7 +44,9 @@ name = "aquamarine-vm"
version = "0.1.6"
dependencies = [
"fluence-faas",
"log",
"maplit",
"parking_lot 0.11.1",
"serde",
"serde_json",
"stepper-interface",
@ -722,15 +724,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "fluence"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-main 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
name = "fluence"
version = "0.2.13"
@ -741,6 +734,15 @@ dependencies = [
"fluence-sdk-main 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fluence"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-main 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
name = "fluence-app-service"
version = "0.1.18"
@ -779,14 +781,6 @@ dependencies = [
"wasmer-wasi-fl",
]
[[package]]
name = "fluence-sdk-macro"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-wit 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
name = "fluence-sdk-macro"
version = "0.2.13"
@ -797,13 +791,11 @@ dependencies = [
]
[[package]]
name = "fluence-sdk-main"
name = "fluence-sdk-macro"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"log",
"serde",
"fluence-sdk-wit 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
@ -818,9 +810,20 @@ dependencies = [
]
[[package]]
name = "fluence-sdk-wit"
name = "fluence-sdk-main"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"log",
"serde",
]
[[package]]
name = "fluence-sdk-wit"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da5f51cddeae52ff5b91d1a5d8be90e54629f4887f89f8d7501b829b374fe6a"
dependencies = [
"proc-macro2",
"quote",
@ -833,8 +836,7 @@ dependencies = [
[[package]]
name = "fluence-sdk-wit"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da5f51cddeae52ff5b91d1a5d8be90e54629f4887f89f8d7501b829b374fe6a"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"proc-macro2",
"quote",
@ -1202,6 +1204,15 @@ dependencies = [
"safe-transmute",
]
[[package]]
name = "instant"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "inventory"
version = "0.1.10"
@ -1337,6 +1348,15 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "lock_api"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.11"
@ -1576,9 +1596,9 @@ checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
[[package]]
name = "openssl"
version = "0.10.31"
version = "0.10.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d008f51b1acffa0d3450a68606e6a51c123012edaacb0f4e1426bd978869187"
checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
@ -1596,9 +1616,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
[[package]]
name = "openssl-sys"
version = "0.9.59"
version = "0.9.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de52d8eabd217311538a39bba130d7dea1f1e118010fee7a033d966845e7d5fe"
checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6"
dependencies = [
"autocfg",
"cc",
@ -1638,8 +1658,19 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
dependencies = [
"lock_api",
"parking_lot_core",
"lock_api 0.3.4",
"parking_lot_core 0.7.2",
]
[[package]]
name = "parking_lot"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
dependencies = [
"instant",
"lock_api 0.4.2",
"parking_lot_core 0.8.2",
]
[[package]]
@ -1656,6 +1687,20 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "parking_lot_core"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@ -2205,7 +2250,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stepper-interface"
version = "0.1.0"
source = "git+https://github.com/fluencelabs/aquamarine#724117547205d8ccc742d325b542af8f3df801b8"
source = "git+https://github.com/fluencelabs/aquamarine?branch=master#5cb4cc0fb0c149a4bd3160dd0bac9c3d5ac3db7d"
dependencies = [
"fluence 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"serde",
@ -2847,7 +2892,7 @@ dependencies = [
"libc",
"nix 0.15.0",
"page_size",
"parking_lot",
"parking_lot 0.10.2",
"rustc_version",
"serde",
"serde-bench",
@ -2876,7 +2921,7 @@ dependencies = [
"libc",
"nix 0.15.0",
"page_size",
"parking_lot",
"parking_lot 0.10.2",
"rustc_version",
"serde",
"serde-bench",

View File

@ -15,8 +15,10 @@ fluence-faas = { path = "../fluence-faas", version = "0.1.18" }
stepper-interface = { git = "https://github.com/fluencelabs/aquamarine", branch = "master" }
maplit = "1.0.2"
serde_json = "1.0.57"
serde = "1.0.116"
serde_json = "1.0.60"
serde = "1.0.118"
log = "0.4.11"
parking_lot = "0.11.1"
[features]
# enable raw AquamarineVM API intended for testing

View File

@ -14,29 +14,57 @@
* limitations under the License.
*/
use crate::Result;
use crate::{Result, IType, CallServiceClosure};
use crate::AquamarineVMError;
use crate::config::AquamarineVMConfig;
use fluence_faas::FaaSConfig;
use fluence_faas::{FaaSConfig, HostExportedFunc};
use fluence_faas::FluenceFaaS;
use fluence_faas::HostImportDescriptor;
use fluence_faas::IValue;
use stepper_interface::StepperOutcome;
use std::path::PathBuf;
use std::path::Path;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use parking_lot::{Mutex};
const CALL_SERVICE_NAME: &str = "call_service";
const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID";
unsafe impl Send for AquamarineVM {}
/// A newtype needed to mark it as `unsafe impl Send`
struct SendSafeFaaS(FluenceFaaS);
/// Mark runtime as Send, so libp2p on the node (use-site) is happy
unsafe impl Send for SendSafeFaaS {}
impl Deref for SendSafeFaaS {
type Target = FluenceFaaS;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SendSafeFaaS {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Information about the particle that is being executed by the stepper at the moment
#[derive(Debug, Default, Clone)]
pub struct ParticleParameters {
pub init_user_id: String,
pub particle_id: String,
}
pub struct AquamarineVM {
faas: FluenceFaaS,
faas: SendSafeFaaS,
particle_data_store: PathBuf,
/// file name of the AIR interpreter .wasm
wasm_filename: String,
/// information about the particle that is being executed at the moment
current_particle: Arc<Mutex<ParticleParameters>>,
}
impl AquamarineVM {
@ -44,12 +72,14 @@ impl AquamarineVM {
pub fn new(config: AquamarineVMConfig) -> Result<Self> {
use AquamarineVMError::InvalidDataStorePath;
let current_particle: Arc<Mutex<ParticleParameters>> = <_>::default();
let call_service = call_service_descriptor(current_particle.clone(), config.call_service);
let (wasm_dir, wasm_filename) = split_dirname(config.aquamarine_wasm_path)?;
let faas_config = make_faas_config(
wasm_dir,
&wasm_filename,
config.call_service,
call_service,
config.current_peer_id,
config.logging_mask,
);
@ -60,9 +90,10 @@ impl AquamarineVM {
.map_err(|e| InvalidDataStorePath(e, particle_data_store.clone()))?;
let aqua_vm = Self {
faas,
faas: SendSafeFaaS(faas),
particle_data_store,
wasm_filename,
current_particle,
};
Ok(aqua_vm)
@ -73,22 +104,23 @@ impl AquamarineVM {
init_user_id: impl Into<String>,
aqua: impl Into<String>,
data: impl Into<Vec<u8>>,
particle_id: impl AsRef<Path>,
particle_id: impl Into<String>,
) -> Result<StepperOutcome> {
use AquamarineVMError::PersistDataError;
let prev_data_path = self.particle_data_store.join(particle_id);
// TODO: check for errors related to invalid file content (such as invalid UTF8 string)
let prev_data = std::fs::read_to_string(&prev_data_path).unwrap_or_default();
let particle_id = particle_id.into();
let init_user_id = init_user_id.into();
let prev_data = into_ibytes_array(prev_data.into_bytes());
let data = into_ibytes_array(data.into());
let args = vec![
IValue::String(init_user_id.into()),
IValue::String(aqua.into()),
IValue::Array(prev_data),
IValue::Array(data),
];
let prev_data_path = self.particle_data_store.join(&particle_id);
// TODO: check for errors related to invalid file content (such as invalid UTF8 string)
let prev_data = std::fs::read_to_string(&prev_data_path)
.unwrap_or_default()
.into_bytes();
let args = prepare_args(prev_data, data, init_user_id.clone(), aqua);
// Update ParticleParams with the new values so subsequent calls to `call_service` can use them
self.update_current_particle(particle_id, init_user_id);
let result =
self.faas
@ -103,6 +135,49 @@ impl AquamarineVM {
Ok(outcome)
}
fn update_current_particle(&self, particle_id: String, init_user_id: String) {
let mut params = self.current_particle.lock();
params.particle_id = particle_id;
params.init_user_id = init_user_id;
}
}
fn prepare_args(
prev_data: Vec<u8>,
data: impl Into<Vec<u8>>,
init_user_id: String,
aqua: impl Into<String>,
) -> Vec<IValue> {
let prev_data = into_ibytes_array(prev_data);
let data = into_ibytes_array(data.into());
vec![
IValue::String(init_user_id),
IValue::String(aqua.into()),
IValue::Array(prev_data),
IValue::Array(data),
]
}
fn call_service_descriptor(
params: Arc<Mutex<ParticleParameters>>,
call_service: CallServiceClosure,
) -> HostImportDescriptor {
let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec<IValue>| {
let params = {
let lock = params.lock();
lock.deref().clone()
};
call_service(params, ivalues)
});
HostImportDescriptor {
host_exported_func: call_service_closure,
argument_types: vec![IType::String, IType::String, IType::String, IType::String],
output_type: Some(IType::Record(0)),
error_handler: None,
}
}
/// Splits given path into its directory and file stem
@ -186,14 +261,7 @@ impl AquamarineVM {
prev_data: impl Into<Vec<u8>>,
data: impl Into<Vec<u8>>,
) -> Result<StepperOutcome> {
let prev_data = into_ibytes_array(prev_data.into());
let data = into_ibytes_array(data.into());
let args = vec![
IValue::String(init_user_id.into()),
IValue::String(aqua.into()),
IValue::Array(prev_data.into()),
IValue::Array(data.into()),
];
let args = prepare_args(prev_data.into(), data, init_user_id.into(), aqua);
let result =
self.faas

View File

@ -14,16 +14,21 @@
* limitations under the License.
*/
use fluence_faas::HostImportDescriptor;
use std::path::PathBuf;
use crate::aquamarine_stepper_vm::ParticleParameters;
use crate::IValue;
pub type CallServiceClosure =
Box<dyn Fn(ParticleParameters, Vec<IValue>) -> Option<IValue> + 'static>;
/// Describes behaviour of the Aquamarine VM stepper.
pub struct AquamarineVMConfig {
/// Path to a aquamarine stepper Wasm file.
pub aquamarine_wasm_path: PathBuf,
/// Descriptor of a closure that will be invoked on call_service call from Aquamarine stepper.
pub call_service: HostImportDescriptor,
pub call_service: CallServiceClosure,
/// Current peer id.
pub current_peer_id: String,

View File

@ -29,6 +29,8 @@ mod config;
mod errors;
pub use aquamarine_stepper_vm::AquamarineVM;
pub use aquamarine_stepper_vm::ParticleParameters;
pub use config::CallServiceClosure;
pub use config::AquamarineVMConfig;
pub use errors::AquamarineVMError;