feat!(execution-engine): Store call executed values as CIDs in the data (#401)

The trace stores CID strings for call result values.  These strings are to be resolved to real values with `InterpreterData::cid_store` map.
This commit is contained in:
Ivan Boldyrev 2022-12-26 15:45:14 +07:00 committed by GitHub
parent 004ce10abd
commit 0226c062f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 942 additions and 172 deletions

View File

@ -1,3 +1,9 @@
## Version 0.33.0 (2022-12-21)
[PR 401](https://github.com/fluencelabs/aquavm/pull/401):
Call result values are stored as CIDs in the data trace. These CIDs refer
to a new `cid_store` data's field that maps a CID string to a value.
## Version 0.32.0 (2022-11-25)
[PR 298](https://github.com/fluencelabs/aquavm/pull/298):

201
Cargo.lock generated
View File

@ -13,9 +13,10 @@ dependencies = [
[[package]]
name = "air"
version = "0.32.0"
version = "0.33.0"
dependencies = [
"air-execution-info-collector",
"air-interpreter-cid",
"air-interpreter-data",
"air-interpreter-interface",
"air-lambda-ast",
@ -84,7 +85,7 @@ dependencies = [
[[package]]
name = "air-interpreter"
version = "0.32.0"
version = "0.33.0"
dependencies = [
"air",
"air-interpreter-interface",
@ -99,9 +100,21 @@ dependencies = [
]
[[package]]
name = "air-interpreter-data"
version = "0.4.1"
name = "air-interpreter-cid"
version = "0.1.0"
dependencies = [
"cid",
"multihash",
"serde",
"serde_json",
]
[[package]]
name = "air-interpreter-data"
version = "0.5.0"
dependencies = [
"air-interpreter-cid",
"air-interpreter-interface",
"air-parser",
"air-utils",
"once_cell",
@ -113,8 +126,9 @@ dependencies = [
[[package]]
name = "air-interpreter-interface"
version = "0.11.2"
version = "0.12.0"
dependencies = [
"air-interpreter-cid",
"fluence-it-types",
"marine-rs-sdk",
"serde",
@ -179,6 +193,8 @@ name = "air-test-utils"
version = "0.4.0"
dependencies = [
"air",
"air-interpreter-cid",
"air-interpreter-data",
"air-interpreter-interface",
"avm-interface",
"avm-server",
@ -227,7 +243,9 @@ dependencies = [
name = "air-trace-handler"
version = "0.1.0"
dependencies = [
"air-interpreter-cid",
"air-interpreter-data",
"air-interpreter-interface",
"air-log-targets",
"air-parser",
"bimap",
@ -320,7 +338,7 @@ dependencies = [
[[package]]
name = "avm-interface"
version = "0.27.0"
version = "0.28.0"
dependencies = [
"air-interpreter-interface",
"air-utils",
@ -336,7 +354,7 @@ dependencies = [
[[package]]
name = "avm-server"
version = "0.27.0"
version = "0.28.0"
dependencies = [
"air-interpreter-interface",
"air-utils",
@ -354,6 +372,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "base-x"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270"
[[package]]
name = "base64"
version = "0.13.0"
@ -411,6 +435,15 @@ dependencies = [
"digest 0.9.0",
]
[[package]]
name = "block-buffer"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e"
dependencies = [
"generic-array 0.14.6",
]
[[package]]
name = "boolinator"
version = "2.4.0"
@ -523,6 +556,19 @@ dependencies = [
"half",
]
[[package]]
name = "cid"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9b68e3193982cd54187d71afdb2a271ad4cf8af157858e9cb911b91321de143"
dependencies = [
"core2",
"multibase",
"multihash",
"serde",
"unsigned-varint",
]
[[package]]
name = "clap"
version = "2.34.0"
@ -642,6 +688,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "core2"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505"
dependencies = [
"memchr",
]
[[package]]
name = "cpufeatures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320"
dependencies = [
"libc",
]
[[package]]
name = "cranelift-bforest"
version = "0.59.0"
@ -823,6 +887,16 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array 0.14.6",
"typenum",
]
[[package]]
name = "crypto-mac"
version = "0.8.0"
@ -944,6 +1018,32 @@ dependencies = [
"syn",
]
[[package]]
name = "data-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "data-encoding-macro"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86927b7cd2fe88fa698b87404b287ab98d1a0063a34071d92e575b72d3029aca"
dependencies = [
"data-encoding",
"data-encoding-macro-internal",
]
[[package]]
name = "data-encoding-macro-internal"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5bbed42daaa95e780b60a50546aa345b8413a1e46f9a40a12907d3598f038db"
dependencies = [
"data-encoding",
"syn",
]
[[package]]
name = "diff"
version = "0.1.13"
@ -974,6 +1074,16 @@ dependencies = [
"generic-array 0.14.6",
]
[[package]]
name = "digest"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "dirs-next"
version = "2.0.0"
@ -1788,6 +1898,44 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "multibase"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404"
dependencies = [
"base-x",
"data-encoding",
"data-encoding-macro",
]
[[package]]
name = "multihash"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "835d6ff01d610179fbce3de1694d007e500bf33a7f29689838941d6bf783ae40"
dependencies = [
"core2",
"digest 0.10.5",
"multihash-derive",
"sha2",
"unsigned-varint",
]
[[package]]
name = "multihash-derive"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d6d4752e6230d8ef7adf7bd5d8c4b1f6561c1014c5ba9a37445ccefe18aa1db"
dependencies = [
"proc-macro-crate",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "multimap"
version = "0.8.3"
@ -2129,6 +2277,16 @@ dependencies = [
"yansi",
]
[[package]]
name = "proc-macro-crate"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a"
dependencies = [
"thiserror",
"toml",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -2455,6 +2613,17 @@ dependencies = [
"syn",
]
[[package]]
name = "sha2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if 1.0.0",
"cpufeatures",
"digest 0.10.5",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
@ -2540,6 +2709,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
]
[[package]]
name = "target-lexicon"
version = "0.10.0"
@ -2778,6 +2959,12 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "unsigned-varint"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d86a8dc7f45e4c1b0d30e43038c38f274e77af056aa5f74b93c2cf9eb3c1c836"
[[package]]
name = "valuable"
version = "0.1.0"

View File

@ -6,6 +6,7 @@ members = [
"avm/server",
"crates/air-lib/air-parser",
"crates/air-lib/execution-info-collector",
"crates/air-lib/interpreter-cid",
"crates/air-lib/interpreter-data",
"crates/air-lib/interpreter-interface",
"crates/air-lib/lambda/ast",

View File

@ -1,6 +1,6 @@
[package]
name = "air-interpreter"
version = "0.32.0"
version = "0.33.0"
description = "Crate-wrapper for air"
authors = ["Fluence Labs"]
edition = "2018"

View File

@ -1,6 +1,6 @@
[package]
name = "air"
version = "0.32.0"
version = "0.33.0"
description = "Interpreter of AIR scripts intended to coordinate request flow in the Fluence network"
authors = ["Fluence Labs"]
edition = "2018"
@ -17,6 +17,7 @@ doctest = false
[dependencies]
air-parser = { path = "../crates/air-lib/air-parser" }
air-execution-info-collector = { path = "../crates/air-lib/execution-info-collector" }
air-interpreter-cid = { version = "0.1.0", path = "../crates/air-lib/interpreter-cid" }
air-interpreter-data = { path = "../crates/air-lib/interpreter-data" }
air-interpreter-interface = { path = "../crates/air-lib/interpreter-interface", default-features = false }
air-log-targets = { path = "../crates/air-lib/log-targets" }
@ -24,7 +25,7 @@ air-lambda-ast = { path = "../crates/air-lib/lambda/ast" }
air-lambda-parser = { path = "../crates/air-lib/lambda/parser" }
air-trace-handler = { path = "../crates/air-lib/trace-handler" }
air-utils = { version = "0.1.0", path = "../crates/air-lib/utils" }
polyplets = { path = "../crates/air-lib/polyplets" }
polyplets = { version = "0.3.2", path = "../crates/air-lib/polyplets" }
serde = { version = "1.0.147", features = [ "derive", "rc" ] }
serde_json = "1.0.89"

View File

@ -22,7 +22,7 @@ use crate::UncatchableError;
use air_interpreter_data::CallResult;
use air_interpreter_data::TracePos;
use air_interpreter_data::Value;
use air_interpreter_data::ValueRef;
use air_parser::ast::CallOutputValue;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
@ -32,11 +32,15 @@ pub(crate) fn populate_context_from_peer_service_result<'i>(
output: &CallOutputValue<'i>,
exec_ctx: &mut ExecutionCtx<'i>,
) -> ExecutionResult<CallResult> {
let result_value = executed_result.result.clone();
let cid = exec_ctx
.cid_tracker
.record_value(executed_result.result.clone())
.map_err(UncatchableError::from)?;
match output {
CallOutputValue::Scalar(scalar) => {
exec_ctx.scalars.set_scalar_value(scalar.name, executed_result)?;
Ok(CallResult::executed_scalar(result_value))
Ok(CallResult::executed_scalar(cid))
}
CallOutputValue::Stream(stream) => {
let value_descriptor = StreamValueDescriptor::new(
@ -47,30 +51,36 @@ pub(crate) fn populate_context_from_peer_service_result<'i>(
stream.position,
);
let generation = exec_ctx.streams.add_stream_value(value_descriptor)?;
Ok(CallResult::executed_stream(result_value, generation))
Ok(CallResult::executed_stream(cid, generation))
}
// by the internal conventions if call has no output value,
// corresponding data should have scalar type
CallOutputValue::None => Ok(CallResult::executed_scalar(result_value)),
CallOutputValue::None => Ok(CallResult::executed_scalar(cid)),
}
}
pub(crate) fn populate_context_from_data<'i>(
value: Value,
value: ValueRef,
tetraplet: RcSecurityTetraplet,
trace_pos: TracePos,
value_source: ValueSource,
output: &CallOutputValue<'i>,
exec_ctx: &mut ExecutionCtx<'i>,
) -> ExecutionResult<Value> {
) -> ExecutionResult<ValueRef> {
match (output, value) {
(CallOutputValue::Scalar(scalar), Value::Scalar(value)) => {
let result = ValueAggregate::new(value.clone(), tetraplet, trace_pos);
(CallOutputValue::Scalar(scalar), ValueRef::Scalar(cid)) => {
let value = exec_ctx
.get_value_by_cid(&cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound(cid.clone()))?;
let result = ValueAggregate::new(value, tetraplet, trace_pos);
exec_ctx.scalars.set_scalar_value(scalar.name, result)?;
Ok(Value::Scalar(value))
Ok(ValueRef::Scalar(cid))
}
(CallOutputValue::Stream(stream), Value::Stream { value, generation }) => {
let result = ValueAggregate::new(value.clone(), tetraplet, trace_pos);
(CallOutputValue::Stream(stream), ValueRef::Stream { cid, generation }) => {
let value = exec_ctx
.get_value_by_cid(&cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound(cid.clone()))?;
let result = ValueAggregate::new(value, tetraplet, trace_pos);
let value_descriptor = StreamValueDescriptor::new(
result,
stream.name,
@ -80,15 +90,15 @@ pub(crate) fn populate_context_from_data<'i>(
);
let resulted_generation = exec_ctx.streams.add_stream_value(value_descriptor)?;
let result = Value::Stream {
value,
let result = ValueRef::Stream {
cid,
generation: resulted_generation,
};
Ok(result)
}
// by the internal conventions if call has no output value,
// corresponding data should have scalar type
(CallOutputValue::None, value @ Value::Scalar(_)) => Ok(value),
(CallOutputValue::None, value @ ValueRef::Scalar(_)) => Ok(value),
(_, value) => Err(ExecutionError::Uncatchable(
UncatchableError::CallResultNotCorrespondToInstr(value),
)),

View File

@ -17,8 +17,10 @@
use crate::JValue;
use crate::ToErrorCode;
use air_interpreter_cid::CidCalculationError;
use air_interpreter_cid::CID;
use air_interpreter_data::TracePos;
use air_interpreter_data::Value;
use air_interpreter_data::ValueRef;
use air_trace_handler::merger::MergerApResult;
use air_trace_handler::GenerationCompatificationError;
use air_trace_handler::TraceHandlerError;
@ -27,6 +29,8 @@ use strum_macros::EnumDiscriminants;
use strum_macros::EnumIter;
use thiserror::Error as ThisError;
use std::rc::Rc;
/// Uncatchable errors arisen during AIR script execution. Uncatchable here means that these errors
/// couldn't be handled by a xor instruction and their error_code couldn't be used in a match
/// instruction. They are similar to JVM runtime errors and some of them could be caught only
@ -66,7 +70,7 @@ pub enum UncatchableError {
/// Errors occurred when result from data doesn't match to a call instruction, f.e. a call
/// could be applied to a stream, but result doesn't contain generation in a source position.
#[error("call result value {0:?} doesn't match with corresponding instruction")]
CallResultNotCorrespondToInstr(Value),
CallResultNotCorrespondToInstr(ValueRef),
/// Variable shadowing is not allowed, usually it's thrown when a AIR tries to assign value
/// for a variable not in a fold block or in a global scope but not right after new.
@ -90,6 +94,12 @@ pub enum UncatchableError {
canonicalized_stream: JValue,
de_error: serde_json::Error,
},
#[error("failed to calculate value's CID")]
CidError(#[from] CidCalculationError),
#[error("value for CID {0:?} not found")]
ValueForCidNotFound(Rc<CID>),
}
impl ToErrorCode for UncatchableError {

View File

@ -18,9 +18,14 @@ use super::LastError;
use super::LastErrorDescriptor;
use super::Scalars;
use super::Streams;
use crate::JValue;
use air_execution_info_collector::InstructionTracker;
use air_interpreter_data::InterpreterData;
use air_interpreter_cid::CID;
use air_interpreter_data::CidStore;
use air_interpreter_data::CidTracker;
use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_interpreter_interface::*;
use std::rc::Rc;
@ -63,29 +68,35 @@ pub(crate) struct ExecutionCtx<'i> {
/// Tracks all functions that should be called from services.
pub(crate) call_requests: CallRequests,
/// Merged CID-to-value dictionaries
pub(crate) cid_tracker: CidTracker,
}
impl<'i> ExecutionCtx<'i> {
pub(crate) fn new(
prev_data: &InterpreterData,
current_data: &InterpreterData,
prev_ingredients: ExecCtxIngredients,
current_ingredients: ExecCtxIngredients,
call_results: CallResults,
run_parameters: RunParameters,
) -> Self {
let run_parameters = RcRunParameters::from_run_parameters(run_parameters);
let streams = Streams::from_data(
&prev_data.global_streams,
&current_data.global_streams,
prev_data.restricted_streams.clone(),
current_data.restricted_streams.clone(),
prev_ingredients.global_streams,
current_ingredients.global_streams,
prev_ingredients.restricted_streams,
current_ingredients.restricted_streams,
);
let cid_tracker = CidTracker::from_cid_stores(prev_ingredients.cid_store, current_ingredients.cid_store);
Self {
run_parameters,
subgraph_complete: true,
last_call_request_id: prev_data.last_call_request_id,
last_call_request_id: prev_ingredients.last_call_request_id,
call_results,
streams,
cid_tracker,
..<_>::default()
}
}
@ -98,6 +109,19 @@ impl<'i> ExecutionCtx<'i> {
self.last_call_request_id += 1;
self.last_call_request_id
}
pub(crate) fn get_value_by_cid(&self, cid: &CID) -> Option<Rc<JValue>> {
self.cid_tracker.get(cid)
}
}
/// Helper struct for ExecCtx construction.
#[derive(Debug, Clone)]
pub(crate) struct ExecCtxIngredients {
pub(crate) global_streams: GlobalStreamGens,
pub(crate) last_call_request_id: u32,
pub(crate) restricted_streams: RestrictedStreamGens,
pub(crate) cid_store: CidStore<JValue>,
}
use serde::Deserialize;

View File

@ -55,8 +55,8 @@ pub(crate) struct Streams {
impl Streams {
pub(crate) fn from_data(
previous_global_streams: &GlobalStreamGens,
current_global_streams: &GlobalStreamGens,
previous_global_streams: GlobalStreamGens,
current_global_streams: GlobalStreamGens,
previous_restricted_stream_gens: RestrictedStreamGens,
current_restricted_stream_gens: RestrictedStreamGens,
) -> Self {

View File

@ -22,8 +22,8 @@ use air_interpreter_data::GlobalStreamGens;
use std::collections::HashMap;
pub(super) fn merge_global_streams(
previous_global_streams: &GlobalStreamGens,
current_global_streams: &GlobalStreamGens,
previous_global_streams: GlobalStreamGens,
current_global_streams: GlobalStreamGens,
) -> HashMap<String, Vec<StreamDescriptor>> {
let mut global_streams = previous_global_streams
.iter()
@ -35,14 +35,14 @@ pub(super) fn merge_global_streams(
})
.collect::<HashMap<_, _>>();
for (stream_name, &current_gens_count) in current_global_streams {
if previous_global_streams.contains_key(stream_name) {
for (stream_name, current_gens_count) in current_global_streams {
if previous_global_streams.contains_key(&stream_name) {
continue;
}
let global_stream = Stream::from_generations_count(0, current_gens_count as usize);
let descriptor = StreamDescriptor::global(global_stream);
global_streams.insert(stream_name.clone(), vec![descriptor]);
global_streams.insert(stream_name, vec![descriptor]);
}
global_streams

View File

@ -59,13 +59,7 @@ pub(crate) fn from_uncatchable_error(
let data = data.into();
let call_requests = serde_json::to_vec(&CallRequests::new()).expect("default serializer shouldn't fail");
InterpreterOutcome {
ret_code,
error_message: error.to_string(),
data,
next_peer_pks: vec![],
call_requests,
}
InterpreterOutcome::new(ret_code, error.to_string(), data, vec![], call_requests)
}
/// Create InterpreterOutcome from supplied execution context, trace handler, and error,
@ -99,6 +93,7 @@ fn populate_outcome_from_contexts(
trace_handler.into_result_trace(),
global_streams,
restricted_streams,
exec_ctx.cid_tracker,
exec_ctx.last_call_request_id,
semver::Version::parse(env!("CARGO_PKG_VERSION")).expect("cargo version is valid"),
);

View File

@ -15,6 +15,7 @@
*/
use super::PreparationError;
use crate::execution_step::execution_context::ExecCtxIngredients;
use crate::execution_step::ExecutionCtx;
use crate::execution_step::TraceHandler;
@ -47,8 +48,22 @@ pub(crate) fn prepare<'i>(
let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?;
let exec_ctx = make_exec_ctx(&prev_data, &current_data, call_results, run_parameters)?;
let trace_handler = TraceHandler::from_data(prev_data, current_data);
let prev_ingredients = ExecCtxIngredients {
global_streams: prev_data.global_streams,
last_call_request_id: prev_data.last_call_request_id,
restricted_streams: prev_data.restricted_streams,
cid_store: prev_data.cid_store,
};
let current_ingredients = ExecCtxIngredients {
global_streams: current_data.global_streams,
last_call_request_id: current_data.last_call_request_id,
restricted_streams: current_data.restricted_streams,
cid_store: current_data.cid_store,
};
let exec_ctx = make_exec_ctx(prev_ingredients, current_ingredients, call_results, run_parameters)?;
let trace_handler = TraceHandler::from_trace(prev_data.trace, current_data.trace);
let result = PreparationDescriptor {
exec_ctx,
@ -68,15 +83,15 @@ fn try_to_data(raw_data: &[u8]) -> PreparationResult<InterpreterData> {
#[tracing::instrument(skip_all)]
fn make_exec_ctx(
prev_data: &InterpreterData,
current_data: &InterpreterData,
prev_ingredients: ExecCtxIngredients,
current_ingredients: ExecCtxIngredients,
call_results: &[u8],
run_parameters: RunParameters,
) -> PreparationResult<ExecutionCtx<'static>> {
let call_results = serde_json::from_slice(call_results)
.map_err(|e| PreparationError::CallResultsDeFailed(e, call_results.to_vec()))?;
let ctx = ExecutionCtx::new(prev_data, current_data, call_results, run_parameters);
let ctx = ExecutionCtx::new(prev_ingredients, current_ingredients, call_results, run_parameters);
Ok(ctx)
}

View File

@ -46,6 +46,7 @@ pub fn execute_air(
execute_air_impl(air, prev_data, data, params, call_results).unwrap_or_else(identity)
}
#[allow(clippy::result_large_err)]
fn execute_air_impl(
air: String,
prev_data: Vec<u8>,

View File

@ -0,0 +1,121 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_interpreter_data::CidTracker;
use air_test_framework::AirScriptExecutor;
use air_test_utils::prelude::*;
#[test]
fn test_missing_cid() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(echo_call_service(), vm_peer_id);
let air_script = r#"
(seq
(call "peer_id" ("service" "call1") [] x)
(call "peer_id" ("service" "call2") []))"#;
let trace = vec![scalar_number(42), scalar_number(43)];
let mut tracker = CidTracker::<JValue>::new();
tracker.record_value(json!(43)).unwrap();
let cur_data = raw_data_from_trace(trace, tracker);
let result = call_vm!(vm, <_>::default(), air_script, vec![], cur_data);
assert_eq!(result.ret_code, 20012);
assert_eq!(
result.error_message,
"value for CID CID(\"bagaaieraondvznakk2hi3kfaixhnceatpykz7cikytniqo3lc7ogkgz2qbeq\") not found"
);
}
#[test]
fn test_correct_cid() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(echo_call_service(), vm_peer_id);
let air_script = r#"
(seq
(call "peer_id" ("service" "call1") [] x)
(call "peer_id" ("service" "call2") [] y))"#;
let trace = vec![scalar_number(42), scalar_number(43)];
let mut tracker = CidTracker::<JValue>::new();
tracker.record_value(json!(43)).unwrap();
tracker.record_value(json!(42)).unwrap();
let cur_data = raw_data_from_trace(trace, tracker);
let result = call_vm!(vm, <_>::default(), air_script, vec![], cur_data);
assert_eq!(result.ret_code, 0);
}
#[test]
fn test_scalar_cid() {
let vm_peer_id = "vm_peer_id";
let annotated_air_script = format!(
r#"
(seq
(call "{vm_peer_id}" ("service" "call1") [] x) ; ok="hi"
(call "{vm_peer_id}" ("service" "call2") [] y) ; ok="ipld"
)"#
);
let executor = AirScriptExecutor::new(
TestRunParameters::from_init_peer_id(vm_peer_id),
vec![],
std::iter::empty(),
&annotated_air_script,
)
.unwrap();
let result = executor.execute_one(vm_peer_id).unwrap();
let data = data_from_result(&result);
let mut tracker = CidTracker::<JValue>::new();
let expected_trace = vec![scalar_tracked("hi", &mut tracker), scalar_tracked("ipld", &mut tracker)];
assert_eq!(result.ret_code, 0);
assert_eq!(data.trace, expected_trace);
assert_eq!(data.cid_store, tracker.into());
}
#[test]
fn test_stream_cid() {
let vm_peer_id = "vm_peer_id";
let annotated_air_script = format!(
r#"
(seq
(call "{vm_peer_id}" ("service" "call1") [] $x) ; ok="hi"
(call "{vm_peer_id}" ("service" "call2") [] $x) ; ok="ipld"
)"#
);
let executor = AirScriptExecutor::new(
TestRunParameters::from_init_peer_id(vm_peer_id),
vec![],
std::iter::empty(),
&annotated_air_script,
)
.unwrap();
let result = executor.execute_one(vm_peer_id).unwrap();
let data = data_from_result(&result);
let mut tracker = CidTracker::<JValue>::new();
let expected_trace = vec![
stream_tracked("hi", 0, &mut tracker),
stream_tracked("ipld", 1, &mut tracker),
];
assert_eq!(result.ret_code, 0);
assert_eq!(data.trace, expected_trace);
assert_eq!(data.cid_store, tracker.into());
}

View File

@ -200,8 +200,6 @@ fn stream_merge() {
#[test]
fn fold_merge() {
use std::ops::Deref;
let set_variable_vm_id = "set_variable";
let local_vm_id = "local_vm";
@ -312,14 +310,15 @@ fn fold_merge() {
for subtrace_lore in fold.lore.iter() {
let value_pos = subtrace_lore.value_pos;
if let ExecutedState::Call(CallResult::Executed(value)) = &data.trace[value_pos] {
let value = match value {
Value::Scalar(value) => value,
Value::Stream { value, .. } => value,
let cid = match value {
ValueRef::Scalar(cid) => cid,
ValueRef::Stream { cid, .. } => cid,
};
if let JValue::String(var_name) = value.deref() {
let value = data.cid_store.get(cid).unwrap().clone();
if let JValue::String(ref var_name) = &*value {
let current_count: usize = calls_count.get(var_name).copied().unwrap_or_default();
calls_count.insert(var_name, current_count + 1);
calls_count.insert(var_name.to_owned(), current_count + 1);
}
}
}

View File

@ -14,6 +14,7 @@
* limitations under the License.
*/
use air_interpreter_data::CidTracker;
use air_test_utils::prelude::*;
#[test]
@ -30,13 +31,14 @@ fn executed_trace_seq_par_call() {
(call "{local_peer_id}" ("local_service_id" "local_fn_name") [] result_2)
)"#);
let mut tracker = CidTracker::new();
let unit_call_service_result = "result from unit_call_service";
let initial_trace = vec![
par(1, 1),
scalar_string(unit_call_service_result),
scalar_string(unit_call_service_result),
scalar_tracked(unit_call_service_result, &mut tracker),
scalar_tracked(unit_call_service_result, &mut tracker),
];
let initial_data = raw_data_from_trace(initial_trace);
let initial_data = raw_data_from_trace(initial_trace, tracker);
let result = checked_call_vm!(vm, <_>::default(), script, "", initial_data);
let actual_trace = trace_from_result(&result);
@ -67,14 +69,15 @@ fn executed_trace_par_par_call() {
)"#);
let unit_call_service_result = "result from unit_call_service";
let mut tracker = CidTracker::new();
let initial_state = vec![
par(2, 1),
par(1, 0),
request_sent_by("peer_id_1"),
scalar_string(unit_call_service_result),
scalar_tracked(unit_call_service_result, &mut tracker),
];
let initial_data = raw_data_from_trace(initial_state);
let initial_data = raw_data_from_trace(initial_state, tracker);
let result = checked_call_vm!(vm, <_>::default(), &script, "", initial_data);
let actual_trace = trace_from_result(&result);
@ -97,7 +100,7 @@ fn executed_trace_par_par_call() {
request_sent_by(local_peer_id),
];
let initial_data = raw_data_from_trace(initial_state);
let initial_data = raw_data_from_trace(initial_state, <_>::default());
let result = checked_call_vm!(vm, <_>::default(), script, "", initial_data);
let actual_trace = trace_from_result(&result);
@ -179,19 +182,21 @@ fn executed_trace_create_service() {
let script = include_str!("./scripts/create_service.air");
let add_module_response = String::from("add_module response");
let add_blueprint_response = String::from("add_blueprint response");
let create_response = String::from("create response");
let mut cid_tracker = CidTracker::new();
let add_module_response = "add_module response";
let add_blueprint_response = "add_blueprint response";
let create_response = "create response";
let expected_trace = vec![
scalar(module_bytes),
scalar(module_config),
scalar(blueprint),
scalar_string(add_module_response),
scalar_string(add_blueprint_response),
scalar_string(create_response),
scalar_string("test"),
scalar_tracked(module_bytes.clone(), &mut cid_tracker),
scalar_tracked(module_config.clone(), &mut cid_tracker),
scalar_tracked(blueprint.clone(), &mut cid_tracker),
scalar_tracked(add_module_response, &mut cid_tracker),
scalar_tracked(add_blueprint_response, &mut cid_tracker),
scalar_tracked(create_response, &mut cid_tracker),
scalar_tracked("test", &mut cid_tracker),
];
let initial_data = raw_data_from_trace(expected_trace.clone());
let initial_data = raw_data_from_trace(expected_trace.clone(), cid_tracker);
let result = checked_call_vm!(vm, <_>::default(), script, "", initial_data);

View File

@ -14,6 +14,7 @@
* limitations under the License.
*/
mod cid;
mod data_merging;
mod errors;
mod join_behaviour;

View File

@ -15,6 +15,9 @@
*/
use air::UncatchableError;
use air_interpreter_cid::value_to_json_cid;
use air_interpreter_data::CidTracker;
use air_interpreter_data::ValueRef;
use air_test_utils::prelude::*;
use air_trace_handler::merger::CallResultError;
use air_trace_handler::merger::MergeError;
@ -127,8 +130,9 @@ fn par_early_exit() {
];
assert_eq!(actual_trace_3, expected_trace);
let mut setter_3_tracker = CidTracker::new();
let setter_3_malicious_trace = vec![
executed_state::scalar_string("result from unit_call_service"),
executed_state::scalar_tracked("result from unit_call_service", &mut setter_3_tracker),
executed_state::par(10, 0),
executed_state::par(9, 0),
executed_state::par(7, 1),
@ -137,12 +141,12 @@ fn par_early_exit() {
executed_state::par(1, 1),
executed_state::request_sent_by(init_peer_id),
executed_state::request_sent_by(init_peer_id),
executed_state::stream_string("non_exist_value", 0),
executed_state::stream_string("success result from fallible_call_service", 0),
executed_state::stream_tracked("non_exist_value", 0, &mut setter_3_tracker),
executed_state::stream_tracked("success result from fallible_call_service", 0, &mut setter_3_tracker),
executed_state::service_failed(1, "failed result from fallible_call_service"),
executed_state::request_sent_by(setter_3_id),
];
let setter_3_malicious_data = raw_data_from_trace(setter_3_malicious_trace);
let setter_3_malicious_data = raw_data_from_trace(setter_3_malicious_trace, setter_3_tracker);
let init_result_3 = call_vm!(
init,
<_>::default(),
@ -151,16 +155,18 @@ fn par_early_exit() {
setter_3_malicious_data
);
let prev_value = ValueRef::Stream {
cid: value_to_json_cid(&json!("1")).unwrap().into(),
generation: 1,
};
let current_value = ValueRef::Stream {
cid: value_to_json_cid(&json!("non_exist_value")).unwrap().into(),
generation: 0,
};
let expected_error = UncatchableError::TraceError {
trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual {
prev_value: Value::Stream {
value: rc!(json!("1")),
generation: 1,
},
current_value: Value::Stream {
value: rc!(json!("non_exist_value")),
generation: 0,
},
prev_value,
current_value,
})),
instruction: r#"call "setter_1" ("" "") [] $stream"#.to_string(),
};

View File

@ -14,11 +14,16 @@
* limitations under the License.
*/
use air_interpreter_data::CidTracker;
use air_test_utils::prelude::*;
use std::rc::Rc;
#[test]
fn seq_remote_remote() {
let mut vm = create_avm(unit_call_service(), "");
let mut cid_tracker = CidTracker::new();
cid_tracker.record_value(Rc::new("".into())).unwrap();
let script = r#"
(seq
@ -30,7 +35,7 @@ fn seq_remote_remote() {
assert_eq!(result.next_peer_pks, vec![String::from("remote_peer_id_1")]);
let initial_trace = vec![executed_state::scalar_string("")];
let initial_data = raw_data_from_trace(initial_trace);
let initial_data = raw_data_from_trace(initial_trace, cid_tracker.into());
let result = checked_call_vm!(vm, <_>::default(), script, "", initial_data);

View File

@ -15,6 +15,8 @@
*/
use air::UncatchableError;
use air_interpreter_cid::value_to_json_cid;
use air_interpreter_data::CidTracker;
use air_test_utils::prelude::*;
use air_trace_handler::merger::MergeError;
use air_trace_handler::TraceHandlerError;
@ -34,16 +36,19 @@ fn issue_295() {
)
"#);
let mut cid_tracker = CidTracker::new();
cid_tracker.record_value(Rc::new("".into())).unwrap();
let prev_trace = vec![executed_state::scalar_string(""), executed_state::ap(1)];
let current_trace = vec![executed_state::scalar_string(""), executed_state::scalar_string("")];
let prev_data = raw_data_from_trace(prev_trace);
let current_data = raw_data_from_trace(current_trace);
let prev_data = raw_data_from_trace(prev_trace, cid_tracker.clone().into());
let current_data = raw_data_from_trace(current_trace, cid_tracker.into());
let result = call_vm!(vm, <_>::default(), &script, prev_data, current_data);
let cid = value_to_json_cid(&json!("")).unwrap().into();
let expected_error = UncatchableError::TraceError {
trace_error: TraceHandlerError::MergeError(MergeError::IncompatibleExecutedStates(
ExecutedState::Ap(ApResult::new(1)),
ExecutedState::Call(CallResult::Executed(Value::Scalar(Rc::new(json!(""))))),
ExecutedState::Call(CallResult::Executed(ValueRef::Scalar(cid))),
)),
instruction: "ap scalar $stream".to_string(),
};

View File

@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.28.0] - 2022-12-21
### Changed
- Bump version of `avm-interpreter-interface` to 0.12.0:
it has a new `cid` field in the `InterpreterOutcome`.
## [0.27.0] - 2022-11-22
### Added

View File

@ -1,7 +1,7 @@
[package]
name = "avm-interface"
description = "Fluence AIR VM interfacing"
version = "0.27.0"
version = "0.28.0"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
@ -15,7 +15,7 @@ name = "avm_interface"
path = "src/lib.rs"
[dependencies]
air-interpreter-interface = { version = "0.11.2", path = "../../crates/air-lib/interpreter-interface", default-features = false }
air-interpreter-interface = { version = "0.12.0", path = "../../crates/air-lib/interpreter-interface", default-features = false }
air-utils = { version = "0.1.0", path = "../../crates/air-lib/utils" }
avm-data-store = { version = "0.4.1", path = "../../crates/data-store" }
polyplets = { version = "0.3.2", path = "../../crates/air-lib/polyplets" }

View File

@ -41,6 +41,7 @@ impl RawAVMOutcome {
data,
call_requests,
next_peer_pks,
cid: _,
} = outcome;
let call_requests = crate::from_raw_call_requests(call_requests)?;

View File

@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.28.0] - 2022-12-21
+ Update `avm-interface` version after `air-interpreter-interface` version bump.
## [0.27.0] - 2022-11-22
### Changed

View File

@ -1,7 +1,7 @@
[package]
name = "avm-server"
description = "Fluence AIR VM"
version = "0.27.0"
version = "0.28.0"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
@ -15,12 +15,12 @@ name = "avm_server"
path = "src/lib.rs"
[dependencies]
air-interpreter-interface = { version = "0.11.2", path = "../../crates/air-lib/interpreter-interface" }
air-interpreter-interface = { version = "0.12.0", path = "../../crates/air-lib/interpreter-interface" }
air-utils = { version = "0.1.0", path = "../../crates/air-lib/utils" }
avm-data-store = { version = "0.4.1", path = "../../crates/data-store" }
marine-runtime = "0.23.1"
polyplets = { version = "0.3.2", path = "../../crates/air-lib/polyplets" }
avm-interface = { version = "0.27.0", path = "../../avm/interface" }
avm-interface = { version = "0.28.0", path = "../../avm/interface" }
eyre = "0.6.8"
thiserror = "1.0.37"

View File

@ -0,0 +1,17 @@
[package]
name = "air-interpreter-cid"
description = "AIR interpreter CID util module"
version = "0.1.0"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
documentation = "https://docs.rs/air-interpreter-cid"
repository = "https://github.com/fluencelabs/aquavm/tree/master/crates/air-lib/interpreter-cid"
keywords = ["fluence", "air", "webassembly", "programming-language", "cid", "ipld"]
categories = ["wasm"]
[dependencies]
cid = { version = "0.9.0", default-features = false, features = ["std"] }
multihash = { version = "0.17.0", default-features = false, features = ["multihash-impl", "std", "sha2"] }
serde = "1.0.147"
serde_json = "1.0.89"

View File

@ -0,0 +1,100 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#![forbid(unsafe_code)]
#![warn(rust_2018_idioms)]
#![deny(
dead_code,
nonstandard_style,
unused_imports,
unused_mut,
unused_variables,
unused_unsafe,
unreachable_patterns
)]
use serde::Deserialize;
use serde::Serialize;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[serde(transparent)]
pub struct CID(String);
impl CID {
pub fn new(cid: impl Into<String>) -> Self {
CID(cid.into())
}
pub fn into_inner(self) -> String {
self.0
}
}
impl From<CID> for String {
fn from(value: CID) -> Self {
value.0
}
}
// TODO we might refactor this to `SerializationFormat` trait
// that both transform data to binary/text form (be it JSON, CBOR or something else)
// and produces CID too
pub fn json_data_cid(data: &[u8]) -> CID {
use cid::Cid;
use multihash::{Code, MultihashDigest};
// the Sha2_256 is current IPFS default hash
let digest = Code::Sha2_256.digest(data);
// seems to be better than RAW_CODEC = 0x55
const JSON_CODEC: u64 = 0x0200;
let cid = Cid::new_v1(JSON_CODEC, digest);
CID(cid.to_string())
}
pub struct CidCalculationError(serde_json::Error);
use std::fmt;
impl fmt::Debug for CidCalculationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl fmt::Display for CidCalculationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
impl From<serde_json::Error> for CidCalculationError {
fn from(source: serde_json::Error) -> Self {
Self(source)
}
}
impl std::error::Error for CidCalculationError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}
/// Calculate a CID of JSON-serialized value.
pub fn value_to_json_cid<Val: Serialize>(value: &Val) -> Result<CID, CidCalculationError> {
let data = serde_json::to_vec(value)?;
Ok(json_data_cid(&data))
}

View File

@ -1,3 +1,8 @@
## Version 0.5.0
- Call result values are stored as CIDs in the data trace. These CIDs refer
to a new `cid_store` data's field that maps a CID string to a value.
## Version 0.4.1
[PR 367](https://github.com/fluencelabs/aquavm/pull/367):

View File

@ -1,7 +1,7 @@
[package]
name = "air-interpreter-data"
description = "Data format of the AIR interpreter"
version = "0.4.1"
version = "0.5.0"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
@ -16,6 +16,9 @@ path = "src/lib.rs"
[dependencies]
air-utils = { path = "../utils" }
air-parser = { path = "../air-parser" }
# TODO version?
air-interpreter-interface = { path = "../interpreter-interface" }
air-interpreter-cid = { version = "0.1.0", path = "../interpreter-cid" }
serde = {version = "1.0.147", features = ["derive", "rc"]}
serde_json = "1.0.89"

View File

@ -0,0 +1,205 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::JValue;
use air_interpreter_cid::value_to_json_cid;
use air_interpreter_cid::CidCalculationError;
use air_interpreter_cid::CID;
use serde::Deserialize;
use serde::Serialize;
use std::{collections::HashMap, rc::Rc};
/// Stores CID to Value corresponance.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct CidStore<Val>(HashMap<Rc<CID>, Rc<Val>>);
impl<Val> CidStore<Val> {
pub fn new() -> Self {
Self::default()
}
pub fn get(&self, cid: &CID) -> Option<Rc<Val>> {
self.0.get(cid).cloned()
}
}
impl<Val> Default for CidStore<Val> {
fn default() -> Self {
Self(Default::default())
}
}
#[derive(Clone, Debug)]
pub struct CidTracker<Val = JValue> {
cids: HashMap<Rc<CID>, Rc<Val>>,
}
impl<Val> CidTracker<Val> {
pub fn new() -> Self {
Self::default()
}
pub fn from_cid_stores(prev_cid_map: CidStore<Val>, current_cid_map: CidStore<Val>) -> Self {
let mut cids = prev_cid_map.0;
for (cid, val) in current_cid_map.0 {
// TODO check that values matches?
cids.insert(cid, val);
}
Self { cids }
}
pub fn get(&self, cid: &CID) -> Option<Rc<Val>> {
self.cids.get(cid).cloned()
}
}
impl<Val: Serialize> CidTracker<Val> {
pub fn record_value(
&mut self,
value: impl Into<Rc<Val>>,
) -> Result<Rc<CID>, CidCalculationError> {
let value = value.into();
let cid = Rc::new(value_to_json_cid(&value)?);
self.cids.insert(cid.clone(), value);
Ok(cid)
}
}
impl<Val> Default for CidTracker<Val> {
fn default() -> Self {
Self {
cids: Default::default(),
}
}
}
impl<Val> From<CidTracker<Val>> for CidStore<Val> {
fn from(value: CidTracker<Val>) -> Self {
Self(value.cids)
}
}
impl<Val> IntoIterator for CidStore<Val> {
type Item = (Rc<CID>, Rc<Val>);
type IntoIter = std::collections::hash_map::IntoIter<Rc<CID>, Rc<Val>>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
#[cfg(test)]
mod tests {
use std::iter::FromIterator;
use super::*;
use serde_json::json;
#[test]
fn test_iter() {
let mut tracker = CidTracker::new();
tracker.record_value(json!("test")).unwrap();
tracker.record_value(json!(1)).unwrap();
tracker.record_value(json!([1, 2, 3])).unwrap();
tracker
.record_value(json!({
"key": 42,
}))
.unwrap();
let store = CidStore::from(tracker);
assert_eq!(
store.into_iter().collect::<HashMap<_, _>>(),
HashMap::from_iter(vec![
(
CID::new("bagaaierajwlhumardpzj6dv2ahcerm3vyfrjwl7nahg7zq5o3eprwv6v3vpa")
.into(),
json!("test").into()
),
(
CID::new("bagaaierauyk65lxcdxsrphpaqdpiymcszdnjaejyibv2ohbyyaziix35kt2a")
.into(),
json!([1, 2, 3]).into(),
),
(
CID::new("bagaaieranodle477gt6odhllqbhp6wr7k5d23jhkuixr2soadzjn3n4hlnfq")
.into(),
json!(1).into(),
),
(
CID::new("bagaaierad7lci6475zdrps4h6fmcpmqyknz5z6bw6p6tmpjkfyueavqw4kaq")
.into(),
json!({
"key": 42,
})
.into(),
)
])
);
}
#[test]
fn test_store() {
let mut tracker = CidTracker::new();
tracker.record_value(json!("test")).unwrap();
tracker.record_value(json!(1)).unwrap();
tracker.record_value(json!([1, 2, 3])).unwrap();
tracker
.record_value(json!({
"key": 42,
}))
.unwrap();
let store = CidStore::from(tracker);
assert_eq!(
&*store
.get(&CID::new(
"bagaaierajwlhumardpzj6dv2ahcerm3vyfrjwl7nahg7zq5o3eprwv6v3vpa"
))
.unwrap(),
&json!("test"),
);
assert_eq!(
&*store
.get(&CID::new(
"bagaaierauyk65lxcdxsrphpaqdpiymcszdnjaejyibv2ohbyyaziix35kt2a"
))
.unwrap(),
&json!([1, 2, 3]),
);
assert_eq!(
&*store
.get(&CID::new(
"bagaaieranodle477gt6odhllqbhp6wr7k5d23jhkuixr2soadzjn3n4hlnfq"
))
.unwrap(),
&json!(1),
);
assert_eq!(
&*store
.get(&CID::new(
"bagaaierad7lci6475zdrps4h6fmcpmqyknz5z6bw6p6tmpjkfyueavqw4kaq"
))
.unwrap(),
&json!({"key": 42}),
);
assert_eq!(store.get(&CID::new("loremimpsumdolorsitament")), None,);
}
}

View File

@ -17,13 +17,15 @@
mod impls;
mod se_de;
use crate::JValue;
use crate::TracePos;
use air_interpreter_cid::CID;
use se_de::par_serializer;
use se_de::sender_serializer;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JValue;
use std::fmt::Formatter;
use std::rc::Rc;
@ -48,7 +50,7 @@ pub enum CallResult {
RequestSentBy(Sender),
/// A corresponding call's been already executed with such value as a result.
Executed(Value),
Executed(ValueRef),
/// call_service ended with a service error.
#[serde(rename = "failed")]
@ -57,9 +59,9 @@ pub enum CallResult {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Value {
Scalar(Rc<JValue>),
Stream { value: Rc<JValue>, generation: u32 },
pub enum ValueRef {
Scalar(Rc<CID>),
Stream { cid: Rc<CID>, generation: u32 },
}
/// Let's consider an example of trace that could be produces by the following fold:

View File

@ -41,14 +41,14 @@ impl CallResult {
CallResult::RequestSentBy(Sender::PeerIdWithCallId { peer_id, call_id })
}
pub fn executed_scalar(value: Rc<JValue>) -> CallResult {
let value = Value::Scalar(value);
pub fn executed_scalar(cid: Rc<CID>) -> CallResult {
let value = ValueRef::Scalar(cid);
CallResult::Executed(value)
}
pub fn executed_stream(value: Rc<JValue>, generation: u32) -> CallResult {
let value = Value::Stream { value, generation };
pub fn executed_stream(cid: Rc<CID>, generation: u32) -> CallResult {
let value = ValueRef::Stream { cid, generation };
CallResult::Executed(value)
}
@ -136,12 +136,12 @@ impl std::fmt::Display for ExecutedState {
}
}
impl std::fmt::Display for Value {
impl std::fmt::Display for ValueRef {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Value::Scalar(value) => write!(f, "scalar: {value}"),
Value::Stream { value, generation } => {
write!(f, "stream: {value} generation: {generation}")
ValueRef::Scalar(cid) => write!(f, "scalar: {cid:?}"),
ValueRef::Stream { cid, generation } => {
write!(f, "stream: {cid:?} generation: {generation}")
}
}
}

View File

@ -16,7 +16,10 @@
use super::GlobalStreamGens;
use super::RestrictedStreamGens;
use crate::cid_store::CidStore;
use crate::ExecutionTrace;
use crate::JValue;
use air_utils::measure;
use serde::Deserialize;
@ -53,6 +56,9 @@ pub struct InterpreterData {
/// Version of interpreter produced this data.
pub interpreter_version: semver::Version,
/// Map CID to values
pub cid_store: CidStore<JValue>,
}
impl InterpreterData {
@ -64,6 +70,7 @@ impl InterpreterData {
last_call_request_id: 0,
restricted_streams: RestrictedStreamGens::new(),
interpreter_version,
cid_store: <_>::default(),
}
}
@ -71,9 +78,12 @@ impl InterpreterData {
trace: ExecutionTrace,
streams: GlobalStreamGens,
restricted_streams: RestrictedStreamGens,
cid_store: impl Into<CidStore<JValue>>,
last_call_request_id: u32,
interpreter_version: semver::Version,
) -> Self {
let cid_store = cid_store.into();
Self {
trace,
global_streams: streams,
@ -81,6 +91,7 @@ impl InterpreterData {
last_call_request_id,
restricted_streams,
interpreter_version,
cid_store,
}
}

View File

@ -26,12 +26,14 @@
unreachable_patterns
)]
mod cid_store;
mod executed_state;
mod interpreter_data;
mod stream_generations;
mod trace;
mod trace_pos;
pub use cid_store::*;
pub use executed_state::*;
pub use interpreter_data::*;
pub use stream_generations::*;
@ -39,6 +41,8 @@ pub use trace::*;
pub use trace_pos::*;
use once_cell::sync::Lazy;
use serde_json::Value as JValue;
use std::str::FromStr;
static DATA_FORMAT_VERSION: Lazy<semver::Version> = Lazy::new(|| {

View File

@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.12.0] - 2022-12-21
+ New `cid` field of the `InterpreterOutcome` contains CID of the data.
## [0.11.1] - 2022-09-13
### Other

View File

@ -1,7 +1,7 @@
[package]
name = "air-interpreter-interface"
description = "Interface of the AIR interpreter"
version = "0.11.2"
version = "0.12.0"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
@ -15,6 +15,7 @@ name = "air_interpreter_interface"
path = "src/lib.rs"
[dependencies]
air-interpreter-cid = { version = "0.1.0", path = "../interpreter-cid" }
marine-rs-sdk = { version = "0.7.1", optional = true }
fluence-it-types = { version = "0.3.2", optional = true }

View File

@ -43,6 +43,9 @@ pub struct InterpreterOutcome {
/// Collected parameters of all met call instructions that could be executed on a current peer.
pub call_requests: Vec<u8>,
/// IPLD CID of the data field.
pub cid: String,
}
impl InterpreterOutcome {
@ -53,12 +56,15 @@ impl InterpreterOutcome {
next_peer_pks: Vec<String>,
call_requests: Vec<u8>,
) -> Self {
let cid = air_interpreter_cid::json_data_cid(&data).into();
Self {
ret_code,
error_message,
data,
next_peer_pks,
call_requests,
cid,
}
}
}
@ -81,13 +87,7 @@ impl InterpreterOutcome {
let error_message = try_as_string(record_values.pop().unwrap(), "error_message")?;
let ret_code = try_as_i64(record_values.pop().unwrap(), "ret_code")?;
let outcome = Self {
ret_code,
error_message,
data,
next_peer_pks,
call_requests,
};
let outcome = Self::new(ret_code, error_message, data, next_peer_pks, call_requests);
Ok(outcome)
}

View File

@ -15,6 +15,8 @@ path = "src/lib.rs"
[dependencies]
air = { path = "../../../air" }
air-interpreter-cid = { version = "0.1.0", path = "../interpreter-cid" }
air-interpreter-data = { path = "../interpreter-data" }
air-interpreter-interface = { path = "../interpreter-interface" }
avm-interface = { path = "../../../avm/interface" }
avm-server = { path = "../../../avm/server" }

View File

@ -22,35 +22,58 @@ use super::JValue;
use super::ParResult;
use super::Sender;
use super::TracePos;
use super::Value;
use super::ValueRef;
use crate::FoldLore;
use crate::FoldResult;
use crate::FoldSubTraceLore;
use crate::SubTraceDesc;
use air_interpreter_cid::value_to_json_cid;
use air_interpreter_data::CidTracker;
use std::rc::Rc;
pub fn scalar(result: JValue) -> ExecutedState {
let value = Value::Scalar(Rc::new(result));
let cid = value_to_json_cid(&result)
.unwrap_or_else(|e| panic!("{:?}: failed to compute CID of {:?}", e, result));
let value = ValueRef::Scalar(Rc::new(cid));
ExecutedState::Call(CallResult::Executed(value))
}
pub fn scalar_tracked(result: impl Into<JValue>, tracker: &mut CidTracker) -> ExecutedState {
let cid = tracker.record_value(Rc::new(result.into())).unwrap();
let value = ValueRef::Scalar(cid);
ExecutedState::Call(CallResult::Executed(value))
}
pub fn scalar_number(result: impl Into<serde_json::Number>) -> ExecutedState {
let result = JValue::Number(result.into());
let value = Rc::new(result);
ExecutedState::Call(CallResult::executed_scalar(value))
scalar(result)
}
pub fn stream_call_result(result: JValue, generation: u32) -> CallResult {
let cid = value_to_json_cid(&result)
.unwrap_or_else(|e| panic!("{:?}: failed to compute CID of {:?}", e, result));
CallResult::executed_stream(Rc::new(cid), generation)
}
pub fn stream(result: JValue, generation: u32) -> ExecutedState {
let call_result = CallResult::executed_stream(Rc::new(result), generation);
ExecutedState::Call(call_result)
ExecutedState::Call(stream_call_result(result, generation))
}
pub fn stream_tracked(
value: impl Into<JValue>,
generation: u32,
tracker: &mut CidTracker,
) -> ExecutedState {
let cid = tracker.record_value(Rc::new(value.into())).unwrap();
ExecutedState::Call(CallResult::executed_stream(cid, generation))
}
pub fn scalar_string(result: impl Into<String>) -> ExecutedState {
let result = JValue::String(result.into());
let value = Rc::new(result);
ExecutedState::Call(CallResult::executed_scalar(value))
scalar(result)
}
pub fn scalar_string_array(result: Vec<impl Into<String>>) -> ExecutedState {
@ -58,23 +81,21 @@ pub fn scalar_string_array(result: Vec<impl Into<String>>) -> ExecutedState {
.into_iter()
.map(|s| JValue::String(s.into()))
.collect::<Vec<_>>();
let value = Rc::new(JValue::Array(result));
let value = JValue::Array(result);
ExecutedState::Call(CallResult::executed_scalar(value))
scalar(value)
}
pub fn stream_string(result: impl Into<String>, generation: u32) -> ExecutedState {
let result = JValue::String(result.into());
let value = Rc::new(result);
ExecutedState::Call(CallResult::executed_stream(value, generation))
stream(result, generation)
}
pub fn stream_number(result: impl Into<serde_json::Number>, generation: u32) -> ExecutedState {
let result = JValue::Number(result.into());
let value = Rc::new(result);
ExecutedState::Call(CallResult::executed_stream(value, generation))
stream(result, generation)
}
pub fn stream_string_array(result: Vec<impl Into<String>>, generation: u32) -> ExecutedState {
@ -82,9 +103,9 @@ pub fn stream_string_array(result: Vec<impl Into<String>>, generation: u32) -> E
.into_iter()
.map(|s| JValue::String(s.into()))
.collect::<Vec<_>>();
let value = Rc::new(JValue::Array(result));
let value = JValue::Array(result);
ExecutedState::Call(CallResult::executed_stream(value, generation))
stream(value, generation)
}
pub fn request_sent_by(sender: impl Into<String>) -> ExecutedState {

View File

@ -90,11 +90,12 @@ pub fn data_from_result(result: &RawAVMOutcome) -> InterpreterData {
serde_json::from_slice(&result.data).expect("default serializer shouldn't fail")
}
pub fn raw_data_from_trace(trace: impl Into<ExecutionTrace>) -> Vec<u8> {
pub fn raw_data_from_trace(trace: impl Into<ExecutionTrace>, cid_tracker: CidTracker) -> Vec<u8> {
let data = InterpreterData::from_execution_result(
trace.into(),
<_>::default(),
<_>::default(),
cid_tracker,
0,
semver::Version::new(1, 1, 1),
);

View File

@ -14,7 +14,9 @@ name = "air_trace_handler"
path = "src/lib.rs"
[dependencies]
air-interpreter-cid = { version = "0.1.0", path = "../interpreter-cid" }
air-interpreter-data = { path = "../interpreter-data" }
air-interpreter-interface = { path = "../interpreter-interface" }
air-log-targets = { path = "../log-targets" }
air-parser = { path = "../air-parser" }

View File

@ -19,8 +19,6 @@ use super::MergeCtx;
use super::TraceSlider;
use crate::TracePos;
use air_interpreter_data::InterpreterData;
use bimap::BiHashMap;
/// Keeps all necessary data for merging.
@ -34,9 +32,9 @@ pub(crate) struct DataKeeper {
}
impl DataKeeper {
pub(crate) fn from_data(prev_data: InterpreterData, current_data: InterpreterData) -> Self {
let prev_ctx = MergeCtx::from_data(prev_data);
let current_ctx = MergeCtx::from_data(current_data);
pub(crate) fn from_trace(prev_trace: ExecutionTrace, current_trace: ExecutionTrace) -> Self {
let prev_ctx = MergeCtx::from_trace(prev_trace);
let current_ctx = MergeCtx::from_trace(current_trace);
Self {
prev_ctx,

View File

@ -20,8 +20,6 @@ use super::KeeperResult;
use super::TraceSlider;
use crate::TracePos;
use air_interpreter_data::InterpreterData;
/// Contains all necessary information about data.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct MergeCtx {
@ -29,19 +27,12 @@ pub struct MergeCtx {
}
impl MergeCtx {
#[allow(dead_code)]
pub(crate) fn from_trace(trace: ExecutionTrace) -> Self {
let slider = TraceSlider::new(trace);
Self { slider }
}
pub(crate) fn from_data(data: InterpreterData) -> Self {
let slider = TraceSlider::new(data.trace);
Self { slider }
}
pub(crate) fn try_get_generation(&self, position: TracePos) -> KeeperResult<u32> {
use air_interpreter_data::*;
@ -54,7 +45,7 @@ impl MergeCtx {
})?;
match state {
ExecutedState::Call(CallResult::Executed(Value::Stream { generation, .. })) => Ok(*generation),
ExecutedState::Call(CallResult::Executed(ValueRef::Stream { generation, .. })) => Ok(*generation),
// such Aps are always preceded by Fold where corresponding stream could be used,
// so it's been already checked that res_generation is well-formed
// and accessing 0th element is safe here

View File

@ -17,8 +17,6 @@
use super::*;
use merger::*;
use air_interpreter_data::InterpreterData;
#[derive(Debug, Default)]
pub struct TraceHandler {
data_keeper: DataKeeper,
@ -26,8 +24,8 @@ pub struct TraceHandler {
}
impl TraceHandler {
pub fn from_data(prev_data: InterpreterData, current_data: InterpreterData) -> Self {
let data_keeper = DataKeeper::from_data(prev_data, current_data);
pub fn from_trace(prev_trace: ExecutionTrace, current_trace: ExecutionTrace) -> Self {
let data_keeper = DataKeeper::from_trace(prev_trace, current_trace);
Self {
data_keeper,
@ -69,7 +67,7 @@ impl TraceHandler {
match state {
ExecutedState::Ap(ap_result) => ap_result.res_generations = vec![generation],
ExecutedState::Call(CallResult::Executed(Value::Stream {
ExecutedState::Call(CallResult::Executed(ValueRef::Stream {
generation: call_generation,
..
})) => *call_generation = generation,

View File

@ -16,17 +16,15 @@
use super::*;
type JValue = serde_json::Value;
use air_interpreter_cid::CID;
use std::rc::Rc;
pub(super) fn merge_executed(prev_value: Value, current_value: Value) -> MergeResult<CallResult> {
pub(super) fn merge_executed(prev_value: ValueRef, current_value: ValueRef) -> MergeResult<CallResult> {
match (&prev_value, &current_value) {
(Value::Scalar(_), Value::Scalar(_)) => {
(ValueRef::Scalar(_), ValueRef::Scalar(_)) => {
are_scalars_equal(&prev_value, &current_value)?;
Ok(CallResult::Executed(prev_value))
}
(Value::Stream { value: pr, .. }, Value::Stream { value: cr, .. }) => {
(ValueRef::Stream { cid: pr, .. }, ValueRef::Stream { cid: cr, .. }) => {
are_streams_equal(pr, cr, &prev_value, &current_value)?;
Ok(CallResult::Executed(prev_value))
}
@ -34,7 +32,7 @@ pub(super) fn merge_executed(prev_value: Value, current_value: Value) -> MergeRe
}
}
fn are_scalars_equal(prev_value: &Value, current_value: &Value) -> MergeResult<()> {
fn are_scalars_equal(prev_value: &ValueRef, current_value: &ValueRef) -> MergeResult<()> {
if prev_value == current_value {
return Ok(());
}
@ -46,10 +44,10 @@ fn are_scalars_equal(prev_value: &Value, current_value: &Value) -> MergeResult<(
}
fn are_streams_equal(
prev_result_value: &Rc<JValue>,
current_result_value: &Rc<JValue>,
prev_value: &Value,
current_value: &Value,
prev_result_value: &CID,
current_result_value: &CID,
prev_value: &ValueRef,
current_value: &ValueRef,
) -> MergeResult<()> {
// values from streams could have different generations and it's ok
if prev_result_value == current_result_value {

View File

@ -19,7 +19,7 @@ use super::CallResult;
use super::ExecutedState;
use super::FoldResult;
use super::KeeperError;
use super::Value;
use super::ValueRef;
use air_interpreter_data::CanonResult;
use air_interpreter_data::TracePos;
@ -63,7 +63,10 @@ pub enum ApResultError {
#[derive(ThisError, Debug)]
pub enum CallResultError {
#[error("values in call results are not equal: {prev_value:?} != {current_value:?}")]
ValuesNotEqual { prev_value: Value, current_value: Value },
ValuesNotEqual {
prev_value: ValueRef,
current_value: ValueRef,
},
/// Errors occurred when previous and current call results are incompatible.
#[error("previous and current call results are incompatible: '{prev_call:?}' '{current_call:?}'")]
@ -120,7 +123,7 @@ impl MergeError {
// these impl methods allow construction of MergeError and are used to make code more clean
impl CallResultError {
pub(crate) fn not_equal_values(prev_value: Value, current_value: Value) -> MergeError {
pub(crate) fn not_equal_values(prev_value: ValueRef, current_value: ValueRef) -> MergeError {
let call_result_error = CallResultError::ValuesNotEqual {
prev_value,
current_value,