diff --git a/Cargo.lock b/Cargo.lock index 12c1384d..fe10d836 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,11 +233,14 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "avm-data-store" -version = "0.1.0" +version = "0.2.0" +dependencies = [ + "serde", +] [[package]] name = "avm-server" -version = "0.20.2" +version = "0.21.0" dependencies = [ "air-interpreter-interface", "avm-data-store", diff --git a/avm/server/Cargo.toml b/avm/server/Cargo.toml index 8fb75f69..72914da6 100644 --- a/avm/server/Cargo.toml +++ b/avm/server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "avm-server" description = "Fluence AIR VM" -version = "0.20.2" +version = "0.21.0" authors = ["Fluence Labs"] edition = "2018" license = "Apache-2.0" @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] air-interpreter-interface = { version = "0.10.0", path = "../../crates/air-lib/interpreter-interface" } -avm-data-store = { version = "0.1.0", path = "../../crates/data-store" } +avm-data-store = { version = "0.2.0", path = "../../crates/data-store" } fluence-faas = "0.16.2" polyplets = { version = "0.2.0", path = "../../crates/air-lib/polyplets" } diff --git a/avm/server/src/avm.rs b/avm/server/src/avm.rs index 083d2411..bb3e43c4 100644 --- a/avm/server/src/avm.rs +++ b/avm/server/src/avm.rs @@ -21,11 +21,15 @@ use super::AVMMemoryStats; use super::AVMOutcome; use super::CallResults; use crate::config::AVMConfig; +use crate::interface::raw_outcome::RawAVMOutcome; use crate::interface::ParticleParameters; use crate::AVMResult; +use avm_data_store::AnomalyData; use std::ops::Deref; use std::ops::DerefMut; +use std::time::Duration; +use std::time::Instant; /// A newtype needed to mark it as `unsafe impl Send` struct SendSafeRunner(AVMRunner); @@ -81,23 +85,38 @@ impl AVM { ) -> AVMResult { let particle_id = particle_parameters.particle_id.as_str(); let prev_data = self.data_store.read_data(particle_id)?; + let current_data = data.into(); + let execution_start_time = Instant::now(); + let memory_size_before = self.memory_stats().memory_size; let outcome = self .runner .call( air, prev_data, - data, - particle_parameters.init_peer_id.into_owned(), + current_data.clone(), + particle_parameters.init_peer_id.clone().into_owned(), particle_parameters.timestamp, particle_parameters.ttl, call_results, ) .map_err(AVMError::RunnerError)?; + let execution_time = execution_start_time.elapsed(); + let memory_delta = self.memory_stats().memory_size - memory_size_before; + if self.data_store.detect_anomaly(execution_time, memory_delta) { + self.save_anomaly_data( + ¤t_data, + &particle_parameters, + &outcome, + execution_time, + memory_delta, + )?; + } + // persist resulted data self.data_store.store_data(&outcome.data, particle_id)?; - let outcome = AVMOutcome::from_raw_outcome(outcome)?; + let outcome = AVMOutcome::from_raw_outcome(outcome, memory_delta, execution_time)?; Ok(outcome) } @@ -112,4 +131,34 @@ impl AVM { pub fn memory_stats(&self) -> AVMMemoryStats { self.runner.memory_stats() } + + fn save_anomaly_data( + &mut self, + current_data: &[u8], + particle_parameters: &ParticleParameters<'_, '_>, + avm_outcome: &RawAVMOutcome, + execution_time: Duration, + memory_delta: usize, + ) -> AVMResult<(), E> { + let prev_data = self + .data_store + .read_data(particle_parameters.particle_id.as_str())?; + let ser_particle = + serde_json::to_vec(particle_parameters).map_err(AVMError::AnomalyDataSeError)?; + let ser_avm_outcome = + serde_json::to_vec(avm_outcome).map_err(AVMError::AnomalyDataSeError)?; + + let anomaly_data = AnomalyData::new( + &ser_particle, + &prev_data, + ¤t_data, + &ser_avm_outcome, + execution_time, + memory_delta, + ); + + self.data_store + .collect_anomaly_data(&particle_parameters.particle_id, anomaly_data) + .map_err(Into::into) + } } diff --git a/avm/server/src/errors.rs b/avm/server/src/errors.rs index c6766f5c..c34005ff 100644 --- a/avm/server/src/errors.rs +++ b/avm/server/src/errors.rs @@ -38,6 +38,10 @@ pub enum AVMError { /// This errors are encountered from a data store object. #[error(transparent)] DataStoreError(#[from] E), + + /// This errors are encountered from serialization of data tracked during an anomaly. + #[error(transparent)] + AnomalyDataSeError(SerdeError), } #[derive(Debug, ThisError)] diff --git a/avm/server/src/interface/outcome.rs b/avm/server/src/interface/outcome.rs index 981f80fa..33fdcfd3 100644 --- a/avm/server/src/interface/outcome.rs +++ b/avm/server/src/interface/outcome.rs @@ -22,6 +22,8 @@ use crate::AVMResult; use serde::Deserialize; use serde::Serialize; +use std::time::Duration; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AVMOutcome { /// Contains script data that should be preserved in an executor of this interpreter @@ -33,6 +35,13 @@ pub struct AVMOutcome { /// Public keys of peers that should receive data. pub next_peer_pks: Vec, + + /// Memory in bytes AVM linear heap was extended during execution by. + pub memory_delta: usize, + + /// Time of a particle execution + /// (it counts only execution time without operations with DataStore and so on) + pub execution_time: Duration, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -47,15 +56,23 @@ impl AVMOutcome { data: Vec, call_requests: CallRequests, next_peer_pks: Vec, + memory_delta: usize, + execution_time: Duration, ) -> Self { Self { data, call_requests, next_peer_pks, + memory_delta, + execution_time, } } - pub(crate) fn from_raw_outcome(raw_outcome: RawAVMOutcome) -> AVMResult { + pub(crate) fn from_raw_outcome( + raw_outcome: RawAVMOutcome, + memory_delta: usize, + execution_time: Duration, + ) -> AVMResult { use air_interpreter_interface::INTERPRETER_SUCCESS; let RawAVMOutcome { @@ -66,7 +83,13 @@ impl AVMOutcome { next_peer_pks, } = raw_outcome; - let avm_outcome = AVMOutcome::new(data, call_requests, next_peer_pks); + let avm_outcome = AVMOutcome::new( + data, + call_requests, + next_peer_pks, + memory_delta, + execution_time, + ); if ret_code == INTERPRETER_SUCCESS { return Ok(avm_outcome); diff --git a/avm/server/src/interface/particle_parameters.rs b/avm/server/src/interface/particle_parameters.rs index ef790df1..e4549a51 100644 --- a/avm/server/src/interface/particle_parameters.rs +++ b/avm/server/src/interface/particle_parameters.rs @@ -14,9 +14,12 @@ * limitations under the License. */ +use serde::Deserialize; +use serde::Serialize; use std::borrow::Cow; /// Represents parameters obtained from a particle. +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ParticleParameters<'init_peer_id, 'particle_id> { pub init_peer_id: Cow<'init_peer_id, String>, pub particle_id: Cow<'particle_id, String>, diff --git a/avm/server/src/lib.rs b/avm/server/src/lib.rs index 8719791a..511acb1e 100644 --- a/avm/server/src/lib.rs +++ b/avm/server/src/lib.rs @@ -52,9 +52,10 @@ pub use fluence_faas::IValue; pub use polyplets::SecurityTetraplet; +pub use avm_data_store::AnomalyData; pub use avm_data_store::DataStore; -pub type AVMDataStore = Box + Send + Sync + 'static>; +pub type AVMDataStore = Box + Send + Sync + 'static>; pub type AVMResult = std::result::Result>; diff --git a/crates/data-store/Cargo.toml b/crates/data-store/Cargo.toml index 8a79c688..df00786c 100644 --- a/crates/data-store/Cargo.toml +++ b/crates/data-store/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "avm-data-store" -version = "0.1.0" +version = "0.2.0" description = "Definition of the AVM DataStore trait" authors = ["Fluence Labs"] edition = "2018" @@ -14,3 +14,6 @@ categories = ["wasm"] [lib] name = "avm_data_store" path = "src/lib.rs" + +[dependencies] +serde = "1.0" diff --git a/crates/data-store/src/lib.rs b/crates/data-store/src/lib.rs index b92fa59e..b9ab19fc 100644 --- a/crates/data-store/src/lib.rs +++ b/crates/data-store/src/lib.rs @@ -1,5 +1,5 @@ /* - * Copyright 2021 Fluence Labs Limited + * Copyright 2022 Fluence Labs Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,13 +14,65 @@ * limitations under the License. */ -/// This trait should be used to persist prev_data between successive calls of an interpreter o. -pub trait DataStore { - fn initialize(&mut self) -> Result<(), E>; +use serde::Deserialize; +use serde::Serialize; +use std::time::Duration; - fn store_data(&mut self, data: &[u8], key: &str) -> Result<(), E>; +/// This trait is used for +/// - persisting prev_data between successive calls of an interpreter +/// - logging previous, current, and new data in case of spikes +pub trait DataStore { + type Error; - fn read_data(&mut self, key: &str) -> Result, E>; + fn initialize(&mut self) -> Result<(), Self::Error>; - fn cleanup_data(&mut self, key: &str) -> Result<(), E>; + fn store_data(&mut self, data: &[u8], key: &str) -> Result<(), Self::Error>; + + fn read_data(&mut self, key: &str) -> Result, Self::Error>; + + /// Cleanup data that become obsolete. + fn cleanup_data(&mut self, key: &str) -> Result<(), Self::Error>; + + /// Returns true if an anomaly happened and it's necessary to save execution data + /// for debugging purposes. + /// execution_time - is time taken by the interpreter to execute provided script + /// memory_delta - is a count of bytes on which an interpreter heap has been extended + /// during execution of a particle + fn detect_anomaly(&self, execution_time: Duration, memory_delta: usize) -> bool; + + fn collect_anomaly_data( + &mut self, + key: &str, + anomaly_data: AnomalyData<'_>, + ) -> Result<(), Self::Error>; +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct AnomalyData<'data> { + pub particle: &'data [u8], // it's byte because of the restriction on trait objects methods + pub prev_data: &'data [u8], + pub current_data: &'data [u8], + pub avm_outcome: &'data [u8], // it's byte because of the restriction on trait objects methods + pub execution_time: Duration, + pub memory_delta: usize, +} + +impl<'data> AnomalyData<'data> { + pub fn new( + particle: &'data [u8], + prev_data: &'data [u8], + current_data: &'data [u8], + avm_outcome: &'data [u8], + execution_time: Duration, + memory_delta: usize, + ) -> Self { + Self { + particle, + prev_data, + current_data, + avm_outcome, + execution_time, + memory_delta, + } + } }