diff --git a/Cargo.lock b/Cargo.lock index 1d2c0218..7197730b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,7 @@ dependencies = [ "air-parser", "air-test-utils", "air-trace-handler", + "air-utils", "boolinator", "concat-idents", "criterion", @@ -43,6 +44,7 @@ dependencies = [ "strum", "strum_macros", "thiserror", + "tracing", "wasm-bindgen", ] @@ -78,6 +80,8 @@ dependencies = [ "marine-rs-sdk", "serde", "serde_json", + "tracing", + "tracing-subscriber", "wasm-bindgen", ] @@ -85,10 +89,12 @@ dependencies = [ name = "air-interpreter-data" version = "0.2.2" dependencies = [ + "air-utils", "once_cell", "semver 1.0.7", "serde", "serde_json", + "tracing", ] [[package]] @@ -150,6 +156,7 @@ dependencies = [ "serde", "serde_json", "thiserror", + "tracing", ] [[package]] @@ -166,6 +173,22 @@ dependencies = [ "serde_json", ] +[[package]] +name = "air-trace" +version = "0.1.0" +dependencies = [ + "air", + "air-interpreter-interface", + "air-test-utils", + "anyhow", + "avm-server", + "clap 3.1.18", + "itertools 0.10.3", + "serde", + "serde_json", + "tracing-subscriber", +] + [[package]] name = "air-trace-handler" version = "0.1.0" @@ -176,8 +199,13 @@ dependencies = [ "log", "serde_json", "thiserror", + "tracing", ] +[[package]] +name = "air-utils" +version = "0.1.0" + [[package]] name = "ansi_term" version = "0.11.0" @@ -233,16 +261,19 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "avm-data-store" -version = "0.3.0" +version = "0.4.0" dependencies = [ "serde", + "serde_bytes", + "serde_json", ] [[package]] name = "avm-server" -version = "0.22.0" +version = "0.23.0" dependencies = [ "air-interpreter-interface", + "air-utils", "avm-data-store", "eyre", "fluence-faas", @@ -253,6 +284,7 @@ dependencies = [ "serde", "serde_json", "thiserror", + "tracing", ] [[package]] @@ -386,7 +418,7 @@ dependencies = [ "libc", "num-integer", "num-traits", - "time", + "time 0.1.44", "winapi", ] @@ -1461,6 +1493,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cff7a23a7f3925a712c34dfb9cd87994012d7743f016fd1533e12ab5a8335ca" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.4.1" @@ -1563,6 +1604,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "object-pool" version = "0.5.4" @@ -1574,9 +1624,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" +checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" [[package]] name = "oorandom" @@ -1727,6 +1777,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db8bcd96cb740d03149cbad5518db9fd87126a10ab519c011893b1754134c468" +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + [[package]] name = "plotters" version = "0.3.0" @@ -1925,6 +1981,9 @@ name = "regex-automata" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] [[package]] name = "regex-syntax" @@ -2046,9 +2105,9 @@ dependencies = [ [[package]] name = "serde_bytes" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9" +checksum = "212e73464ebcde48d723aa02eb270ba62eff38a9b732df31f33f1b4e145f3a54" dependencies = [ "serde", ] @@ -2076,9 +2135,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.79" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" dependencies = [ "itoa 1.0.1", "ryu", @@ -2108,6 +2167,15 @@ dependencies = [ "syn", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "siphasher" version = "0.3.10" @@ -2241,6 +2309,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +dependencies = [ + "once_cell", +] + [[package]] name = "time" version = "0.1.44" @@ -2252,6 +2329,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72c91f41dcb2f096c05f0873d667dceec1087ce5bcf984ec8ffb19acddbb3217" +dependencies = [ + "itoa 1.0.1", + "libc", + "num_threads", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -2280,6 +2368,69 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160" +dependencies = [ + "cfg-if 1.0.0", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7709595b8878a4965ce5e87ebf880a7d39c9afc6837721b21a5a816a8117d921" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" +dependencies = [ + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "time 0.3.11", + "tracing", + "tracing-core", + "tracing-serde", +] + [[package]] name = "typenum" version = "1.15.0" @@ -2343,6 +2494,12 @@ dependencies = [ "getrandom 0.2.6", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "variant_count" version = "1.1.0" @@ -2627,7 +2784,7 @@ dependencies = [ "log", "serde", "thiserror", - "time", + "time 0.1.44", "typetag", "wasmer-runtime-core-fl", "winapi", diff --git a/Cargo.toml b/Cargo.toml index d5fa96eb..9bb9ba0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,9 +13,11 @@ members = [ "crates/air-lib/polyplets", "crates/air-lib/test-utils", "crates/air-lib/trace-handler", + "crates/air-lib/utils", "crates/beautifier", "crates/data-store", "tools/cli/air-beautify", + "tools/cli/air-trace", ] exclude = [ diff --git a/air-interpreter/Cargo.toml b/air-interpreter/Cargo.toml index b4f27d29..bfa79967 100644 --- a/air-interpreter/Cargo.toml +++ b/air-interpreter/Cargo.toml @@ -29,6 +29,9 @@ wasm-bindgen = "=0.2.65" log = "0.4.11" serde = { version = "1.0.118", features = [ "derive", "rc" ] } serde_json = "1.0.61" +tracing = "0.1.35" +# exclude tracing-log feature that interferes with the log crate: +tracing-subscriber = { version = "0.3.11", default-features = false, features = [ "env-filter", "json", "smallvec", "time", "fmt" ] } [features] # indicates that this library should be compiled for the marine target, diff --git a/air-interpreter/src/logger.rs b/air-interpreter/src/logger.rs index a7e62581..017603c6 100644 --- a/air-interpreter/src/logger.rs +++ b/air-interpreter/src/logger.rs @@ -32,3 +32,19 @@ pub fn init_logger(default_level: Option) { builder.build().unwrap(); } + +#[allow(dead_code)] +// TODO it worth moving it to marine_rs_sdk +pub fn init_tracing(tracing_params: String, trace_mode: u8) { + use tracing_subscriber::fmt::format::FmtSpan; + + let builder = tracing_subscriber::fmt() + .with_env_filter(tracing_params) + .with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE); + if trace_mode == 0 { + builder.json().init(); + } else { + // Human-readable output. + builder.init(); + } +} diff --git a/air-interpreter/src/marine.rs b/air-interpreter/src/marine.rs index e5fc1f10..27d5f072 100644 --- a/air-interpreter/src/marine.rs +++ b/air-interpreter/src/marine.rs @@ -52,6 +52,20 @@ pub fn invoke( execute_air(air, prev_data, data, params, call_results) } +#[marine] +pub fn invoke_tracing( + air: String, + prev_data: Vec, + data: Vec, + params: RunParameters, + call_results: Vec, + tracing_params: String, + tracing_output_mode: u8, +) -> InterpreterOutcome { + logger::init_tracing(tracing_params, tracing_output_mode); + execute_air(air, prev_data, data, params, call_results) +} + #[marine] pub fn ast(script: String) -> String { ast::ast(script) diff --git a/air-interpreter/src/wasm_bindgen.rs b/air-interpreter/src/wasm_bindgen.rs index 37d0a738..a02d5b6b 100644 --- a/air-interpreter/src/wasm_bindgen.rs +++ b/air-interpreter/src/wasm_bindgen.rs @@ -41,6 +41,7 @@ pub const DEFAULT_LOG_LEVEL: LevelFilter = LevelFilter::Trace; pub fn main() { // it's necessary to initialize it with the minimal allowed log level, // because otherwise it's impossible to set less level than used during initialization. + const MINIMAL_LOG_LEVEL: LevelFilter = LevelFilter::Trace; logger::init_logger(Some(MINIMAL_LOG_LEVEL)); diff --git a/air/Cargo.toml b/air/Cargo.toml index 863c4d65..aacfe4f2 100644 --- a/air/Cargo.toml +++ b/air/Cargo.toml @@ -23,6 +23,7 @@ air-log-targets = { path = "../crates/air-lib/log-targets" } air-lambda-ast = { path = "../crates/air-lib/lambda/ast" } air-lambda-parser = { path = "../crates/air-lib/lambda/parser" } air-trace-handler = { path = "../crates/air-lib/trace-handler" } +air-utils = { version = "0.1.0", path = "../crates/air-lib/utils" } polyplets = { path = "../crates/air-lib/polyplets" } marine-rs-sdk = { version = "0.6.15", features = ["logger"] } @@ -38,6 +39,7 @@ fstrings = "0.2.3" thiserror = "1.0.23" strum = "0.21" strum_macros = "0.21" +tracing = "0.1.35" # Keep 0.2.65 until this is resolved https://github.com/rustwasm/wasm-pack/issues/886 wasm-bindgen = "=0.2.65" diff --git a/air/src/execution_step/air/ap.rs b/air/src/execution_step/air/ap.rs index 805f2a34..6fe00f0c 100644 --- a/air/src/execution_step/air/ap.rs +++ b/air/src/execution_step/air/ap.rs @@ -37,6 +37,7 @@ use air_trace_handler::MergerApResult; use std::rc::Rc; impl<'i> super::ExecutableInstruction<'i> for Ap<'i> { + #[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))] fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { log_instruction!(call, exec_ctx, trace_ctx); let should_touch_trace = should_touch_trace(self); diff --git a/air/src/execution_step/air/call.rs b/air/src/execution_step/air/call.rs index c63ab693..6d6f9776 100644 --- a/air/src/execution_step/air/call.rs +++ b/air/src/execution_step/air/call.rs @@ -35,6 +35,7 @@ use air_parser::ast::Call; use std::rc::Rc; impl<'i> super::ExecutableInstruction<'i> for Call<'i> { + #[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))] fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { log_instruction!(call, exec_ctx, trace_ctx); exec_ctx.tracker.meet_call(); diff --git a/air/src/execution_step/air/call/resolved_call.rs b/air/src/execution_step/air/call/resolved_call.rs index da18adfc..3f973189 100644 --- a/air/src/execution_step/air/call/resolved_call.rs +++ b/air/src/execution_step/air/call/resolved_call.rs @@ -32,6 +32,7 @@ use air_interpreter_interface::CallRequestParams; use air_parser::ast; use air_trace_handler::MergerCallResult; use air_trace_handler::TraceHandler; +use air_utils::measure; use std::rc::Rc; @@ -51,6 +52,7 @@ struct ResolvedArguments { impl<'i> ResolvedCall<'i> { /// Build `ResolvedCall` from `Call` by transforming `PeerPart` & `FunctionPart` into `ResolvedTriplet`. + #[tracing::instrument(level = "trace", skip_all)] pub(super) fn new(raw_call: &Call<'i>, exec_ctx: &ExecutionCtx<'i>) -> ExecutionResult { let triplet = resolve(&raw_call.triplet, exec_ctx)?; let tetraplet = SecurityTetraplet::from_triplet(triplet); @@ -66,6 +68,7 @@ impl<'i> ResolvedCall<'i> { } /// Executes resolved instruction, updates contexts based on a execution_step result. + #[tracing::instrument(level = "trace", skip_all)] pub(super) fn execute( &self, raw_call: &Call<'i>, @@ -119,6 +122,7 @@ impl<'i> ResolvedCall<'i> { self.tetraplet.clone() } + #[tracing::instrument(level = "trace", skip_all)] fn prepare_request_params( &self, exec_ctx: &ExecutionCtx<'_>, @@ -129,7 +133,11 @@ impl<'i> ResolvedCall<'i> { tetraplets, } = self.resolve_args(exec_ctx)?; - let serialized_tetraplets = serde_json::to_string(&tetraplets).expect("default serializer shouldn't fail"); + let serialized_tetraplets = measure!( + serde_json::to_string(&tetraplets).expect("default serializer shouldn't fail"), + tracing::Level::INFO, + "serde_json::to_string(tetraplets)", + ); let request_params = CallRequestParams::new( tetraplet.service_id.to_string(), diff --git a/air/src/execution_step/air/compare_matchable/comparator.rs b/air/src/execution_step/air/compare_matchable/comparator.rs index 6064077c..f73ce715 100644 --- a/air/src/execution_step/air/compare_matchable/comparator.rs +++ b/air/src/execution_step/air/compare_matchable/comparator.rs @@ -22,6 +22,7 @@ use crate::JValue; use air_parser::ast; +#[tracing::instrument(skip_all)] pub(crate) fn are_matchable_eq<'ctx>( left: &ast::Value<'_>, right: &ast::Value<'_>, diff --git a/air/src/execution_step/resolver/resolve.rs b/air/src/execution_step/resolver/resolve.rs index e50f1e8c..93940e0f 100644 --- a/air/src/execution_step/resolver/resolve.rs +++ b/air/src/execution_step/resolver/resolve.rs @@ -87,6 +87,7 @@ pub(crate) fn prepare_last_error<'i>( Ok((jvalue.clone(), tetraplets)) } +#[tracing::instrument(level = "trace", skip(ctx))] pub(crate) fn resolve_variable<'ctx, 'i>( variable: Variable<'_>, ctx: &'ctx ExecutionCtx<'i>, @@ -113,6 +114,7 @@ pub(crate) fn resolve_variable<'ctx, 'i>( } } +#[tracing::instrument(level = "trace", skip(exec_ctx))] pub(crate) fn resolve_ast_variable_wl<'ctx, 'i>( ast_variable: &ast::VariableWithLambda<'_>, exec_ctx: &'ctx ExecutionCtx<'i>, @@ -131,6 +133,7 @@ pub(crate) fn resolve_ast_variable_wl<'ctx, 'i>( } } +#[tracing::instrument(level = "trace", skip(exec_ctx))] pub(crate) fn resolve_ast_scalar_wl<'ctx, 'i>( ast_scalar: &ast::ScalarWithLambda<'_>, exec_ctx: &'ctx ExecutionCtx<'i>, @@ -140,6 +143,7 @@ pub(crate) fn resolve_ast_scalar_wl<'ctx, 'i>( resolve_ast_variable_wl(&variable, exec_ctx) } +#[tracing::instrument(level = "trace", skip(exec_ctx))] pub(crate) fn apply_lambda<'i>( variable: Variable<'_>, lambda: &LambdaAST<'i>, diff --git a/air/src/farewell_step/outcome.rs b/air/src/farewell_step/outcome.rs index e94c439b..1e49cc07 100644 --- a/air/src/farewell_step/outcome.rs +++ b/air/src/farewell_step/outcome.rs @@ -23,12 +23,15 @@ use crate::INTERPRETER_SUCCESS; use air_interpreter_data::InterpreterData; use air_interpreter_interface::CallRequests; +use air_utils::measure; +use std::fmt::Debug; use std::hash::Hash; use std::rc::Rc; /// Create InterpreterOutcome from supplied execution context and trace handler, /// set ret_code to INTERPRETER_SUCCESS. +#[tracing::instrument(skip_all)] pub(crate) fn from_success_result( exec_ctx: ExecutionCtx<'_>, trace_handler: TraceHandler, @@ -46,9 +49,10 @@ pub(crate) fn from_success_result( /// Create InterpreterOutcome from supplied data and error, /// set ret_code based on the error. +#[tracing::instrument] pub(crate) fn from_uncatchable_error( - data: impl Into>, - error: impl ToErrorCode + ToString, + data: impl Into> + Debug, + error: impl ToErrorCode + ToString + Debug, ) -> InterpreterOutcome { let ret_code = error.to_error_code(); let data = data.into(); @@ -65,14 +69,16 @@ pub(crate) fn from_uncatchable_error( /// Create InterpreterOutcome from supplied execution context, trace handler, and error, /// set ret_code based on the error. +#[tracing::instrument(skip(exec_ctx, trace_handler))] pub(crate) fn from_execution_error( exec_ctx: ExecutionCtx<'_>, trace_handler: TraceHandler, - error: impl ToErrorCode + ToString, + error: impl ToErrorCode + ToString + Debug, ) -> InterpreterOutcome { populate_outcome_from_contexts(exec_ctx, trace_handler, error.to_error_code(), error.to_string()) } +#[tracing::instrument(skip(exec_ctx, trace_handler))] fn populate_outcome_from_contexts( exec_ctx: ExecutionCtx<'_>, trace_handler: TraceHandler, @@ -86,9 +92,17 @@ fn populate_outcome_from_contexts( restricted_streams, exec_ctx.last_call_request_id, ); - let data = serde_json::to_vec(&data).expect("default serializer shouldn't fail"); + let data = measure!( + serde_json::to_vec(&data).expect("default serializer shouldn't fail"), + tracing::Level::INFO, + "serde_json::to_vec(data)" + ); let next_peer_pks = dedup(exec_ctx.next_peer_pks); - let call_requests = serde_json::to_vec(&exec_ctx.call_requests).expect("default serializer shouldn't fail"); + let call_requests = measure!( + serde_json::to_vec(&exec_ctx.call_requests).expect("default serializer shouldn't fail"), + tracing::Level::INFO, + "serde_json::to_vec(call_results)", + ); InterpreterOutcome { ret_code, @@ -100,7 +114,7 @@ fn populate_outcome_from_contexts( } /// Deduplicate values in a supplied vector. -fn dedup(mut vec: Vec) -> Vec { +fn dedup(mut vec: Vec) -> Vec { use std::collections::HashSet; let set: HashSet<_> = vec.drain(..).collect(); diff --git a/air/src/preparation_step/preparation.rs b/air/src/preparation_step/preparation.rs index 9388910a..20144965 100644 --- a/air/src/preparation_step/preparation.rs +++ b/air/src/preparation_step/preparation.rs @@ -33,6 +33,7 @@ pub(crate) struct PreparationDescriptor<'ctx, 'i> { } /// Parse and prepare supplied data and AIR script. +#[tracing::instrument(skip_all)] pub(crate) fn prepare<'i>( prev_data: &[u8], current_data: &[u8], @@ -63,6 +64,7 @@ fn try_to_data(raw_data: &[u8]) -> PreparationResult { InterpreterData::try_from_slice(raw_data).map_err(|err| DataDeFailed(err, raw_data.to_vec())) } +#[tracing::instrument(skip_all)] fn make_exec_ctx( prev_data: &InterpreterData, call_results: &[u8], diff --git a/air/src/runner.rs b/air/src/runner.rs index 260717b7..969b7861 100644 --- a/air/src/runner.rs +++ b/air/src/runner.rs @@ -22,6 +22,7 @@ use crate::preparation_step::PreparationDescriptor; use air_interpreter_interface::InterpreterOutcome; use air_interpreter_interface::RunParameters; use air_log_targets::RUN_PARAMS; +use air_utils::measure; pub fn execute_air( air: String, @@ -64,7 +65,12 @@ fn execute_air_impl( // match here is used instead of map_err, because the compiler can't determine that // they are exclusive and would treat exec_ctx and trace_handler as moved - match air.execute(&mut exec_ctx, &mut trace_handler) { + let exec_result = measure!( + air.execute(&mut exec_ctx, &mut trace_handler), + tracing::Level::INFO, + "execute", + ); + match exec_result { Ok(_) => farewell::from_success_result(exec_ctx, trace_handler), // return new collected trace in case of errors Err(error) if error.is_catchable() => Err(farewell::from_execution_error(exec_ctx, trace_handler, error)), diff --git a/avm/server/Cargo.toml b/avm/server/Cargo.toml index fce576ea..6c725aa2 100644 --- a/avm/server/Cargo.toml +++ b/avm/server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "avm-server" description = "Fluence AIR VM" -version = "0.22.0" +version = "0.23.0" authors = ["Fluence Labs"] edition = "2018" license = "Apache-2.0" @@ -17,7 +17,8 @@ path = "src/lib.rs" [dependencies] air-interpreter-interface = { version = "0.10.0", path = "../../crates/air-lib/interpreter-interface" } -avm-data-store = { version = "0.3.0", path = "../../crates/data-store" } +air-utils = { version = "0.1.0", path = "../../crates/air-lib/utils" } +avm-data-store = { version = "0.4.0", path = "../../crates/data-store" } fluence-faas = "0.16.2" polyplets = { version = "0.2.0", path = "../../crates/air-lib/polyplets" } @@ -28,3 +29,4 @@ serde_json = "1.0.61" serde = "1.0.118" log = "0.4.14" parking_lot = "0.11.1" +tracing = "0.1.35" diff --git a/avm/server/src/avm.rs b/avm/server/src/avm.rs index 5b2cf83a..9524b5d8 100644 --- a/avm/server/src/avm.rs +++ b/avm/server/src/avm.rs @@ -84,7 +84,7 @@ impl AVM { call_results: CallResults, ) -> AVMResult { let air = air.into(); - let particle_id = particle_parameters.particle_id.as_str(); + let particle_id = particle_parameters.particle_id.as_ref(); let prev_data = self.data_store.read_data(particle_id)?; let current_data = data.into(); @@ -145,7 +145,7 @@ impl AVM { ) -> AVMResult<(), E> { let prev_data = self .data_store - .read_data(particle_parameters.particle_id.as_str())?; + .read_data(&particle_parameters.particle_id)?; let ser_particle = serde_json::to_vec(particle_parameters).map_err(AVMError::AnomalyDataSeError)?; let ser_avm_outcome = diff --git a/avm/server/src/interface/call_service_result.rs b/avm/server/src/interface/call_service_result.rs index 06a13788..faee5b52 100644 --- a/avm/server/src/interface/call_service_result.rs +++ b/avm/server/src/interface/call_service_result.rs @@ -48,7 +48,7 @@ impl CallServiceResult { } } - pub(crate) fn into_raw(self) -> air_interpreter_interface::CallServiceResult { + pub fn into_raw(self) -> air_interpreter_interface::CallServiceResult { let CallServiceResult { ret_code, result } = self; air_interpreter_interface::CallServiceResult { @@ -58,6 +58,7 @@ impl CallServiceResult { } } +#[tracing::instrument(level = "debug", skip(call_results))] pub fn into_raw_result(call_results: CallResults) -> air_interpreter_interface::CallResults { call_results .into_iter() diff --git a/avm/server/src/interface/particle_parameters.rs b/avm/server/src/interface/particle_parameters.rs index e4549a51..670fbd8e 100644 --- a/avm/server/src/interface/particle_parameters.rs +++ b/avm/server/src/interface/particle_parameters.rs @@ -21,16 +21,16 @@ use std::borrow::Cow; /// Represents parameters obtained from a particle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ParticleParameters<'init_peer_id, 'particle_id> { - pub init_peer_id: Cow<'init_peer_id, String>, - pub particle_id: Cow<'particle_id, String>, + pub init_peer_id: Cow<'init_peer_id, str>, + pub particle_id: Cow<'particle_id, str>, pub timestamp: u64, pub ttl: u32, } impl<'init_peer_id, 'particle_id> ParticleParameters<'init_peer_id, 'particle_id> { pub fn new( - init_peer_id: Cow<'init_peer_id, String>, - particle_id: Cow<'particle_id, String>, + init_peer_id: Cow<'init_peer_id, str>, + particle_id: Cow<'particle_id, str>, timestamp: u64, ttl: u32, ) -> Self { diff --git a/avm/server/src/runner.rs b/avm/server/src/runner.rs index 46d7df64..5b868602 100644 --- a/avm/server/src/runner.rs +++ b/avm/server/src/runner.rs @@ -20,6 +20,7 @@ use crate::RunnerError; use crate::RunnerResult; use air_interpreter_interface::InterpreterOutcome; +use air_utils::measure; use fluence_faas::FaaSConfig; use fluence_faas::FluenceFaaS; use fluence_faas::IValue; @@ -66,6 +67,7 @@ impl AVMRunner { Ok(avm) } + #[tracing::instrument(skip_all)] pub fn call( &mut self, air: impl Into, @@ -87,9 +89,59 @@ impl AVMRunner { call_results, ); - let result = + let result = measure!( self.faas - .call_with_ivalues(&self.wasm_filename, "invoke", &args, <_>::default())?; + .call_with_ivalues(&self.wasm_filename, "invoke", &args, <_>::default())?, + tracing::Level::INFO, + "faas.call_with_ivalues", + method = "invoke", + ); + + let result = try_as_one_value_vec(result)?; + let outcome = InterpreterOutcome::from_ivalue(result) + .map_err(RunnerError::InterpreterResultDeError)?; + let outcome = RawAVMOutcome::from_interpreter_outcome(outcome)?; + + Ok(outcome) + } + + #[tracing::instrument(skip_all)] + pub fn call_tracing( + &mut self, + air: impl Into, + prev_data: impl Into>, + data: impl Into>, + init_peer_id: impl Into, + timestamp: u64, + ttl: u32, + call_results: CallResults, + tracing_params: String, + tracing_output_mode: u8, + ) -> RunnerResult { + let mut args = prepare_args( + air, + prev_data, + data, + self.current_peer_id.clone(), + init_peer_id.into(), + timestamp, + ttl, + call_results, + ); + args.push(IValue::String(tracing_params)); + args.push(IValue::U8(tracing_output_mode)); + + let result = measure!( + self.faas.call_with_ivalues( + &self.wasm_filename, + "invoke_tracing", + &args, + <_>::default(), + )?, + tracing::Level::INFO, + "faas.call_with_ivalues", + method = "invoke_tracing", + ); let result = try_as_one_value_vec(result)?; let outcome = InterpreterOutcome::from_ivalue(result) @@ -117,6 +169,7 @@ impl AVMRunner { } } +#[tracing::instrument(skip(air, prev_data, data, call_results))] fn prepare_args( air: impl Into, prev_data: impl Into>, @@ -136,8 +189,11 @@ fn prepare_args( .into_ivalue(); let call_results = crate::interface::into_raw_result(call_results); - let call_results = - serde_json::to_vec(&call_results).expect("the default serializer shouldn't fail"); + let call_results = measure!( + serde_json::to_vec(&call_results).expect("the default serializer shouldn't fail"), + tracing::Level::INFO, + "serde_json::to_vec call_results" + ); vec![ IValue::String(air.into()), diff --git a/crates/air-lib/air-parser/Cargo.toml b/crates/air-lib/air-parser/Cargo.toml index 0c952376..eac04c08 100644 --- a/crates/air-lib/air-parser/Cargo.toml +++ b/crates/air-lib/air-parser/Cargo.toml @@ -27,8 +27,8 @@ serde = { version = "1.0.118", features = ["rc", "derive"] } serde_json = "1.0.61" itertools = "0.10.0" - thiserror = "1.0.23" +tracing = "0.1.35" [dev-dependencies] fstrings = "0.2.3" diff --git a/crates/air-lib/air-parser/src/parser/air_parser.rs b/crates/air-lib/air-parser/src/parser/air_parser.rs index 49826c34..f0251431 100644 --- a/crates/air-lib/air-parser/src/parser/air_parser.rs +++ b/crates/air-lib/air-parser/src/parser/air_parser.rs @@ -33,6 +33,7 @@ use lalrpop_util::{ErrorRecovery, ParseError}; thread_local!(static PARSER: AIRParser = AIRParser::new()); /// Parse AIR `source_code` to `Box` +#[tracing::instrument(skip_all)] pub fn parse(air_script: &str) -> Result>, String> { let mut files = SimpleFiles::new(); let file_id = files.add("script.air", air_script); diff --git a/crates/air-lib/interpreter-data/Cargo.toml b/crates/air-lib/interpreter-data/Cargo.toml index daab6eaa..ae6b5a01 100644 --- a/crates/air-lib/interpreter-data/Cargo.toml +++ b/crates/air-lib/interpreter-data/Cargo.toml @@ -14,7 +14,10 @@ name = "air_interpreter_data" path = "src/lib.rs" [dependencies] +air-utils = { path = "../utils" } + serde = {version = "1.0.118", features = ["derive", "rc"]} serde_json = "1.0.61" semver = { version = "1.0.3", features = ["serde"] } once_cell = "1.8.0" +tracing = "0.1.35" diff --git a/crates/air-lib/interpreter-data/src/interpreter_data.rs b/crates/air-lib/interpreter-data/src/interpreter_data.rs index 36434c97..b739d4a9 100644 --- a/crates/air-lib/interpreter-data/src/interpreter_data.rs +++ b/crates/air-lib/interpreter-data/src/interpreter_data.rs @@ -18,6 +18,8 @@ use super::GlobalStreamGens; use super::RestrictedStreamGens; use super::DATA_FORMAT_VERSION; use crate::ExecutionTrace; +use air_utils::measure; + use serde::Deserialize; use serde::Serialize; use std::ops::Deref; @@ -63,6 +65,7 @@ impl InterpreterData { } } + #[tracing::instrument(skip_all)] pub fn from_execution_result( trace: ExecutionTrace, streams: GlobalStreamGens, @@ -86,7 +89,11 @@ impl InterpreterData { return Ok(Self::default()); } - serde_json::from_slice(slice) + measure!( + serde_json::from_slice(slice), + tracing::Level::INFO, + "serde_json::from_slice" + ) } } diff --git a/crates/air-lib/trace-handler/Cargo.toml b/crates/air-lib/trace-handler/Cargo.toml index 994740f3..a3223ca9 100644 --- a/crates/air-lib/trace-handler/Cargo.toml +++ b/crates/air-lib/trace-handler/Cargo.toml @@ -21,3 +21,4 @@ air-parser = { path = "../air-parser" } serde_json = "1.0.68" log = "0.4.14" thiserror = "1.0.29" +tracing = "0.1.35" diff --git a/crates/air-lib/utils/Cargo.toml b/crates/air-lib/utils/Cargo.toml new file mode 100644 index 00000000..b172dbd7 --- /dev/null +++ b/crates/air-lib/utils/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "air-utils" +version = "0.1.0" +description = "AIR helper funcitions and macros" +authors = ["Fluence Labs"] +edition = "2018" +license = "Apache-2.0" +publish = true # this crate is used by avm server that in its turn is used by the node +keywords = ["fluence", "air", "webassembly", "programming-language"] +categories = ["wasm"] + +[lib] +name = "air_utils" +path = "src/lib.rs" + +[dependencies] diff --git a/crates/air-lib/utils/src/lib.rs b/crates/air-lib/utils/src/lib.rs new file mode 100644 index 00000000..17d49cbe --- /dev/null +++ b/crates/air-lib/utils/src/lib.rs @@ -0,0 +1,39 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[macro_export] +macro_rules! measure { + (target: $target:expr, $expr:expr, $span:literal) => ({ + let span = tracing::span!(target=$target, $span); + let _enter = span.enter(); + $expr + }); + ($expr:expr, $level:expr, $span:literal, $($fields:tt)*) => ({ + let span = tracing::span!($level, $span, $($fields)*); + let _enter = span.enter(); + $expr + }); + ($expr:expr, $level:expr, $span:literal) => ({ + let span = tracing::span!($level, $span); + let _enter = span.enter(); + $expr + }); + ($expr:expr, $span:literal) => ({ + let span = tracing::span!($span); + let _enter = span.enter(); + $expr + }); +} diff --git a/crates/data-store/Cargo.toml b/crates/data-store/Cargo.toml index 48881fc0..08fc457c 100644 --- a/crates/data-store/Cargo.toml +++ b/crates/data-store/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "avm-data-store" -version = "0.3.0" +version = "0.4.0" description = "Definition of the AVM DataStore trait" authors = ["Fluence Labs"] edition = "2018" @@ -17,3 +17,7 @@ path = "src/lib.rs" [dependencies] serde = { version = "1.0.118", features = ["derive"] } +serde_bytes = "0.11.6" + +[dev-dependencies] +serde_json = "1.0.79" diff --git a/crates/data-store/src/lib.rs b/crates/data-store/src/lib.rs index ac7e1404..46c42b23 100644 --- a/crates/data-store/src/lib.rs +++ b/crates/data-store/src/lib.rs @@ -16,6 +16,7 @@ use serde::Deserialize; use serde::Serialize; +use std::borrow::Cow; use std::time::Duration; /// This trait is used for @@ -47,13 +48,18 @@ pub trait DataStore { ) -> Result<(), Self::Error>; } -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct AnomalyData<'data> { - pub air_script: &'data str, - pub particle: &'data [u8], // it's byte because of the restriction on trait objects methods - pub prev_data: &'data [u8], - pub current_data: &'data [u8], - pub avm_outcome: &'data [u8], // it's byte because of the restriction on trait objects methods + #[serde(borrow)] + pub air_script: Cow<'data, str>, + #[serde(borrow, with = "serde_bytes")] + pub particle: Cow<'data, [u8]>, // it's byte because of the restriction on trait objects methods + #[serde(borrow, with = "serde_bytes")] + pub prev_data: Cow<'data, [u8]>, + #[serde(borrow, with = "serde_bytes")] + pub current_data: Cow<'data, [u8]>, + #[serde(borrow, with = "serde_bytes")] + pub avm_outcome: Cow<'data, [u8]>, // it's byte because of the restriction on trait objects methods pub execution_time: Duration, pub memory_delta: usize, } @@ -69,13 +75,97 @@ impl<'data> AnomalyData<'data> { memory_delta: usize, ) -> Self { Self { - air_script, - particle, - prev_data, - current_data, - avm_outcome, + air_script: air_script.into(), + particle: particle.into(), + prev_data: prev_data.into(), + current_data: current_data.into(), + avm_outcome: avm_outcome.into(), execution_time, memory_delta, } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn anomaly_json( + air_script: &str, + particle: &[u8], + prev_data: &[u8], + current_data: &[u8], + avm_outcome: &[u8], + ) -> String { + format!( + concat!( + r#"{{"air_script":{air_script},"#, + r#""particle":{particle},"#, + r#""prev_data":{prev_data},"#, + r#""current_data":{current_data},"#, + r#""avm_outcome":{avm_outcome},"#, + r#""execution_time":{{"secs":42,"nanos":0}},"#, + r#""memory_delta":123"#, + r#"}}"# + ), + air_script = serde_json::to_string(air_script).unwrap(), + particle = serde_json::to_string(particle).unwrap(), + prev_data = serde_json::to_string(prev_data).unwrap(), + current_data = serde_json::to_string(current_data).unwrap(), + avm_outcome = serde_json::to_string(avm_outcome).unwrap(), + ) + } + #[test] + fn anomaly_data_se() { + let anomaly = AnomalyData::new( + "(null)", + br#"{"data":"value"}"#, // not real data + br#"{"trace":[]}"#, // not real data + br#"{"trace":[1,2,3]}"#, // not real data + b"{}", // not real data + Duration::from_secs(42), + 123, + ); + + let json_data = serde_json::to_string(&anomaly).expect("JSON serialize anomaly data"); + let expected = anomaly_json( + &anomaly.air_script, + &anomaly.particle, + &anomaly.prev_data, + &anomaly.current_data, + &anomaly.avm_outcome, + ); + assert_eq!(json_data, expected); + } + + #[test] + fn anomaly_data_de() { + let particle = br#"{"particle":"data"}"#; + let current_data = br#"{"data":"current"}"#; + let prev_data = br#"{"data":"prev"}"#; + let avm_outcome = br#"{"avm":[1,2,3]}"#; + let json_data = anomaly_json( + "(null)", + &particle[..], + &prev_data[..], + ¤t_data[..], + &avm_outcome[..], + ); + + let anomaly: AnomalyData = + serde_json::from_str(&json_data).expect("deserialize JSON anomaly data"); + + assert_eq!( + anomaly, + AnomalyData::new( + "(null)", + &particle[..], + &prev_data[..], + ¤t_data[..], + &avm_outcome[..], + Duration::from_secs(42), + 123, + ) + ) + } +} diff --git a/tools/cli/air-trace/Cargo.toml b/tools/cli/air-trace/Cargo.toml new file mode 100644 index 00000000..2eeaafe3 --- /dev/null +++ b/tools/cli/air-trace/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "air-trace" +version = "0.1.0" +edition = "2021" +description = "AIR performance tracing tool" +authors = ["Fluence Labs"] +license = "Apache-2.0" +publish = false +keywords = ["fluence", "air", "tracing"] + +[dependencies] +air = { path = "../../../air" } +air-test-utils = { path = "../../../crates/air-lib/test-utils" } +air-interpreter-interface = { path = "../../../crates/air-lib/interpreter-interface" } +avm-server = { path = "../../../avm/server" } + +anyhow = "1.0.56" +clap = { version = "3.1.18", features = ["derive", "env"] } +itertools = "0.10.0" +serde = { version = "1.0.61", features = ["derive"] } +serde_json = "1.0.61" +tracing-subscriber = { version = "0.3.11", default-features = false, features = [ "env-filter", "json", "smallvec", "time", "fmt" ] } + +[features] +default = ["wasm"] +wasm = [] diff --git a/tools/cli/air-trace/src/main.rs b/tools/cli/air-trace/src/main.rs new file mode 100644 index 00000000..5b4eb55f --- /dev/null +++ b/tools/cli/air-trace/src/main.rs @@ -0,0 +1,42 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod run; +mod stats; +mod utils; + +use clap::Parser; + +#[derive(Parser)] +struct Cli { + #[clap(subcommand)] + subcommand: Subcommand, +} + +#[derive(clap::Subcommand)] +#[allow(clippy::large_enum_variant)] +enum Subcommand { + Run(crate::run::Args), + Stats(crate::stats::Args), +} + +fn main() -> anyhow::Result<()> { + let args = Cli::parse(); + match args.subcommand { + Subcommand::Run(args) => crate::run::run(args), + Subcommand::Stats(args) => crate::stats::stats(args), + } +} diff --git a/tools/cli/air-trace/src/run.rs b/tools/cli/air-trace/src/run.rs new file mode 100644 index 00000000..e6c15b0b --- /dev/null +++ b/tools/cli/air-trace/src/run.rs @@ -0,0 +1,168 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod data; +mod native; +mod runner; +#[cfg(feature = "wasm")] +mod wasm; + +use self::runner::AirRunner; +use air_test_utils::CallResults; + +use anyhow::Context as _; +use clap::{Parser, Subcommand}; + +use std::path::{Path, PathBuf}; + +#[derive(Parser, Debug)] +#[clap(about = "Run AIR script with AquaVM")] +pub(crate) struct Args { + #[clap(long)] + current_peer_id: Option, + + #[clap(long = "call_results")] + call_results_path: Option, + + #[clap(long)] + max_heap_size: Option, + #[clap(long, default_value = "info")] + tracing_params: String, + #[clap(long)] + native: bool, + #[clap( + long = "runtime", + env = "AIR_WASM_RUNTIME_PATH", + default_value = "target/wasm32-wasi/release/air_interpreter_server.wasm" + )] + air_wasm_runtime_path: PathBuf, + #[clap(long, help = "Execute several times; great for native profiling")] + repeat: Option, + #[clap(long, help = "Output JSON tracing info")] + json: bool, + + #[clap(subcommand)] + source: Source, +} + +#[derive(Subcommand, Debug)] +#[allow(clippy::large_enum_variant)] +enum Source { + #[clap(name = "--anomaly")] + Anomaly(self::data::anomaly::AnomalyDataArgs), + #[clap(name = "--plain")] + PlainData(self::data::plain::PlainDataArgs), +} + +pub(crate) fn run(args: Args) -> anyhow::Result<()> { + let tracing_json = (!args.json) as u8; + init_tracing(args.tracing_params.clone(), tracing_json); + + let current_peer_id = args.current_peer_id.as_deref().unwrap_or("some_id"); + let mut runner = get_runner( + args.native, + current_peer_id, + &args.air_wasm_runtime_path, + args.max_heap_size, + )?; + + let execution_data = match &args.source { + Source::Anomaly(anomaly) => data::anomaly::load(anomaly)?, + Source::PlainData(raw) => data::plain::load(raw)?, + }; + let particle = execution_data.particle; + + let call_results = read_call_results(args.call_results_path.as_deref())?; + + let repeat = args.repeat.unwrap_or(1); + for _ in 0..repeat { + let result = runner + .call_tracing( + execution_data.air_script.clone(), + execution_data.prev_data.clone().into(), + execution_data.current_data.clone().into(), + particle.init_peer_id.clone().into_owned(), + particle.timestamp, + particle.ttl, + call_results.clone(), + args.tracing_params.clone(), + tracing_json, + ) + .context("Failed to execute the script")?; + if args.repeat.is_none() { + println!("{:?}", result); + } + } + + Ok(()) +} + +#[cfg(feature = "wasm")] +fn get_runner( + native: bool, + current_peer_id: impl Into, + air_wasm_runtime_path: &Path, + max_heap_size: Option, +) -> anyhow::Result> { + if native { + self::native::create_native_avm_runner(current_peer_id) + .context("Failed to instantiate a native AVM") + } else { + self::wasm::create_wasm_avm_runner(current_peer_id, air_wasm_runtime_path, max_heap_size) + .context("Failed to instantiate WASM AVM") + } +} + +#[cfg(not(feature = "wasm"))] +fn get_runner( + native: bool, + current_peer_id: impl Into, + air_wasm_runtime_path: &Path, + max_heap_size: Option, +) -> anyhow::Result> { + self::native::create_native_avm_runner(current_peer_id) + .context("Failed to instantiate a native AVM") +} + +// TODO This is a copy of function from air_interpreter/marine.rs. It should be moved to the marine_rs_sdk. +pub fn init_tracing(tracing_params: String, trace_mode: u8) { + use tracing_subscriber::fmt::format::FmtSpan; + + let builder = tracing_subscriber::fmt() + .with_env_filter(tracing_params) + .with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE); + if trace_mode == 0 { + builder.json().init(); + } else { + // Human-readable output. + builder.init(); + } +} +fn read_call_results(call_results_path: Option<&Path>) -> anyhow::Result { + match call_results_path { + None => Ok(CallResults::default()), + Some(call_results_path) => { + let call_results_json = + load_data(call_results_path).context("failed to read call_results")?; + Ok(serde_json::from_str(&call_results_json) + .context("failed to parse call_results data")?) + } + } +} + +fn load_data(data_path: &Path) -> anyhow::Result { + Ok(std::fs::read_to_string(data_path)?) +} diff --git a/tools/cli/air-trace/src/run/data/anomaly.rs b/tools/cli/air-trace/src/run/data/anomaly.rs new file mode 100644 index 00000000..ef22d9d6 --- /dev/null +++ b/tools/cli/air-trace/src/run/data/anomaly.rs @@ -0,0 +1,52 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::ExecutionData; +use crate::run::load_data; +use air_test_utils::ParticleParameters; +use avm_server::AnomalyData; + +use anyhow::Context; +use clap::Parser; + +use std::path::PathBuf; + +#[derive(Parser, Debug)] +pub(crate) struct AnomalyDataArgs { + anomaly_data_path: PathBuf, +} + +pub(crate) fn load(args: &AnomalyDataArgs) -> anyhow::Result { + let anomaly_json = load_data(&args.anomaly_data_path).context("Failed to read anomaly data")?; + let anomaly_data: AnomalyData = + serde_json::from_str(&anomaly_json).context("Failed to parse anomaly data")?; + + let air_script = anomaly_data.air_script.to_string(); + let prev_data = String::from_utf8(anomaly_data.prev_data.to_vec()) + .context("Anomaly current_data is not a valid string")?; + let current_data = String::from_utf8(anomaly_data.current_data.to_vec()) + .context("Anomaly current_data is not a valid string")?; + let particle: ParticleParameters<'static, 'static> = + serde_json::from_reader(&*anomaly_data.particle.to_vec()) + .context("Anomaly particle is not a valid JSON")?; + + Ok(ExecutionData { + air_script, + prev_data, + current_data, + particle, + }) +} diff --git a/tools/cli/air-trace/src/run/data/mod.rs b/tools/cli/air-trace/src/run/data/mod.rs new file mode 100644 index 00000000..589bd3eb --- /dev/null +++ b/tools/cli/air-trace/src/run/data/mod.rs @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub(crate) mod anomaly; +pub(crate) mod plain; + +use avm_server::ParticleParameters; + +pub(crate) struct ExecutionData<'ctx> { + pub(crate) air_script: String, + pub(crate) current_data: String, + pub(crate) prev_data: String, + pub(crate) particle: ParticleParameters<'ctx, 'ctx>, +} diff --git a/tools/cli/air-trace/src/run/data/plain.rs b/tools/cli/air-trace/src/run/data/plain.rs new file mode 100644 index 00000000..a579bf37 --- /dev/null +++ b/tools/cli/air-trace/src/run/data/plain.rs @@ -0,0 +1,84 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::ExecutionData; +use crate::utils::unix_timestamp_now; +use air_test_utils::ParticleParameters; + +use anyhow::Context; + +use std::path::{Path, PathBuf}; + +const DEFAULT_DATA: &str = + r#"{"trace":[],"streams":{},"version":"0.2.2","lcid":0,"r_streams":{"$nodes":{}}}"#; + +#[derive(clap::Args, Debug)] +pub(crate) struct PlainDataArgs { + #[clap(long)] + init_peer_id: Option, + #[clap(long, help = "default: current time")] + timestamp: Option, + #[clap(long, help = "default: max possible ttl")] + ttl: Option, + + #[clap(long = "script", help = "read from stdin by default")] + air_script_path: Option, + #[clap(long = "prev_data")] + prev_data_path: Option, + #[clap(long = "data")] + data_path: PathBuf, +} + +pub(crate) fn load(args: &PlainDataArgs) -> anyhow::Result { + use crate::run::load_data; + + let air_script = + read_air_script(args.air_script_path.as_deref()).context("failed to read AIR script")?; + let prev_data = match &args.prev_data_path { + None => DEFAULT_DATA.to_owned(), + Some(prev_data_path) => load_data(prev_data_path).context("failed to read prev_data")?, + }; + let current_data = load_data(&args.data_path).context("failed to read data")?; + + let timestamp = args.timestamp.unwrap_or_else(unix_timestamp_now); + let ttl = args.ttl.unwrap_or(u32::MAX); + let init_peer_id = args.init_peer_id.as_deref().unwrap_or("some_id"); + + let particle = ParticleParameters::new(init_peer_id.into(), "".into(), timestamp, ttl); + Ok(ExecutionData { + air_script, + prev_data, + current_data, + particle, + }) +} + +fn read_air_script(air_input: Option<&Path>) -> anyhow::Result { + use std::io::Read; + + let air_script = match air_input { + Some(in_path) => std::fs::read_to_string(in_path)?, + None => { + let mut buffer = String::new(); + let mut stdin = std::io::stdin().lock(); + + stdin.read_to_string(&mut buffer)?; + buffer + } + }; + + Ok(air_script) +} diff --git a/tools/cli/air-trace/src/run/native.rs b/tools/cli/air-trace/src/run/native.rs new file mode 100644 index 00000000..06dfcd3b --- /dev/null +++ b/tools/cli/air-trace/src/run/native.rs @@ -0,0 +1,68 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::runner::AirRunner; +use air_interpreter_interface::RunParameters; + +struct NativeAvmRunner { + current_peer_id: String, +} + +impl AirRunner for NativeAvmRunner { + fn call_tracing( + &mut self, + air: String, + prev_data: Vec, + data: Vec, + init_peer_id: String, + timestamp: u64, + ttl: u32, + call_results: air_test_utils::CallResults, + // We use externally configured logger. + _tracing_params: String, + _tracing_output_mode: u8, + ) -> anyhow::Result { + use air_test_utils::into_raw_result; + + // some inner parts transformations + let raw_call_results = into_raw_result(call_results); + let raw_call_results = serde_json::to_vec(&raw_call_results).unwrap(); + + let outcome = air::execute_air( + air, + prev_data, + data, + RunParameters { + init_peer_id, + current_peer_id: self.current_peer_id.clone(), + timestamp, + ttl, + }, + raw_call_results, + ); + let outcome = air_test_utils::RawAVMOutcome::from_interpreter_outcome(outcome)?; + + Ok(outcome) + } +} + +pub(crate) fn create_native_avm_runner( + current_peer_id: impl Into, +) -> anyhow::Result> { + Ok(Box::new(NativeAvmRunner { + current_peer_id: current_peer_id.into(), + })) +} diff --git a/tools/cli/air-trace/src/run/runner.rs b/tools/cli/air-trace/src/run/runner.rs new file mode 100644 index 00000000..a21735c4 --- /dev/null +++ b/tools/cli/air-trace/src/run/runner.rs @@ -0,0 +1,33 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use avm_server::avm_runner::*; +use avm_server::CallResults; + +pub(crate) trait AirRunner { + fn call_tracing( + &mut self, + air: String, + prev_data: Vec, + data: Vec, + init_peer_id: String, + timestamp: u64, + ttl: u32, + call_results: CallResults, + tracing_params: String, + tracing_output_mode: u8, + ) -> anyhow::Result; +} diff --git a/tools/cli/air-trace/src/run/wasm.rs b/tools/cli/air-trace/src/run/wasm.rs new file mode 100644 index 00000000..86eebcc5 --- /dev/null +++ b/tools/cli/air-trace/src/run/wasm.rs @@ -0,0 +1,62 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use super::runner::AirRunner; +use avm_server::avm_runner::AVMRunner; +use std::path::Path; + +pub(crate) struct WasmAvmRunner(AVMRunner); + +impl AirRunner for WasmAvmRunner { + fn call_tracing( + &mut self, + air: String, + prev_data: Vec, + data: Vec, + init_peer_id: String, + timestamp: u64, + ttl: u32, + call_results: avm_server::CallResults, + tracing_params: String, + tracing_output_mode: u8, + ) -> anyhow::Result { + Ok(self.0.call_tracing( + air, + prev_data, + data, + init_peer_id, + timestamp, + ttl, + call_results, + tracing_params, + tracing_output_mode, + )?) + } +} + +pub(crate) fn create_wasm_avm_runner( + current_peer_id: impl Into, + air_wasm_runtime_path: &Path, + max_heap_size: Option, +) -> anyhow::Result> { + let current_peer_id = current_peer_id.into(); + + Ok(Box::new(WasmAvmRunner(AVMRunner::new( + air_wasm_runtime_path.to_owned(), + current_peer_id, + max_heap_size, + 0, + )?))) +} diff --git a/tools/cli/air-trace/src/stats.rs b/tools/cli/air-trace/src/stats.rs new file mode 100644 index 00000000..5a4a5c18 --- /dev/null +++ b/tools/cli/air-trace/src/stats.rs @@ -0,0 +1,111 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod log_data; +mod report; + +use self::log_data::{LogRecord, Message}; + +use clap::Parser; + +#[derive(Parser)] +#[clap(about = "Pretty-print AquaVM JSON trace or provide execution stats")] +pub(crate) struct Args { + #[clap(long)] + pretty: bool, + #[clap(long)] + stats: bool, + + #[clap(long)] + sort_stats_by_duration: bool, +} + +pub(crate) fn stats(mut args: Args) -> anyhow::Result<()> { + use std::io::Write; + + if !args.pretty && !args.stats { + args.pretty = true; + args.stats = true; + } + + let stderr = std::io::stderr(); + let mut stderr = stderr.lock(); + let stdin = std::io::stdin(); + let stdin = stdin.lock(); + + let mut stats = self::report::StatsReport::new(); + + for rec in read_logs(stdin) { + let rec = rec?; + + if args.pretty { + print_log_record(&mut stderr, &rec)?; + } + if args.stats { + stats.consider(rec)?; + } + } + + if args.stats { + if args.pretty { + writeln!(stderr)?; + } + stats.report(&mut stderr, args.sort_stats_by_duration)?; + } + Ok(()) +} + +fn read_logs(input: R) -> impl Iterator> { + input.lines().filter_map(|r| match r { + Ok(line) => { + let line = line.trim(); + if line.is_empty() { + None + } else { + Some(serde_json::from_str(line).map_err(anyhow::Error::from)) + } + } + Err(err) => Some(Err(err.into())), + }) +} + +fn print_log_record(mut out: W, log_record: &LogRecord) -> std::io::Result<()> { + use itertools::Itertools as _; + + let val = &log_record.value; + + write!( + out, + "{timestamp} {level} ", + timestamp = val.timestamp, + level = val.level, + )?; + if !val.spans.is_empty() { + write!(out, "{spans}", spans = val.spans.iter().join(":"),)?; + } + if matches!(&val.fields, Message::Close(_)) { + if !val.spans.is_empty() { + write!(out, ":")?; + } + write!(out, "{span}", span = log_record.span)?; + } + writeln!( + out, + ": {target}: {fields}", + target = log_record.target, + fields = val.fields, + ) +} diff --git a/tools/cli/air-trace/src/stats/log_data.rs b/tools/cli/air-trace/src/stats/log_data.rs new file mode 100644 index 00000000..7ce8c356 --- /dev/null +++ b/tools/cli/air-trace/src/stats/log_data.rs @@ -0,0 +1,117 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use serde::Deserialize; +use std::collections::HashMap; + +#[derive(Deserialize)] +pub(crate) struct LogRecord { + pub(crate) target: String, + pub(crate) span: Span, + + #[serde(flatten)] + pub(crate) value: LogValue, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct Span { + pub(crate) name: String, + #[serde(flatten)] + pub(crate) args: HashMap, +} + +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct LogKey { + pub(crate) target: String, + pub(crate) span_name: String, +} + +#[derive(Deserialize)] +pub(crate) struct LogValue { + pub(crate) timestamp: String, + pub(crate) fields: Message, + pub(crate) level: String, + pub(crate) spans: Vec, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "message", rename_all = "lowercase")] +pub(crate) enum Message { + New, + Enter, + Close(CloseMessage), +} + +#[derive(Debug, Deserialize)] +pub(crate) struct CloseMessage { + #[serde(rename = "time.busy")] + pub(crate) time_busy: String, + #[serde(rename = "time.idle")] + pub(crate) time_idle: String, +} + +impl LogRecord { + pub(crate) fn get_key(&self) -> LogKey { + LogKey { + target: self.target.clone(), + span_name: self.span.name.clone(), + } + } +} + +impl std::fmt::Display for Message { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Message::New => write!(f, "new"), + Message::Enter => write!(f, "enter"), + Message::Close(c) => write!(f, "close {}", c), + } + } +} + +impl std::fmt::Display for CloseMessage { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "idle={}, busy={}", self.time_idle, self.time_busy) + } +} + +fn format_argument(val: &serde_json::Value) -> String { + match val { + serde_json::Value::String(s) => s.to_owned(), + _ => val.to_string(), + } +} + +impl std::fmt::Display for Span { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + use itertools::Itertools as _; + + self.name.fmt(f)?; + if !self.args.is_empty() { + "{".fmt(f)?; + write!( + f, + "{}", + self.args + .iter() + .map(|(k, v)| format!("{}={}", k, format_argument(v))) + .format(", ") + )?; + "}".fmt(f)?; + } + Ok(()) + } +} diff --git a/tools/cli/air-trace/src/stats/report.rs b/tools/cli/air-trace/src/stats/report.rs new file mode 100644 index 00000000..6e99f730 --- /dev/null +++ b/tools/cli/air-trace/src/stats/report.rs @@ -0,0 +1,57 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::log_data::{LogKey, LogRecord, Message}; +use crate::utils::parse_tracing_duration; + +use std::{collections::HashMap, time::Duration}; + +#[derive(Default)] +pub(crate) struct StatsReport { + data: HashMap, +} + +impl StatsReport { + pub(crate) fn new() -> Self { + Self::default() + } + + pub(crate) fn report( + self, + mut out: W, + sort_stats_by_duration: bool, + ) -> std::io::Result<()> { + writeln!(out, "*** Statistics ***")?; + let mut stats_data: Vec<_> = self.data.into_iter().collect(); + if sort_stats_by_duration { + stats_data.sort_unstable_by(|a, b| (a.1, &a.0).cmp(&(b.1, &b.0)).reverse()); + } else { + stats_data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + } + for (k, v) in stats_data { + writeln!(out, "{} {}: {:?}", k.target, k.span_name, v)?; + } + Ok(()) + } + + pub(crate) fn consider(&mut self, rec: LogRecord) -> anyhow::Result<()> { + if let Message::Close(close) = &rec.value.fields { + let time_busy = parse_tracing_duration(&close.time_busy)?; + *self.data.entry(rec.get_key()).or_default() += time_busy; + } + Ok(()) + } +} diff --git a/tools/cli/air-trace/src/utils.rs b/tools/cli/air-trace/src/utils.rs new file mode 100644 index 00000000..3c31bb43 --- /dev/null +++ b/tools/cli/air-trace/src/utils.rs @@ -0,0 +1,40 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::time::Duration; + +// unfortunately, external crates don't seem to provide required functionality: +// some do not handle floats, others do not handle suffixes +pub(crate) fn parse_tracing_duration(input: &str) -> Result { + for (suffix, scale) in [("ns", 1e-9), ("µs", 1e-6), ("ms", 1e-3), ("s", 1e0)] { + if let Some(num_str) = input.strip_suffix(suffix) { + if let Ok(num) = num_str.parse::() { + return Ok(Duration::from_secs_f64(num * scale)); + } else { + break; + } + } + } + return Err(anyhow::anyhow!("malformed duration {:?}", input)); +} + +pub(crate) fn unix_timestamp_now() -> u64 { + use std::time::SystemTime; + + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("time befor Unix epoch") + .as_secs() +}