Support prev data persistence (#35)

This commit is contained in:
vms 2020-10-16 15:26:09 +03:00 committed by GitHub
parent 05473672a1
commit 693e8cdfb1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 229 additions and 72 deletions

15
Cargo.lock generated
View File

@ -44,6 +44,7 @@ name = "aquamarine-vm"
version = "0.1.0"
dependencies = [
"fluence-faas",
"maplit",
"serde",
"serde_json",
]
@ -1307,6 +1308,12 @@ dependencies = [
"cfg-if 0.1.10",
]
[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "matches"
version = "0.1.8"
@ -1996,9 +2003,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.116"
version = "1.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96fe57af81d28386a513cbc6858332abc6117cfdb5999647c6444b8f43a370a5"
checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a"
dependencies = [
"serde_derive",
]
@ -2024,9 +2031,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.116"
version = "1.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f630a6370fd8e457873b4bd2ffdae75408bc291ba72be773772a4c2a065d9ae8"
checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e"
dependencies = [
"proc-macro2",
"quote",

View File

@ -12,5 +12,7 @@ path = "src/lib.rs"
[dependencies]
fluence-faas = { path = "../fluence-faas", version = "0.1.9" }
maplit = "1.0.2"
serde_json = "1.0.57"
serde = "1.0.116"

View File

@ -20,10 +20,14 @@ use crate::config::AquamarineVMConfig;
use crate::stepper_outcome::StepperOutcome;
use crate::stepper_outcome::RawStepperOutcome;
use fluence_faas::FaaSConfig;
use fluence_faas::FluenceFaaS;
use fluence_faas::HostImportDescriptor;
use fluence_faas::IValue;
use std::collections::HashMap;
use std::convert::TryInto;
use std::path::PathBuf;
use std::path::Path;
const AQUAMARINE_WASM_FILE_NAME: &str = "aquamarine";
const CALL_SERVICE_NAME: &str = "call_service";
@ -33,72 +37,138 @@ unsafe impl Send for AquamarineVM {}
pub struct AquamarineVM {
faas: FluenceFaaS,
particle_data_store: PathBuf,
}
impl AquamarineVM {
/// Create AquamarineVM with provided config.
pub fn new(config: AquamarineVMConfig) -> Result<Self> {
use fluence_faas::FaaSConfig;
use fluence_faas::FaaSModuleConfig;
use AquamarineVMError::InvalidDataStorePath;
let mut host_imports = HashMap::new();
host_imports.insert(String::from(CALL_SERVICE_NAME), config.call_service);
let faas_config = Self::make_faas_config(
config.aquamarine_wasm_path,
config.call_service,
config.current_peer_id,
);
let faas = FluenceFaaS::with_raw_config(faas_config)?;
let mut aquamarine_module_config = FaaSModuleConfig {
mem_pages_count: None,
logger_enabled: true,
host_imports,
wasi: None,
let particle_data_store = config.particle_data_store;
std::fs::create_dir_all(&particle_data_store)
.map_err(|e| InvalidDataStorePath(e, particle_data_store.clone()))?;
Ok(Self {
faas,
particle_data_store,
})
}
pub fn call(
&mut self,
init_user_id: impl Into<String>,
aqua: impl Into<String>,
data: impl Into<String>,
particle_id: impl AsRef<Path>,
) -> Result<StepperOutcome> {
use AquamarineVMError::PersistDataError;
let prev_data_path = self.particle_data_store.join(particle_id);
// TODO: check for errors related to invalid file content (such as invalid UTF8 string)
let prev_data = std::fs::read_to_string(&prev_data_path).unwrap_or(String::from("{}"));
let args = vec![
IValue::String(init_user_id.into()),
IValue::String(aqua.into()),
IValue::String(prev_data.into()),
IValue::String(data.into()),
];
let result = self.faas.call_with_ivalues(
AQUAMARINE_WASM_FILE_NAME,
"invoke",
&args,
<_>::default(),
)?;
let raw_outcome = Self::make_raw_outcome(result)?;
std::fs::write(&prev_data_path, &raw_outcome.data)
.map_err(|e| PersistDataError(e, prev_data_path))?;
raw_outcome.try_into()
}
fn make_faas_config(
aquamarine_wasm_path: PathBuf,
call_service: HostImportDescriptor,
current_peer_id: String,
) -> FaaSConfig {
use maplit::hashmap;
let make_faas_module_config = |call_service: HostImportDescriptor| {
use fluence_faas::FaaSModuleConfig;
let host_imports = hashmap! {
String::from(CALL_SERVICE_NAME) => call_service
};
FaaSModuleConfig {
mem_pages_count: None,
logger_enabled: true,
host_imports,
wasi: None,
}
};
let mut envs = HashMap::new();
envs.insert(
CURRENT_PEER_ID_ENV_NAME.as_bytes().to_vec(),
config.current_peer_id.into_bytes(),
);
let mut aquamarine_module_config = make_faas_module_config(call_service);
let envs = hashmap! {
CURRENT_PEER_ID_ENV_NAME.as_bytes().to_vec() => current_peer_id.into_bytes(),
};
aquamarine_module_config.extend_wasi_envs(envs);
let mut aquamarine_wasm_dir = config.aquamarine_wasm_path;
let mut aquamarine_wasm_dir = aquamarine_wasm_path;
// faas config requires a path to the directory with Wasm modules
aquamarine_wasm_dir.pop();
let faas_config = FaaSConfig {
FaaSConfig {
modules_dir: Some(aquamarine_wasm_dir),
modules_config: vec![(
String::from(AQUAMARINE_WASM_FILE_NAME),
aquamarine_module_config,
)],
default_modules_config: None,
};
let faas = FluenceFaaS::with_raw_config(faas_config)?;
Ok(Self { faas })
}
}
#[rustfmt::skip]
pub fn call(&mut self, args: serde_json::Value) -> Result<StepperOutcome> {
use fluence_faas::IValue;
fn make_raw_outcome(mut result: Vec<IValue>) -> Result<RawStepperOutcome> {
use AquamarineVMError::AquamarineResultError;
let mut result = self
.faas
.call_with_json(AQUAMARINE_WASM_FILE_NAME, "invoke", args, <_>::default())?;
let raw_outcome = match result.remove(0) {
match result.remove(0) {
IValue::Record(record_values) => {
let mut record_values = record_values.into_vec();
if record_values.len() != 3 {
return Err(AquamarineVMError::AquamarineResultError(format!("expected StepperOutcome struct with 3 fields, got {:?}", record_values)));
return Err(AquamarineResultError(format!(
"expected StepperOutcome struct with 3 fields, got {:?}",
record_values
)));
}
let ret_code = match record_values.remove(0) {
IValue::S32(ret_code) => ret_code,
v => return Err(AquamarineVMError::AquamarineResultError(format!("expected i32 for ret_code, got {:?}", v))),
v => {
return Err(AquamarineResultError(format!(
"expected i32 for ret_code, got {:?}",
v
)))
}
};
let data = match record_values.remove(0) {
IValue::String(str) => str,
v => return Err(AquamarineVMError::AquamarineResultError(format!("expected string for data, got {:?}", v))),
v => {
return Err(AquamarineResultError(format!(
"expected string for data, got {:?}",
v
)))
}
};
let next_peer_pks = match record_values.remove(0) {
@ -107,24 +177,33 @@ impl AquamarineVM {
.into_iter()
.map(|v| match v {
IValue::String(str) => Ok(str),
v => Err(AquamarineVMError::AquamarineResultError(format!("expected string for next_peer_pks, got {:?}", v))),
v => Err(AquamarineResultError(format!(
"expected string for next_peer_pks, got {:?}",
v
))),
})
.collect::<Result<Vec<String>>>()?;
Ok(array)
},
v => Err(AquamarineVMError::AquamarineResultError(format!("expected array for next_peer_pks, got {:?}", v))),
}
v => Err(AquamarineResultError(format!(
"expected array for next_peer_pks, got {:?}",
v
))),
}?;
RawStepperOutcome {
Ok(RawStepperOutcome {
ret_code,
data,
next_peer_pks,
}
})
}
v => return Err(AquamarineVMError::AquamarineResultError(format!("expected record for StepperOutcome, got {:?}", v))),
};
raw_outcome.try_into()
v => {
return Err(AquamarineResultError(format!(
"expected record for StepperOutcome, got {:?}",
v
)))
}
}
}
}

View File

@ -27,4 +27,8 @@ pub struct AquamarineVMConfig {
/// Current peer id.
pub current_peer_id: String,
/// Path to a folder contains prev data.
/// AquamarineVM uses it to store data obtained after stepper execution, and load it as a prev_data by particle_id.
pub particle_data_store: PathBuf,
}

View File

@ -17,7 +17,9 @@
use crate::stepper_outcome::StepperError;
use fluence_faas::FaaSError;
use std::io::Error as IOError;
use std::error::Error;
use std::path::PathBuf;
#[derive(Debug)]
pub enum AquamarineVMError {
@ -29,6 +31,12 @@ pub enum AquamarineVMError {
/// Errors related to stepper execution.
StepperError(StepperError),
/// I/O errors while persisting resulted data.
PersistDataError(IOError, PathBuf),
/// Errors related to particle_data_store path from supplied config.
InvalidDataStorePath(IOError, PathBuf),
}
impl Error for AquamarineVMError {}
@ -39,6 +47,16 @@ impl std::fmt::Display for AquamarineVMError {
AquamarineVMError::FaaSError(err) => write!(f, "{}", err),
AquamarineVMError::AquamarineResultError(err_msg) => write!(f, "{}", err_msg),
AquamarineVMError::StepperError(err) => write!(f, "{}", err),
AquamarineVMError::PersistDataError(err, path) => write!(
f,
"an error occurred while saving prev data {:?} by {:?} path",
err, path
),
AquamarineVMError::InvalidDataStorePath(err, path) => write!(
f,
"an error occurred while creating data storage {:?} by {:?} path",
err, path
),
}
}
}

View File

@ -87,6 +87,24 @@ pub enum StepperError {
/// Related to such ret_code that doesn't have match with current StepperError.
UnknownError(String),
/// Errors occurred on call evidence deserialization.
CallEvidenceDeserializationError(String),
/// Errors occurred on call evidence serialization.
CallEvidenceSerializationError(String),
/// Errors occurred when reserved keyword is used for variable name.
ReservedKeywordError(String),
/// Errors occurred when previous and current evidence states are incompatible.
IncompatibleEvidenceStates(String),
/// Errors occurred when previous and current call results are incompatible.
IncompatibleCallResults(String),
/// Errors occurred when evidence path contains less elements then corresponding Par has.
EvidencePathTooSmall(String),
}
impl Error for StepperError {}
@ -110,6 +128,12 @@ impl std::fmt::Display for StepperError {
StepperError::FoldStateNotFound(err_msg) => write!(f, "{}", err_msg),
StepperError::MultipleFoldStates(err_msg) => write!(f, "{}", err_msg),
StepperError::InvalidEvidenceState(err_msg) => write!(f, "{}", err_msg),
StepperError::CallEvidenceDeserializationError(err_msg) => write!(f, "{}", err_msg),
StepperError::CallEvidenceSerializationError(err_msg) => write!(f, "{}", err_msg),
StepperError::ReservedKeywordError(err_msg) => write!(f, "{}", err_msg),
StepperError::IncompatibleEvidenceStates(err_msg) => write!(f, "{}", err_msg),
StepperError::IncompatibleCallResults(err_msg) => write!(f, "{}", err_msg),
StepperError::EvidencePathTooSmall(err_msg) => write!(f, "{}", err_msg),
StepperError::UnknownError(err_msg) => write!(f, "{}", err_msg),
}
}
@ -147,7 +171,12 @@ impl TryFrom<RawStepperOutcome> for StepperOutcome {
13 => to_vm_error!(MultipleValuesInJsonPath),
14 => to_vm_error!(FoldStateNotFound),
15 => to_vm_error!(MultipleFoldStates),
16 => to_vm_error!(InvalidEvidenceState),
16 => to_vm_error!(CallEvidenceDeserializationError),
17 => to_vm_error!(CallEvidenceSerializationError),
18 => to_vm_error!(ReservedKeywordError),
19 => to_vm_error!(IncompatibleEvidenceStates),
20 => to_vm_error!(IncompatibleCallResults),
21 => to_vm_error!(EvidencePathTooSmall),
_ => to_vm_error!(UnknownError),
}
}

View File

@ -46,20 +46,20 @@ impl FCE {
&mut self,
module_name: MN,
func_name: FN,
argument: &[IValue],
arguments: &[IValue],
) -> Result<Vec<IValue>> {
self.call_(module_name.as_ref(), func_name.as_ref(), argument)
self.call_(module_name.as_ref(), func_name.as_ref(), arguments)
}
fn call_(
&mut self,
module_name: &str,
func_name: &str,
argument: &[IValue],
arguments: &[IValue],
) -> Result<Vec<IValue>> {
match self.modules.get_mut(module_name) {
// TODO: refactor errors
Some(module) => module.call(func_name.as_ref(), argument),
Some(module) => module.call(func_name.as_ref(), arguments),
None => Err(FCEError::NoSuchModule(format!(
"trying to call module with name {} that is not loaded",
module_name

View File

@ -48,27 +48,22 @@ impl std::fmt::Display for HostImportError {
HostImportError::MismatchWValues(expected_type, found_value) => write!(
f,
"Expected {} type, but found {:?} value during interface values lifting from Wasm memory",
expected_type,
found_value
),
HostImportError::MismatchWValuesCount => write!(
f,
"Not enough WValue arguments are provided from the Wasm side"
expected_type, found_value
),
HostImportError::MismatchWValuesCount => {
write!(f, "Not enough WValue arguments are provided from the Wasm side")
}
HostImportError::InvalidMemoryAccess(offset, size) => write!(
f,
"Invalid memory access while lifting IValues, offset {}, size {}", offset, size
),
HostImportError::OddPointersCount(itype) => write!(
f,
"Arrays of pointers for value type {:?} contains odd count",
itype
),
HostImportError::RecordTypeNotFound(record_type_id) => write!(
f,
"Record with type id {} not found",
record_type_id
"Invalid memory access while lifting IValues, offset {}, size {}",
offset, size
),
HostImportError::OddPointersCount(itype) => {
write!(f, "Arrays of pointers for value type {:?} contains odd count", itype)
}
HostImportError::RecordTypeNotFound(record_type_id) => {
write!(f, "Record with type id {} not found", record_type_id)
}
}
}
}

BIN
examples/records/artifacts/records_effector.wasm Executable file → Normal file

Binary file not shown.

BIN
examples/records/artifacts/records_pure.wasm Executable file → Normal file

Binary file not shown.

View File

@ -151,6 +151,28 @@ impl AppService {
// This API is intended for testing purposes (mostly in FCE REPL)
#[cfg(feature = "raw-module-api")]
impl AppService {
pub fn new_with_empty_facade<C, S>(
config: C,
service_id: S,
envs: HashMap<Vec<u8>, Vec<u8>>,
) -> Result<Self>
where
C: TryInto<AppServiceConfig>,
S: Into<String>,
AppServiceError: From<C::Error>,
{
let mut config: AppServiceConfig = config.try_into()?;
let service_id = service_id.into();
Self::set_env_and_dirs(&mut config, service_id, envs)?;
let faas = FluenceFaaS::with_raw_config(config.faas_config)?;
Ok(Self {
faas,
facade_module_name: String::new(),
})
}
pub fn call_with_module_name<MN: AsRef<str>, FN: AsRef<str>>(
&mut self,
module_name: MN,

View File

@ -80,12 +80,13 @@ impl FluenceFaaS {
let modules_dir = config.modules_dir;
for (module_name, module_config) in config.modules_config {
let module_bytes = modules.remove(&module_name).ok_or_else(|| {
FaaSError::InstantiationError(format!(
let module_bytes =
modules.remove(&module_name).ok_or_else(|| {
FaaSError::InstantiationError(format!(
"module with name {} is specified in config (dir: {:?}), but not found in provided modules: {:?}",
module_name, modules_dir, modules.keys().collect::<Vec<_>>()
))
})?;
})?;
let fce_module_config =
crate::misc::make_fce_config(Some(module_config), call_parameters.clone())?;

View File

@ -179,7 +179,7 @@ impl REPL {
.unwrap_or_default();
config.service_base_dir = Some(tmp_path);
let app_service = AppService::new(config, &service_id, HashMap::new())?;
let app_service = AppService::new_with_empty_facade(config, &service_id, HashMap::new())?;
let duration = start.elapsed();