Add API to save anomaly data (#277)

This PR adds two new methods in DataStore to determine anomaly and collect necessary data. Additionally
    - a generic parameter of DataStore turned to be associative
    - AVMOutcome contains additionally memory_delta and execution_time
This commit is contained in:
Mike Voronov 2022-06-23 15:24:00 +03:00 committed by GitHub
parent 090eceef85
commit 3a84ceb863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 156 additions and 18 deletions

7
Cargo.lock generated
View File

@ -233,11 +233,14 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]] [[package]]
name = "avm-data-store" name = "avm-data-store"
version = "0.1.0" version = "0.2.0"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "avm-server" name = "avm-server"
version = "0.20.2" version = "0.21.0"
dependencies = [ dependencies = [
"air-interpreter-interface", "air-interpreter-interface",
"avm-data-store", "avm-data-store",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "avm-server" name = "avm-server"
description = "Fluence AIR VM" description = "Fluence AIR VM"
version = "0.20.2" version = "0.21.0"
authors = ["Fluence Labs"] authors = ["Fluence Labs"]
edition = "2018" edition = "2018"
license = "Apache-2.0" license = "Apache-2.0"
@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
air-interpreter-interface = { version = "0.10.0", path = "../../crates/air-lib/interpreter-interface" } air-interpreter-interface = { version = "0.10.0", path = "../../crates/air-lib/interpreter-interface" }
avm-data-store = { version = "0.1.0", path = "../../crates/data-store" } avm-data-store = { version = "0.2.0", path = "../../crates/data-store" }
fluence-faas = "0.16.2" fluence-faas = "0.16.2"
polyplets = { version = "0.2.0", path = "../../crates/air-lib/polyplets" } polyplets = { version = "0.2.0", path = "../../crates/air-lib/polyplets" }

View File

@ -21,11 +21,15 @@ use super::AVMMemoryStats;
use super::AVMOutcome; use super::AVMOutcome;
use super::CallResults; use super::CallResults;
use crate::config::AVMConfig; use crate::config::AVMConfig;
use crate::interface::raw_outcome::RawAVMOutcome;
use crate::interface::ParticleParameters; use crate::interface::ParticleParameters;
use crate::AVMResult; use crate::AVMResult;
use avm_data_store::AnomalyData;
use std::ops::Deref; use std::ops::Deref;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::time::Duration;
use std::time::Instant;
/// A newtype needed to mark it as `unsafe impl Send` /// A newtype needed to mark it as `unsafe impl Send`
struct SendSafeRunner(AVMRunner); struct SendSafeRunner(AVMRunner);
@ -81,23 +85,38 @@ impl<E> AVM<E> {
) -> AVMResult<AVMOutcome, E> { ) -> AVMResult<AVMOutcome, E> {
let particle_id = particle_parameters.particle_id.as_str(); let particle_id = particle_parameters.particle_id.as_str();
let prev_data = self.data_store.read_data(particle_id)?; let prev_data = self.data_store.read_data(particle_id)?;
let current_data = data.into();
let execution_start_time = Instant::now();
let memory_size_before = self.memory_stats().memory_size;
let outcome = self let outcome = self
.runner .runner
.call( .call(
air, air,
prev_data, prev_data,
data, current_data.clone(),
particle_parameters.init_peer_id.into_owned(), particle_parameters.init_peer_id.clone().into_owned(),
particle_parameters.timestamp, particle_parameters.timestamp,
particle_parameters.ttl, particle_parameters.ttl,
call_results, call_results,
) )
.map_err(AVMError::RunnerError)?; .map_err(AVMError::RunnerError)?;
let execution_time = execution_start_time.elapsed();
let memory_delta = self.memory_stats().memory_size - memory_size_before;
if self.data_store.detect_anomaly(execution_time, memory_delta) {
self.save_anomaly_data(
&current_data,
&particle_parameters,
&outcome,
execution_time,
memory_delta,
)?;
}
// persist resulted data // persist resulted data
self.data_store.store_data(&outcome.data, particle_id)?; self.data_store.store_data(&outcome.data, particle_id)?;
let outcome = AVMOutcome::from_raw_outcome(outcome)?; let outcome = AVMOutcome::from_raw_outcome(outcome, memory_delta, execution_time)?;
Ok(outcome) Ok(outcome)
} }
@ -112,4 +131,34 @@ impl<E> AVM<E> {
pub fn memory_stats(&self) -> AVMMemoryStats { pub fn memory_stats(&self) -> AVMMemoryStats {
self.runner.memory_stats() self.runner.memory_stats()
} }
fn save_anomaly_data(
&mut self,
current_data: &[u8],
particle_parameters: &ParticleParameters<'_, '_>,
avm_outcome: &RawAVMOutcome,
execution_time: Duration,
memory_delta: usize,
) -> AVMResult<(), E> {
let prev_data = self
.data_store
.read_data(particle_parameters.particle_id.as_str())?;
let ser_particle =
serde_json::to_vec(particle_parameters).map_err(AVMError::AnomalyDataSeError)?;
let ser_avm_outcome =
serde_json::to_vec(avm_outcome).map_err(AVMError::AnomalyDataSeError)?;
let anomaly_data = AnomalyData::new(
&ser_particle,
&prev_data,
&current_data,
&ser_avm_outcome,
execution_time,
memory_delta,
);
self.data_store
.collect_anomaly_data(&particle_parameters.particle_id, anomaly_data)
.map_err(Into::into)
}
} }

