air-trace util for measuring AquaVM performance with tracing crate.

`air-trace run` subcommand allows to run AquaVM on any data, it allows to define most AquaVM inputs, providing defaults for most of them, and sets up either human-readable or JSON tracing output, the latter can be later processed by `air-trace stats`.

Anomaly data input is also supported, that is useful for slow data investigation.

Native execution mode can be used for native profiling.  Please note, however, that current version cannot be built natively on Apple Sillicon processor yet, as invariably depends on the `avm-server` because of leaking types that should be refactored or hidden.  The `--repeat` option can repeat the execution several times for the execution to dominate on input data reading and output.

High-level or rare calls have "info" trace level, instructions are "debug", and sub-instruction calls are "tracing".  Over-detailed tracing can induce overhead that spoils timing data.
This commit is contained in:
Ivan Boldyrev 2022-07-07 14:44:58 +03:00 committed by GitHub
parent 4201ecce6b
commit c3cea695c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1384 additions and 45 deletions

177
Cargo.lock generated
View File

@ -24,6 +24,7 @@ dependencies = [
"air-parser",
"air-test-utils",
"air-trace-handler",
"air-utils",
"boolinator",
"concat-idents",
"criterion",
@ -43,6 +44,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"tracing",
"wasm-bindgen",
]
@ -78,6 +80,8 @@ dependencies = [
"marine-rs-sdk",
"serde",
"serde_json",
"tracing",
"tracing-subscriber",
"wasm-bindgen",
]
@ -85,10 +89,12 @@ dependencies = [
name = "air-interpreter-data"
version = "0.2.2"
dependencies = [
"air-utils",
"once_cell",
"semver 1.0.7",
"serde",
"serde_json",
"tracing",
]
[[package]]
@ -150,6 +156,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror",
"tracing",
]
[[package]]
@ -166,6 +173,22 @@ dependencies = [
"serde_json",
]
[[package]]
name = "air-trace"
version = "0.1.0"
dependencies = [
"air",
"air-interpreter-interface",
"air-test-utils",
"anyhow",
"avm-server",
"clap 3.1.18",
"itertools 0.10.3",
"serde",
"serde_json",
"tracing-subscriber",
]
[[package]]
name = "air-trace-handler"
version = "0.1.0"
@ -176,8 +199,13 @@ dependencies = [
"log",
"serde_json",
"thiserror",
"tracing",
]
[[package]]
name = "air-utils"
version = "0.1.0"
[[package]]
name = "ansi_term"
version = "0.11.0"
@ -233,16 +261,19 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "avm-data-store"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"serde",
"serde_bytes",
"serde_json",
]
[[package]]
name = "avm-server"
version = "0.22.0"
version = "0.23.0"
dependencies = [
"air-interpreter-interface",
"air-utils",
"avm-data-store",
"eyre",
"fluence-faas",
@ -253,6 +284,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror",
"tracing",
]
[[package]]
@ -386,7 +418,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"time 0.1.44",
"winapi",
]
@ -1461,6 +1493,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cff7a23a7f3925a712c34dfb9cd87994012d7743f016fd1533e12ab5a8335ca"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "memchr"
version = "2.4.1"
@ -1563,6 +1604,15 @@ dependencies = [
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]]
name = "object-pool"
version = "0.5.4"
@ -1574,9 +1624,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.10.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225"
[[package]]
name = "oorandom"
@ -1727,6 +1777,12 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db8bcd96cb740d03149cbad5518db9fd87126a10ab519c011893b1754134c468"
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "plotters"
version = "0.3.0"
@ -1925,6 +1981,9 @@ name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
@ -2046,9 +2105,9 @@ dependencies = [
[[package]]
name = "serde_bytes"
version = "0.11.5"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9"
checksum = "212e73464ebcde48d723aa02eb270ba62eff38a9b732df31f33f1b4e145f3a54"
dependencies = [
"serde",
]
@ -2076,9 +2135,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.79"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95"
checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7"
dependencies = [
"itoa 1.0.1",
"ryu",
@ -2108,6 +2167,15 @@ dependencies = [
"syn",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "siphasher"
version = "0.3.10"
@ -2241,6 +2309,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.44"
@ -2252,6 +2329,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c91f41dcb2f096c05f0873d667dceec1087ce5bcf984ec8ffb19acddbb3217"
dependencies = [
"itoa 1.0.1",
"libc",
"num_threads",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
@ -2280,6 +2368,69 @@ dependencies = [
"serde",
]
[[package]]
name = "tracing"
version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7709595b8878a4965ce5e87ebf880a7d39c9afc6837721b21a5a816a8117d921"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
dependencies = [
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"time 0.3.11",
"tracing",
"tracing-core",
"tracing-serde",
]
[[package]]
name = "typenum"
version = "1.15.0"
@ -2343,6 +2494,12 @@ dependencies = [
"getrandom 0.2.6",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "variant_count"
version = "1.1.0"
@ -2627,7 +2784,7 @@ dependencies = [
"log",
"serde",
"thiserror",
"time",
"time 0.1.44",
"typetag",
"wasmer-runtime-core-fl",
"winapi",

View File

@ -13,9 +13,11 @@ members = [
"crates/air-lib/polyplets",
"crates/air-lib/test-utils",
"crates/air-lib/trace-handler",
"crates/air-lib/utils",
"crates/beautifier",
"crates/data-store",
"tools/cli/air-beautify",
"tools/cli/air-trace",
]
exclude = [

View File

@ -29,6 +29,9 @@ wasm-bindgen = "=0.2.65"
log = "0.4.11"
serde = { version = "1.0.118", features = [ "derive", "rc" ] }
serde_json = "1.0.61"
tracing = "0.1.35"
# exclude tracing-log feature that interferes with the log crate:
tracing-subscriber = { version = "0.3.11", default-features = false, features = [ "env-filter", "json", "smallvec", "time", "fmt" ] }
[features]
# indicates that this library should be compiled for the marine target,

View File

@ -32,3 +32,19 @@ pub fn init_logger(default_level: Option<LevelFilter>) {
builder.build().unwrap();
}
#[allow(dead_code)]
// TODO it worth moving it to marine_rs_sdk
pub fn init_tracing(tracing_params: String, trace_mode: u8) {
use tracing_subscriber::fmt::format::FmtSpan;
let builder = tracing_subscriber::fmt()
.with_env_filter(tracing_params)
.with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE);
if trace_mode == 0 {
builder.json().init();
} else {
// Human-readable output.
builder.init();
}
}

View File

@ -52,6 +52,20 @@ pub fn invoke(
execute_air(air, prev_data, data, params, call_results)
}
#[marine]
pub fn invoke_tracing(
air: String,
prev_data: Vec<u8>,
data: Vec<u8>,
params: RunParameters,
call_results: Vec<u8>,
tracing_params: String,
tracing_output_mode: u8,
) -> InterpreterOutcome {
logger::init_tracing(tracing_params, tracing_output_mode);
execute_air(air, prev_data, data, params, call_results)
}
#[marine]
pub fn ast(script: String) -> String {
ast::ast(script)

View File

@ -41,6 +41,7 @@ pub const DEFAULT_LOG_LEVEL: LevelFilter = LevelFilter::Trace;
pub fn main() {
// it's necessary to initialize it with the minimal allowed log level,
// because otherwise it's impossible to set less level than used during initialization.
const MINIMAL_LOG_LEVEL: LevelFilter = LevelFilter::Trace;
logger::init_logger(Some(MINIMAL_LOG_LEVEL));

View File

@ -23,6 +23,7 @@ air-log-targets = { path = "../crates/air-lib/log-targets" }
air-lambda-ast = { path = "../crates/air-lib/lambda/ast" }
air-lambda-parser = { path = "../crates/air-lib/lambda/parser" }
air-trace-handler = { path = "../crates/air-lib/trace-handler" }
air-utils = { version = "0.1.0", path = "../crates/air-lib/utils" }
polyplets = { path = "../crates/air-lib/polyplets" }
marine-rs-sdk = { version = "0.6.15", features = ["logger"] }
@ -38,6 +39,7 @@ fstrings = "0.2.3"
thiserror = "1.0.23"
strum = "0.21"
strum_macros = "0.21"
tracing = "0.1.35"
# Keep 0.2.65 until this is resolved https://github.com/rustwasm/wasm-pack/issues/886
wasm-bindgen = "=0.2.65"

View File

@ -37,6 +37,7 @@ use air_trace_handler::MergerApResult;
use std::rc::Rc;
impl<'i> super::ExecutableInstruction<'i> for Ap<'i> {
#[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))]
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(call, exec_ctx, trace_ctx);
let should_touch_trace = should_touch_trace(self);

View File

@ -35,6 +35,7 @@ use air_parser::ast::Call;
use std::rc::Rc;
impl<'i> super::ExecutableInstruction<'i> for Call<'i> {
#[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))]
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(call, exec_ctx, trace_ctx);
exec_ctx.tracker.meet_call();

View File

@ -32,6 +32,7 @@ use air_interpreter_interface::CallRequestParams;
use air_parser::ast;
use air_trace_handler::MergerCallResult;
use air_trace_handler::TraceHandler;
use air_utils::measure;
use std::rc::Rc;
@ -51,6 +52,7 @@ struct ResolvedArguments {
impl<'i> ResolvedCall<'i> {
/// Build `ResolvedCall` from `Call` by transforming `PeerPart` & `FunctionPart` into `ResolvedTriplet`.
#[tracing::instrument(level = "trace", skip_all)]
pub(super) fn new(raw_call: &Call<'i>, exec_ctx: &ExecutionCtx<'i>) -> ExecutionResult<Self> {
let triplet = resolve(&raw_call.triplet, exec_ctx)?;
let tetraplet = SecurityTetraplet::from_triplet(triplet);
@ -66,6 +68,7 @@ impl<'i> ResolvedCall<'i> {
}
/// Executes resolved instruction, updates contexts based on a execution_step result.
#[tracing::instrument(level = "trace", skip_all)]
pub(super) fn execute(
&self,
raw_call: &Call<'i>,
@ -119,6 +122,7 @@ impl<'i> ResolvedCall<'i> {
self.tetraplet.clone()
}
#[tracing::instrument(level = "trace", skip_all)]
fn prepare_request_params(
&self,
exec_ctx: &ExecutionCtx<'_>,
@ -129,7 +133,11 @@ impl<'i> ResolvedCall<'i> {
tetraplets,
} = self.resolve_args(exec_ctx)?;
let serialized_tetraplets = serde_json::to_string(&tetraplets).expect("default serializer shouldn't fail");
let serialized_tetraplets = measure!(
serde_json::to_string(&tetraplets).expect("default serializer shouldn't fail"),
tracing::Level::INFO,
"serde_json::to_string(tetraplets)",
);
let request_params = CallRequestParams::new(
tetraplet.service_id.to_string(),

View File

@ -22,6 +22,7 @@ use crate::JValue;
use air_parser::ast;
#[tracing::instrument(skip_all)]
pub(crate) fn are_matchable_eq<'ctx>(
left: &ast::Value<'_>,
right: &ast::Value<'_>,

View File

@ -87,6 +87,7 @@ pub(crate) fn prepare_last_error<'i>(
Ok((jvalue.clone(), tetraplets))
}
#[tracing::instrument(level = "trace", skip(ctx))]
pub(crate) fn resolve_variable<'ctx, 'i>(
variable: Variable<'_>,
ctx: &'ctx ExecutionCtx<'i>,
@ -113,6 +114,7 @@ pub(crate) fn resolve_variable<'ctx, 'i>(
}
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn resolve_ast_variable_wl<'ctx, 'i>(
ast_variable: &ast::VariableWithLambda<'_>,
exec_ctx: &'ctx ExecutionCtx<'i>,
@ -131,6 +133,7 @@ pub(crate) fn resolve_ast_variable_wl<'ctx, 'i>(
}
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn resolve_ast_scalar_wl<'ctx, 'i>(
ast_scalar: &ast::ScalarWithLambda<'_>,
exec_ctx: &'ctx ExecutionCtx<'i>,
@ -140,6 +143,7 @@ pub(crate) fn resolve_ast_scalar_wl<'ctx, 'i>(
resolve_ast_variable_wl(&variable, exec_ctx)
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn apply_lambda<'i>(
variable: Variable<'_>,
lambda: &LambdaAST<'i>,

View File

@ -23,12 +23,15 @@ use crate::INTERPRETER_SUCCESS;
use air_interpreter_data::InterpreterData;
use air_interpreter_interface::CallRequests;
use air_utils::measure;
use std::fmt::Debug;
use std::hash::Hash;
use std::rc::Rc;
/// Create InterpreterOutcome from supplied execution context and trace handler,
/// set ret_code to INTERPRETER_SUCCESS.
#[tracing::instrument(skip_all)]
pub(crate) fn from_success_result(
exec_ctx: ExecutionCtx<'_>,
trace_handler: TraceHandler,
@ -46,9 +49,10 @@ pub(crate) fn from_success_result(
/// Create InterpreterOutcome from supplied data and error,
/// set ret_code based on the error.
#[tracing::instrument]
pub(crate) fn from_uncatchable_error(
data: impl Into<Vec<u8>>,
error: impl ToErrorCode + ToString,
data: impl Into<Vec<u8>> + Debug,
error: impl ToErrorCode + ToString + Debug,
) -> InterpreterOutcome {
let ret_code = error.to_error_code();
let data = data.into();
@ -65,14 +69,16 @@ pub(crate) fn from_uncatchable_error(
/// Create InterpreterOutcome from supplied execution context, trace handler, and error,
/// set ret_code based on the error.
#[tracing::instrument(skip(exec_ctx, trace_handler))]
pub(crate) fn from_execution_error(
exec_ctx: ExecutionCtx<'_>,
trace_handler: TraceHandler,
error: impl ToErrorCode + ToString,
error: impl ToErrorCode + ToString + Debug,
) -> InterpreterOutcome {
populate_outcome_from_contexts(exec_ctx, trace_handler, error.to_error_code(), error.to_string())
}
#[tracing::instrument(skip(exec_ctx, trace_handler))]
fn populate_outcome_from_contexts(
exec_ctx: ExecutionCtx<'_>,
trace_handler: TraceHandler,
@ -86,9 +92,17 @@ fn populate_outcome_from_contexts(
restricted_streams,
exec_ctx.last_call_request_id,
);
let data = serde_json::to_vec(&data).expect("default serializer shouldn't fail");
let data = measure!(
serde_json::to_vec(&data).expect("default serializer shouldn't fail"),
tracing::Level::INFO,
"serde_json::to_vec(data)"
);
let next_peer_pks = dedup(exec_ctx.next_peer_pks);
let call_requests = serde_json::to_vec(&exec_ctx.call_requests).expect("default serializer shouldn't fail");
let call_requests = measure!(
serde_json::to_vec(&exec_ctx.call_requests).expect("default serializer shouldn't fail"),
tracing::Level::INFO,
"serde_json::to_vec(call_results)",
);
InterpreterOutcome {
ret_code,
@ -100,7 +114,7 @@ fn populate_outcome_from_contexts(
}
/// Deduplicate values in a supplied vector.
fn dedup<T: Eq + Hash>(mut vec: Vec<T>) -> Vec<T> {
fn dedup<T: Eq + Hash + Debug>(mut vec: Vec<T>) -> Vec<T> {
use std::collections::HashSet;
let set: HashSet<_> = vec.drain(..).collect();

View File

@ -33,6 +33,7 @@ pub(crate) struct PreparationDescriptor<'ctx, 'i> {
}
/// Parse and prepare supplied data and AIR script.
#[tracing::instrument(skip_all)]
pub(crate) fn prepare<'i>(
prev_data: &[u8],
current_data: &[u8],
@ -63,6 +64,7 @@ fn try_to_data(raw_data: &[u8]) -> PreparationResult<InterpreterData> {
InterpreterData::try_from_slice(raw_data).map_err(|err| DataDeFailed(err, raw_data.to_vec()))
}
#[tracing::instrument(skip_all)]
fn make_exec_ctx(
prev_data: &InterpreterData,
call_results: &[u8],

View File

@ -22,6 +22,7 @@ use crate::preparation_step::PreparationDescriptor;
use air_interpreter_interface::InterpreterOutcome;
use air_interpreter_interface::RunParameters;
use air_log_targets::RUN_PARAMS;
use air_utils::measure;
pub fn execute_air(
air: String,
@ -64,7 +65,12 @@ fn execute_air_impl(
// match here is used instead of map_err, because the compiler can't determine that
// they are exclusive and would treat exec_ctx and trace_handler as moved
match air.execute(&mut exec_ctx, &mut trace_handler) {
let exec_result = measure!(
air.execute(&mut exec_ctx, &mut trace_handler),
tracing::Level::INFO,
"execute",
);
match exec_result {
Ok(_) => farewell::from_success_result(exec_ctx, trace_handler),
// return new collected trace in case of errors
Err(error) if error.is_catchable() => Err(farewell::from_execution_error(exec_ctx, trace_handler, error)),

View File

@ -1,7 +1,7 @@
[package]
name = "avm-server"
description = "Fluence AIR VM"
version = "0.22.0"
version = "0.23.0"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
@ -17,7 +17,8 @@ path = "src/lib.rs"
[dependencies]
air-interpreter-interface = { version = "0.10.0", path = "../../crates/air-lib/interpreter-interface" }
avm-data-store = { version = "0.3.0", path = "../../crates/data-store" }
air-utils = { version = "0.1.0", path = "../../crates/air-lib/utils" }
avm-data-store = { version = "0.4.0", path = "../../crates/data-store" }
fluence-faas = "0.16.2"
polyplets = { version = "0.2.0", path = "../../crates/air-lib/polyplets" }
@ -28,3 +29,4 @@ serde_json = "1.0.61"
serde = "1.0.118"
log = "0.4.14"
parking_lot = "0.11.1"
tracing = "0.1.35"

View File

@ -84,7 +84,7 @@ impl<E> AVM<E> {
call_results: CallResults,
) -> AVMResult<AVMOutcome, E> {
let air = air.into();
let particle_id = particle_parameters.particle_id.as_str();
let particle_id = particle_parameters.particle_id.as_ref();
let prev_data = self.data_store.read_data(particle_id)?;
let current_data = data.into();
@ -145,7 +145,7 @@ impl<E> AVM<E> {
) -> AVMResult<(), E> {
let prev_data = self
.data_store
.read_data(particle_parameters.particle_id.as_str())?;
.read_data(&particle_parameters.particle_id)?;
let ser_particle =
serde_json::to_vec(particle_parameters).map_err(AVMError::AnomalyDataSeError)?;
let ser_avm_outcome =

View File

@ -48,7 +48,7 @@ impl CallServiceResult {
}
}
pub(crate) fn into_raw(self) -> air_interpreter_interface::CallServiceResult {
pub fn into_raw(self) -> air_interpreter_interface::CallServiceResult {
let CallServiceResult { ret_code, result } = self;
air_interpreter_interface::CallServiceResult {
@ -58,6 +58,7 @@ impl CallServiceResult {
}
}
#[tracing::instrument(level = "debug", skip(call_results))]
pub fn into_raw_result(call_results: CallResults) -> air_interpreter_interface::CallResults {
call_results
.into_iter()

View File

@ -21,16 +21,16 @@ use std::borrow::Cow;
/// Represents parameters obtained from a particle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParticleParameters<'init_peer_id, 'particle_id> {
pub init_peer_id: Cow<'init_peer_id, String>,
pub particle_id: Cow<'particle_id, String>,
pub init_peer_id: Cow<'init_peer_id, str>,
pub particle_id: Cow<'particle_id, str>,
pub timestamp: u64,
pub ttl: u32,
}
impl<'init_peer_id, 'particle_id> ParticleParameters<'init_peer_id, 'particle_id> {
pub fn new(
init_peer_id: Cow<'init_peer_id, String>,
particle_id: Cow<'particle_id, String>,
init_peer_id: Cow<'init_peer_id, str>,
particle_id: Cow<'particle_id, str>,
timestamp: u64,
ttl: u32,
) -> Self {

View File

@ -20,6 +20,7 @@ use crate::RunnerError;
use crate::RunnerResult;
use air_interpreter_interface::InterpreterOutcome;
use air_utils::measure;
use fluence_faas::FaaSConfig;
use fluence_faas::FluenceFaaS;
use fluence_faas::IValue;
@ -66,6 +67,7 @@ impl AVMRunner {
Ok(avm)
}
#[tracing::instrument(skip_all)]
pub fn call(
&mut self,
air: impl Into<String>,
@ -87,9 +89,59 @@ impl AVMRunner {
call_results,
);
let result =
let result = measure!(
self.faas
.call_with_ivalues(&self.wasm_filename, "invoke", &args, <_>::default())?;
.call_with_ivalues(&self.wasm_filename, "invoke", &args, <_>::default())?,
tracing::Level::INFO,
"faas.call_with_ivalues",
method = "invoke",
);
let result = try_as_one_value_vec(result)?;
let outcome = InterpreterOutcome::from_ivalue(result)
.map_err(RunnerError::InterpreterResultDeError)?;
let outcome = RawAVMOutcome::from_interpreter_outcome(outcome)?;
Ok(outcome)
}
#[tracing::instrument(skip_all)]
pub fn call_tracing(
&mut self,
air: impl Into<String>,
prev_data: impl Into<Vec<u8>>,
data: impl Into<Vec<u8>>,
init_peer_id: impl Into<String>,
timestamp: u64,
ttl: u32,
call_results: CallResults,
tracing_params: String,
tracing_output_mode: u8,
) -> RunnerResult<RawAVMOutcome> {
let mut args = prepare_args(
air,
prev_data,
data,
self.current_peer_id.clone(),
init_peer_id.into(),
timestamp,
ttl,
call_results,
);
args.push(IValue::String(tracing_params));
args.push(IValue::U8(tracing_output_mode));
let result = measure!(
self.faas.call_with_ivalues(
&self.wasm_filename,
"invoke_tracing",
&args,
<_>::default(),
)?,
tracing::Level::INFO,
"faas.call_with_ivalues",
method = "invoke_tracing",
);
let result = try_as_one_value_vec(result)?;
let outcome = InterpreterOutcome::from_ivalue(result)
@ -117,6 +169,7 @@ impl AVMRunner {
}
}
#[tracing::instrument(skip(air, prev_data, data, call_results))]
fn prepare_args(
air: impl Into<String>,
prev_data: impl Into<Vec<u8>>,
@ -136,8 +189,11 @@ fn prepare_args(
.into_ivalue();
let call_results = crate::interface::into_raw_result(call_results);
let call_results =
serde_json::to_vec(&call_results).expect("the default serializer shouldn't fail");
let call_results = measure!(
serde_json::to_vec(&call_results).expect("the default serializer shouldn't fail"),
tracing::Level::INFO,
"serde_json::to_vec call_results"
);
vec![
IValue::String(air.into()),

View File

@ -27,8 +27,8 @@ serde = { version = "1.0.118", features = ["rc", "derive"] }
serde_json = "1.0.61"
itertools = "0.10.0"
thiserror = "1.0.23"
tracing = "0.1.35"
[dev-dependencies]
fstrings = "0.2.3"

View File

@ -33,6 +33,7 @@ use lalrpop_util::{ErrorRecovery, ParseError};
thread_local!(static PARSER: AIRParser = AIRParser::new());
/// Parse AIR `source_code` to `Box<Instruction>`
#[tracing::instrument(skip_all)]
pub fn parse(air_script: &str) -> Result<Box<Instruction<'_>>, String> {
let mut files = SimpleFiles::new();
let file_id = files.add("script.air", air_script);

View File

@ -14,7 +14,10 @@ name = "air_interpreter_data"
path = "src/lib.rs"
[dependencies]
air-utils = { path = "../utils" }
serde = {version = "1.0.118", features = ["derive", "rc"]}
serde_json = "1.0.61"
semver = { version = "1.0.3", features = ["serde"] }
once_cell = "1.8.0"
tracing = "0.1.35"

View File

@ -18,6 +18,8 @@ use super::GlobalStreamGens;
use super::RestrictedStreamGens;
use super::DATA_FORMAT_VERSION;
use crate::ExecutionTrace;
use air_utils::measure;
use serde::Deserialize;
use serde::Serialize;
use std::ops::Deref;
@ -63,6 +65,7 @@ impl InterpreterData {
}
}
#[tracing::instrument(skip_all)]
pub fn from_execution_result(
trace: ExecutionTrace,
streams: GlobalStreamGens,
@ -86,7 +89,11 @@ impl InterpreterData {
return Ok(Self::default());
}
serde_json::from_slice(slice)
measure!(
serde_json::from_slice(slice),
tracing::Level::INFO,
"serde_json::from_slice"
)
}
}

View File

@ -21,3 +21,4 @@ air-parser = { path = "../air-parser" }
serde_json = "1.0.68"
log = "0.4.14"
thiserror = "1.0.29"
tracing = "0.1.35"

View File

@ -0,0 +1,16 @@
[package]
name = "air-utils"
version = "0.1.0"
description = "AIR helper funcitions and macros"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
publish = true # this crate is used by avm server that in its turn is used by the node
keywords = ["fluence", "air", "webassembly", "programming-language"]
categories = ["wasm"]
[lib]
name = "air_utils"
path = "src/lib.rs"
[dependencies]

View File

@ -0,0 +1,39 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#[macro_export]
macro_rules! measure {
(target: $target:expr, $expr:expr, $span:literal) => ({
let span = tracing::span!(target=$target, $span);
let _enter = span.enter();
$expr
});
($expr:expr, $level:expr, $span:literal, $($fields:tt)*) => ({
let span = tracing::span!($level, $span, $($fields)*);
let _enter = span.enter();
$expr
});
($expr:expr, $level:expr, $span:literal) => ({
let span = tracing::span!($level, $span);
let _enter = span.enter();
$expr
});
($expr:expr, $span:literal) => ({
let span = tracing::span!($span);
let _enter = span.enter();
$expr
});
}

View File

@ -1,6 +1,6 @@
[package]
name = "avm-data-store"
version = "0.3.0"
version = "0.4.0"
description = "Definition of the AVM DataStore trait"
authors = ["Fluence Labs"]
edition = "2018"
@ -17,3 +17,7 @@ path = "src/lib.rs"
[dependencies]
serde = { version = "1.0.118", features = ["derive"] }
serde_bytes = "0.11.6"
[dev-dependencies]
serde_json = "1.0.79"

View File

@ -16,6 +16,7 @@
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::time::Duration;
/// This trait is used for
@ -47,13 +48,18 @@ pub trait DataStore {
) -> Result<(), Self::Error>;
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AnomalyData<'data> {
pub air_script: &'data str,
pub particle: &'data [u8], // it's byte because of the restriction on trait objects methods
pub prev_data: &'data [u8],
pub current_data: &'data [u8],
pub avm_outcome: &'data [u8], // it's byte because of the restriction on trait objects methods
#[serde(borrow)]
pub air_script: Cow<'data, str>,
#[serde(borrow, with = "serde_bytes")]
pub particle: Cow<'data, [u8]>, // it's byte because of the restriction on trait objects methods
#[serde(borrow, with = "serde_bytes")]
pub prev_data: Cow<'data, [u8]>,
#[serde(borrow, with = "serde_bytes")]
pub current_data: Cow<'data, [u8]>,
#[serde(borrow, with = "serde_bytes")]
pub avm_outcome: Cow<'data, [u8]>, // it's byte because of the restriction on trait objects methods
pub execution_time: Duration,
pub memory_delta: usize,
}
@ -69,13 +75,97 @@ impl<'data> AnomalyData<'data> {
memory_delta: usize,
) -> Self {
Self {
air_script,
particle,
prev_data,
current_data,
avm_outcome,
air_script: air_script.into(),
particle: particle.into(),
prev_data: prev_data.into(),
current_data: current_data.into(),
avm_outcome: avm_outcome.into(),
execution_time,
memory_delta,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn anomaly_json(
air_script: &str,
particle: &[u8],
prev_data: &[u8],
current_data: &[u8],
avm_outcome: &[u8],
) -> String {
format!(
concat!(
r#"{{"air_script":{air_script},"#,
r#""particle":{particle},"#,
r#""prev_data":{prev_data},"#,
r#""current_data":{current_data},"#,
r#""avm_outcome":{avm_outcome},"#,
r#""execution_time":{{"secs":42,"nanos":0}},"#,
r#""memory_delta":123"#,
r#"}}"#
),
air_script = serde_json::to_string(air_script).unwrap(),
particle = serde_json::to_string(particle).unwrap(),
prev_data = serde_json::to_string(prev_data).unwrap(),
current_data = serde_json::to_string(current_data).unwrap(),
avm_outcome = serde_json::to_string(avm_outcome).unwrap(),
)
}
#[test]
fn anomaly_data_se() {
let anomaly = AnomalyData::new(
"(null)",
br#"{"data":"value"}"#, // not real data
br#"{"trace":[]}"#, // not real data
br#"{"trace":[1,2,3]}"#, // not real data
b"{}", // not real data
Duration::from_secs(42),
123,
);
let json_data = serde_json::to_string(&anomaly).expect("JSON serialize anomaly data");
let expected = anomaly_json(
&anomaly.air_script,
&anomaly.particle,
&anomaly.prev_data,
&anomaly.current_data,
&anomaly.avm_outcome,
);
assert_eq!(json_data, expected);
}
#[test]
fn anomaly_data_de() {
let particle = br#"{"particle":"data"}"#;
let current_data = br#"{"data":"current"}"#;
let prev_data = br#"{"data":"prev"}"#;
let avm_outcome = br#"{"avm":[1,2,3]}"#;
let json_data = anomaly_json(
"(null)",
&particle[..],
&prev_data[..],
&current_data[..],
&avm_outcome[..],
);
let anomaly: AnomalyData =
serde_json::from_str(&json_data).expect("deserialize JSON anomaly data");
assert_eq!(
anomaly,
AnomalyData::new(
"(null)",
&particle[..],
&prev_data[..],
&current_data[..],
&avm_outcome[..],
Duration::from_secs(42),
123,
)
)
}
}

View File

@ -0,0 +1,26 @@
[package]
name = "air-trace"
version = "0.1.0"
edition = "2021"
description = "AIR performance tracing tool"
authors = ["Fluence Labs"]
license = "Apache-2.0"
publish = false
keywords = ["fluence", "air", "tracing"]
[dependencies]
air = { path = "../../../air" }
air-test-utils = { path = "../../../crates/air-lib/test-utils" }
air-interpreter-interface = { path = "../../../crates/air-lib/interpreter-interface" }
avm-server = { path = "../../../avm/server" }
anyhow = "1.0.56"
clap = { version = "3.1.18", features = ["derive", "env"] }
itertools = "0.10.0"
serde = { version = "1.0.61", features = ["derive"] }
serde_json = "1.0.61"
tracing-subscriber = { version = "0.3.11", default-features = false, features = [ "env-filter", "json", "smallvec", "time", "fmt" ] }
[features]
default = ["wasm"]
wasm = []

View File

@ -0,0 +1,42 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod run;
mod stats;
mod utils;
use clap::Parser;
#[derive(Parser)]
struct Cli {
#[clap(subcommand)]
subcommand: Subcommand,
}
#[derive(clap::Subcommand)]
#[allow(clippy::large_enum_variant)]
enum Subcommand {
Run(crate::run::Args),
Stats(crate::stats::Args),
}
fn main() -> anyhow::Result<()> {
let args = Cli::parse();
match args.subcommand {
Subcommand::Run(args) => crate::run::run(args),
Subcommand::Stats(args) => crate::stats::stats(args),
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod data;
mod native;
mod runner;
#[cfg(feature = "wasm")]
mod wasm;
use self::runner::AirRunner;
use air_test_utils::CallResults;
use anyhow::Context as _;
use clap::{Parser, Subcommand};
use std::path::{Path, PathBuf};
#[derive(Parser, Debug)]
#[clap(about = "Run AIR script with AquaVM")]
pub(crate) struct Args {
#[clap(long)]
current_peer_id: Option<String>,
#[clap(long = "call_results")]
call_results_path: Option<PathBuf>,
#[clap(long)]
max_heap_size: Option<u64>,
#[clap(long, default_value = "info")]
tracing_params: String,
#[clap(long)]
native: bool,
#[clap(
long = "runtime",
env = "AIR_WASM_RUNTIME_PATH",
default_value = "target/wasm32-wasi/release/air_interpreter_server.wasm"
)]
air_wasm_runtime_path: PathBuf,
#[clap(long, help = "Execute several times; great for native profiling")]
repeat: Option<u32>,
#[clap(long, help = "Output JSON tracing info")]
json: bool,
#[clap(subcommand)]
source: Source,
}
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
enum Source {
#[clap(name = "--anomaly")]
Anomaly(self::data::anomaly::AnomalyDataArgs),
#[clap(name = "--plain")]
PlainData(self::data::plain::PlainDataArgs),
}
pub(crate) fn run(args: Args) -> anyhow::Result<()> {
let tracing_json = (!args.json) as u8;
init_tracing(args.tracing_params.clone(), tracing_json);
let current_peer_id = args.current_peer_id.as_deref().unwrap_or("some_id");
let mut runner = get_runner(
args.native,
current_peer_id,
&args.air_wasm_runtime_path,
args.max_heap_size,
)?;
let execution_data = match &args.source {
Source::Anomaly(anomaly) => data::anomaly::load(anomaly)?,
Source::PlainData(raw) => data::plain::load(raw)?,
};
let particle = execution_data.particle;
let call_results = read_call_results(args.call_results_path.as_deref())?;
let repeat = args.repeat.unwrap_or(1);
for _ in 0..repeat {
let result = runner
.call_tracing(
execution_data.air_script.clone(),
execution_data.prev_data.clone().into(),
execution_data.current_data.clone().into(),
particle.init_peer_id.clone().into_owned(),
particle.timestamp,
particle.ttl,
call_results.clone(),
args.tracing_params.clone(),
tracing_json,
)
.context("Failed to execute the script")?;
if args.repeat.is_none() {
println!("{:?}", result);
}
}
Ok(())
}
#[cfg(feature = "wasm")]
fn get_runner(
native: bool,
current_peer_id: impl Into<String>,
air_wasm_runtime_path: &Path,
max_heap_size: Option<u64>,
) -> anyhow::Result<Box<dyn AirRunner>> {
if native {
self::native::create_native_avm_runner(current_peer_id)
.context("Failed to instantiate a native AVM")
} else {
self::wasm::create_wasm_avm_runner(current_peer_id, air_wasm_runtime_path, max_heap_size)
.context("Failed to instantiate WASM AVM")
}
}
#[cfg(not(feature = "wasm"))]
fn get_runner(
native: bool,
current_peer_id: impl Into<String>,
air_wasm_runtime_path: &Path,
max_heap_size: Option<u64>,
) -> anyhow::Result<Box<dyn AirRunner>> {
self::native::create_native_avm_runner(current_peer_id)
.context("Failed to instantiate a native AVM")
}
// TODO This is a copy of function from air_interpreter/marine.rs. It should be moved to the marine_rs_sdk.
pub fn init_tracing(tracing_params: String, trace_mode: u8) {
use tracing_subscriber::fmt::format::FmtSpan;
let builder = tracing_subscriber::fmt()
.with_env_filter(tracing_params)
.with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE);
if trace_mode == 0 {
builder.json().init();
} else {
// Human-readable output.
builder.init();
}
}
fn read_call_results(call_results_path: Option<&Path>) -> anyhow::Result<CallResults> {
match call_results_path {
None => Ok(CallResults::default()),
Some(call_results_path) => {
let call_results_json =
load_data(call_results_path).context("failed to read call_results")?;
Ok(serde_json::from_str(&call_results_json)
.context("failed to parse call_results data")?)
}
}
}
fn load_data(data_path: &Path) -> anyhow::Result<String> {
Ok(std::fs::read_to_string(data_path)?)
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionData;
use crate::run::load_data;
use air_test_utils::ParticleParameters;
use avm_server::AnomalyData;
use anyhow::Context;
use clap::Parser;
use std::path::PathBuf;
#[derive(Parser, Debug)]
pub(crate) struct AnomalyDataArgs {
anomaly_data_path: PathBuf,
}
pub(crate) fn load(args: &AnomalyDataArgs) -> anyhow::Result<super::ExecutionData> {
let anomaly_json = load_data(&args.anomaly_data_path).context("Failed to read anomaly data")?;
let anomaly_data: AnomalyData =
serde_json::from_str(&anomaly_json).context("Failed to parse anomaly data")?;
let air_script = anomaly_data.air_script.to_string();
let prev_data = String::from_utf8(anomaly_data.prev_data.to_vec())
.context("Anomaly current_data is not a valid string")?;
let current_data = String::from_utf8(anomaly_data.current_data.to_vec())
.context("Anomaly current_data is not a valid string")?;
let particle: ParticleParameters<'static, 'static> =
serde_json::from_reader(&*anomaly_data.particle.to_vec())
.context("Anomaly particle is not a valid JSON")?;
Ok(ExecutionData {
air_script,
prev_data,
current_data,
particle,
})
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub(crate) mod anomaly;
pub(crate) mod plain;
use avm_server::ParticleParameters;
pub(crate) struct ExecutionData<'ctx> {
pub(crate) air_script: String,
pub(crate) current_data: String,
pub(crate) prev_data: String,
pub(crate) particle: ParticleParameters<'ctx, 'ctx>,
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionData;
use crate::utils::unix_timestamp_now;
use air_test_utils::ParticleParameters;
use anyhow::Context;
use std::path::{Path, PathBuf};
const DEFAULT_DATA: &str =
r#"{"trace":[],"streams":{},"version":"0.2.2","lcid":0,"r_streams":{"$nodes":{}}}"#;
#[derive(clap::Args, Debug)]
pub(crate) struct PlainDataArgs {
#[clap(long)]
init_peer_id: Option<String>,
#[clap(long, help = "default: current time")]
timestamp: Option<u64>,
#[clap(long, help = "default: max possible ttl")]
ttl: Option<u32>,
#[clap(long = "script", help = "read from stdin by default")]
air_script_path: Option<PathBuf>,
#[clap(long = "prev_data")]
prev_data_path: Option<PathBuf>,
#[clap(long = "data")]
data_path: PathBuf,
}
pub(crate) fn load(args: &PlainDataArgs) -> anyhow::Result<ExecutionData> {
use crate::run::load_data;
let air_script =
read_air_script(args.air_script_path.as_deref()).context("failed to read AIR script")?;
let prev_data = match &args.prev_data_path {
None => DEFAULT_DATA.to_owned(),
Some(prev_data_path) => load_data(prev_data_path).context("failed to read prev_data")?,
};
let current_data = load_data(&args.data_path).context("failed to read data")?;
let timestamp = args.timestamp.unwrap_or_else(unix_timestamp_now);
let ttl = args.ttl.unwrap_or(u32::MAX);
let init_peer_id = args.init_peer_id.as_deref().unwrap_or("some_id");
let particle = ParticleParameters::new(init_peer_id.into(), "".into(), timestamp, ttl);
Ok(ExecutionData {
air_script,
prev_data,
current_data,
particle,
})
}
fn read_air_script(air_input: Option<&Path>) -> anyhow::Result<String> {
use std::io::Read;
let air_script = match air_input {
Some(in_path) => std::fs::read_to_string(in_path)?,
None => {
let mut buffer = String::new();
let mut stdin = std::io::stdin().lock();
stdin.read_to_string(&mut buffer)?;
buffer
}
};
Ok(air_script)
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::runner::AirRunner;
use air_interpreter_interface::RunParameters;
struct NativeAvmRunner {
current_peer_id: String,
}
impl AirRunner for NativeAvmRunner {
fn call_tracing(
&mut self,
air: String,
prev_data: Vec<u8>,
data: Vec<u8>,
init_peer_id: String,
timestamp: u64,
ttl: u32,
call_results: air_test_utils::CallResults,
// We use externally configured logger.
_tracing_params: String,
_tracing_output_mode: u8,
) -> anyhow::Result<air_test_utils::RawAVMOutcome> {
use air_test_utils::into_raw_result;
// some inner parts transformations
let raw_call_results = into_raw_result(call_results);
let raw_call_results = serde_json::to_vec(&raw_call_results).unwrap();
let outcome = air::execute_air(
air,
prev_data,
data,
RunParameters {
init_peer_id,
current_peer_id: self.current_peer_id.clone(),
timestamp,
ttl,
},
raw_call_results,
);
let outcome = air_test_utils::RawAVMOutcome::from_interpreter_outcome(outcome)?;
Ok(outcome)
}
}
pub(crate) fn create_native_avm_runner(
current_peer_id: impl Into<String>,
) -> anyhow::Result<Box<dyn AirRunner>> {
Ok(Box::new(NativeAvmRunner {
current_peer_id: current_peer_id.into(),
}))
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use avm_server::avm_runner::*;
use avm_server::CallResults;
pub(crate) trait AirRunner {
fn call_tracing(
&mut self,
air: String,
prev_data: Vec<u8>,
data: Vec<u8>,
init_peer_id: String,
timestamp: u64,
ttl: u32,
call_results: CallResults,
tracing_params: String,
tracing_output_mode: u8,
) -> anyhow::Result<RawAVMOutcome>;
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::runner::AirRunner;
use avm_server::avm_runner::AVMRunner;
use std::path::Path;
pub(crate) struct WasmAvmRunner(AVMRunner);
impl AirRunner for WasmAvmRunner {
fn call_tracing(
&mut self,
air: String,
prev_data: Vec<u8>,
data: Vec<u8>,
init_peer_id: String,
timestamp: u64,
ttl: u32,
call_results: avm_server::CallResults,
tracing_params: String,
tracing_output_mode: u8,
) -> anyhow::Result<air_test_utils::RawAVMOutcome> {
Ok(self.0.call_tracing(
air,
prev_data,
data,
init_peer_id,
timestamp,
ttl,
call_results,
tracing_params,
tracing_output_mode,
)?)
}
}
pub(crate) fn create_wasm_avm_runner(
current_peer_id: impl Into<String>,
air_wasm_runtime_path: &Path,
max_heap_size: Option<u64>,
) -> anyhow::Result<Box<dyn AirRunner>> {
let current_peer_id = current_peer_id.into();
Ok(Box::new(WasmAvmRunner(AVMRunner::new(
air_wasm_runtime_path.to_owned(),
current_peer_id,
max_heap_size,
0,
)?)))
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod log_data;
mod report;
use self::log_data::{LogRecord, Message};
use clap::Parser;
#[derive(Parser)]
#[clap(about = "Pretty-print AquaVM JSON trace or provide execution stats")]
pub(crate) struct Args {
#[clap(long)]
pretty: bool,
#[clap(long)]
stats: bool,
#[clap(long)]
sort_stats_by_duration: bool,
}
pub(crate) fn stats(mut args: Args) -> anyhow::Result<()> {
use std::io::Write;
if !args.pretty && !args.stats {
args.pretty = true;
args.stats = true;
}
let stderr = std::io::stderr();
let mut stderr = stderr.lock();
let stdin = std::io::stdin();
let stdin = stdin.lock();
let mut stats = self::report::StatsReport::new();
for rec in read_logs(stdin) {
let rec = rec?;
if args.pretty {
print_log_record(&mut stderr, &rec)?;
}
if args.stats {
stats.consider(rec)?;
}
}
if args.stats {
if args.pretty {
writeln!(stderr)?;
}
stats.report(&mut stderr, args.sort_stats_by_duration)?;
}
Ok(())
}
fn read_logs<R: std::io::BufRead>(input: R) -> impl Iterator<Item = anyhow::Result<LogRecord>> {
input.lines().filter_map(|r| match r {
Ok(line) => {
let line = line.trim();
if line.is_empty() {
None
} else {
Some(serde_json::from_str(line).map_err(anyhow::Error::from))
}
}
Err(err) => Some(Err(err.into())),
})
}
fn print_log_record<W: std::io::Write>(mut out: W, log_record: &LogRecord) -> std::io::Result<()> {
use itertools::Itertools as _;
let val = &log_record.value;
write!(
out,
"{timestamp} {level} ",
timestamp = val.timestamp,
level = val.level,
)?;
if !val.spans.is_empty() {
write!(out, "{spans}", spans = val.spans.iter().join(":"),)?;
}
if matches!(&val.fields, Message::Close(_)) {
if !val.spans.is_empty() {
write!(out, ":")?;
}
write!(out, "{span}", span = log_record.span)?;
}
writeln!(
out,
": {target}: {fields}",
target = log_record.target,
fields = val.fields,
)
}

View File

@ -0,0 +1,117 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use serde::Deserialize;
use std::collections::HashMap;
#[derive(Deserialize)]
pub(crate) struct LogRecord {
pub(crate) target: String,
pub(crate) span: Span,
#[serde(flatten)]
pub(crate) value: LogValue,
}
#[derive(Debug, Deserialize)]
pub(crate) struct Span {
pub(crate) name: String,
#[serde(flatten)]
pub(crate) args: HashMap<String, serde_json::Value>,
}
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct LogKey {
pub(crate) target: String,
pub(crate) span_name: String,
}
#[derive(Deserialize)]
pub(crate) struct LogValue {
pub(crate) timestamp: String,
pub(crate) fields: Message,
pub(crate) level: String,
pub(crate) spans: Vec<Span>,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "message", rename_all = "lowercase")]
pub(crate) enum Message {
New,
Enter,
Close(CloseMessage),
}
#[derive(Debug, Deserialize)]
pub(crate) struct CloseMessage {
#[serde(rename = "time.busy")]
pub(crate) time_busy: String,
#[serde(rename = "time.idle")]
pub(crate) time_idle: String,
}
impl LogRecord {
pub(crate) fn get_key(&self) -> LogKey {
LogKey {
target: self.target.clone(),
span_name: self.span.name.clone(),
}
}
}
impl std::fmt::Display for Message {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Message::New => write!(f, "new"),
Message::Enter => write!(f, "enter"),
Message::Close(c) => write!(f, "close {}", c),
}
}
}
impl std::fmt::Display for CloseMessage {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "idle={}, busy={}", self.time_idle, self.time_busy)
}
}
fn format_argument(val: &serde_json::Value) -> String {
match val {
serde_json::Value::String(s) => s.to_owned(),
_ => val.to_string(),
}
}
impl std::fmt::Display for Span {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
use itertools::Itertools as _;
self.name.fmt(f)?;
if !self.args.is_empty() {
"{".fmt(f)?;
write!(
f,
"{}",
self.args
.iter()
.map(|(k, v)| format!("{}={}", k, format_argument(v)))
.format(", ")
)?;
"}".fmt(f)?;
}
Ok(())
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::log_data::{LogKey, LogRecord, Message};
use crate::utils::parse_tracing_duration;
use std::{collections::HashMap, time::Duration};
#[derive(Default)]
pub(crate) struct StatsReport {
data: HashMap<LogKey, Duration>,
}
impl StatsReport {
pub(crate) fn new() -> Self {
Self::default()
}
pub(crate) fn report<W: std::io::Write>(
self,
mut out: W,
sort_stats_by_duration: bool,
) -> std::io::Result<()> {
writeln!(out, "*** Statistics ***")?;
let mut stats_data: Vec<_> = self.data.into_iter().collect();
if sort_stats_by_duration {
stats_data.sort_unstable_by(|a, b| (a.1, &a.0).cmp(&(b.1, &b.0)).reverse());
} else {
stats_data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
for (k, v) in stats_data {
writeln!(out, "{} {}: {:?}", k.target, k.span_name, v)?;
}
Ok(())
}
pub(crate) fn consider(&mut self, rec: LogRecord) -> anyhow::Result<()> {
if let Message::Close(close) = &rec.value.fields {
let time_busy = parse_tracing_duration(&close.time_busy)?;
*self.data.entry(rec.get_key()).or_default() += time_busy;
}
Ok(())
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::time::Duration;
// unfortunately, external crates don't seem to provide required functionality:
// some do not handle floats, others do not handle suffixes
pub(crate) fn parse_tracing_duration(input: &str) -> Result<Duration, anyhow::Error> {
for (suffix, scale) in [("ns", 1e-9), ("µs", 1e-6), ("ms", 1e-3), ("s", 1e0)] {
if let Some(num_str) = input.strip_suffix(suffix) {
if let Ok(num) = num_str.parse::<f64>() {
return Ok(Duration::from_secs_f64(num * scale));
} else {
break;
}
}
}
return Err(anyhow::anyhow!("malformed duration {:?}", input));
}
pub(crate) fn unix_timestamp_now() -> u64 {
use std::time::SystemTime;
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("time befor Unix epoch")
.as_secs()
}