View File

@ -38,6 +38,10 @@ pub enum AVMError<E> {
/// This errors are encountered from a data store object. /// This errors are encountered from a data store object.
#[error(transparent)] #[error(transparent)]
DataStoreError(#[from] E), DataStoreError(#[from] E),
/// This errors are encountered from serialization of data tracked during an anomaly.
#[error(transparent)]
AnomalyDataSeError(SerdeError),
} }
#[derive(Debug, ThisError)] #[derive(Debug, ThisError)]

View File

@ -22,6 +22,8 @@ use crate::AVMResult;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AVMOutcome { pub struct AVMOutcome {
/// Contains script data that should be preserved in an executor of this interpreter /// Contains script data that should be preserved in an executor of this interpreter
@ -33,6 +35,13 @@ pub struct AVMOutcome {
/// Public keys of peers that should receive data. /// Public keys of peers that should receive data.
pub next_peer_pks: Vec<String>, pub next_peer_pks: Vec<String>,
/// Memory in bytes AVM linear heap was extended during execution by.
pub memory_delta: usize,
/// Time of a particle execution
/// (it counts only execution time without operations with DataStore and so on)
pub execution_time: Duration,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -47,15 +56,23 @@ impl AVMOutcome {
data: Vec<u8>, data: Vec<u8>,
call_requests: CallRequests, call_requests: CallRequests,
next_peer_pks: Vec<String>, next_peer_pks: Vec<String>,
memory_delta: usize,
execution_time: Duration,
) -> Self { ) -> Self {
Self { Self {
data, data,
call_requests, call_requests,
next_peer_pks, next_peer_pks,
memory_delta,
execution_time,
} }
} }
pub(crate) fn from_raw_outcome<E>(raw_outcome: RawAVMOutcome) -> AVMResult<Self, E> { pub(crate) fn from_raw_outcome<E>(
raw_outcome: RawAVMOutcome,
memory_delta: usize,
execution_time: Duration,
) -> AVMResult<Self, E> {
use air_interpreter_interface::INTERPRETER_SUCCESS; use air_interpreter_interface::INTERPRETER_SUCCESS;
let RawAVMOutcome { let RawAVMOutcome {
@ -66,7 +83,13 @@ impl AVMOutcome {
next_peer_pks, next_peer_pks,
} = raw_outcome; } = raw_outcome;
let avm_outcome = AVMOutcome::new(data, call_requests, next_peer_pks); let avm_outcome = AVMOutcome::new(
data,
call_requests,
next_peer_pks,
memory_delta,
execution_time,
);
if ret_code == INTERPRETER_SUCCESS { if ret_code == INTERPRETER_SUCCESS {
return Ok(avm_outcome); return Ok(avm_outcome);

View File

@ -14,9 +14,12 @@
* limitations under the License. * limitations under the License.
*/ */
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow; use std::borrow::Cow;
/// Represents parameters obtained from a particle. /// Represents parameters obtained from a particle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParticleParameters<'init_peer_id, 'particle_id> { pub struct ParticleParameters<'init_peer_id, 'particle_id> {
pub init_peer_id: Cow<'init_peer_id, String>, pub init_peer_id: Cow<'init_peer_id, String>,
pub particle_id: Cow<'particle_id, String>, pub particle_id: Cow<'particle_id, String>,

View File

@ -52,9 +52,10 @@ pub use fluence_faas::IValue;
pub use polyplets::SecurityTetraplet; pub use polyplets::SecurityTetraplet;
pub use avm_data_store::AnomalyData;
pub use avm_data_store::DataStore; pub use avm_data_store::DataStore;
pub type AVMDataStore<E> = Box<dyn DataStore<E> + Send + Sync + 'static>; pub type AVMDataStore<E> = Box<dyn DataStore<Error = E> + Send + Sync + 'static>;
pub type AVMResult<T, E> = std::result::Result<T, AVMError<E>>; pub type AVMResult<T, E> = std::result::Result<T, AVMError<E>>;

View File

@ -1,6 +1,6 @@
[package] [package]
name = "avm-data-store" name = "avm-data-store"
version = "0.1.0" version = "0.2.0"
description = "Definition of the AVM DataStore trait" description = "Definition of the AVM DataStore trait"
authors = ["Fluence Labs"] authors = ["Fluence Labs"]
edition = "2018" edition = "2018"
@ -14,3 +14,6 @@ categories = ["wasm"]
[lib] [lib]
name = "avm_data_store" name = "avm_data_store"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies]
serde = "1.0"

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2021 Fluence Labs Limited * Copyright 2022 Fluence Labs Limited
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -14,13 +14,65 @@
* limitations under the License. * limitations under the License.
*/ */
/// This trait should be used to persist prev_data between successive calls of an interpreter o. use serde::Deserialize;
pub trait DataStore<E> { use serde::Serialize;
fn initialize(&mut self) -> Result<(), E>; use std::time::Duration;
fn store_data(&mut self, data: &[u8], key: &str) -> Result<(), E>; /// This trait is used for
/// - persisting prev_data between successive calls of an interpreter
/// - logging previous, current, and new data in case of spikes
pub trait DataStore {
type Error;
fn read_data(&mut self, key: &str) -> Result<Vec<u8>, E>; fn initialize(&mut self) -> Result<(), Self::Error>;
fn cleanup_data(&mut self, key: &str) -> Result<(), E>; fn store_data(&mut self, data: &[u8], key: &str) -> Result<(), Self::Error>;
fn read_data(&mut self, key: &str) -> Result<Vec<u8>, Self::Error>;
/// Cleanup data that become obsolete.
fn cleanup_data(&mut self, key: &str) -> Result<(), Self::Error>;
/// Returns true if an anomaly happened and it's necessary to save execution data
/// for debugging purposes.
/// execution_time - is time taken by the interpreter to execute provided script
/// memory_delta - is a count of bytes on which an interpreter heap has been extended
/// during execution of a particle
fn detect_anomaly(&self, execution_time: Duration, memory_delta: usize) -> bool;
fn collect_anomaly_data(
&mut self,
key: &str,
anomaly_data: AnomalyData<'_>,
) -> Result<(), Self::Error>;
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct AnomalyData<'data> {
pub particle: &'data [u8], // it's byte because of the restriction on trait objects methods
pub prev_data: &'data [u8],
pub current_data: &'data [u8],
pub avm_outcome: &'data [u8], // it's byte because of the restriction on trait objects methods
pub execution_time: Duration,
pub memory_delta: usize,
}
impl<'data> AnomalyData<'data> {
pub fn new(
particle: &'data [u8],
prev_data: &'data [u8],
current_data: &'data [u8],
avm_outcome: &'data [u8],
execution_time: Duration,
memory_delta: usize,
) -> Self {
Self {
particle,
prev_data,
current_data,
avm_outcome,
execution_time,
memory_delta,
}
}
} }