From eca52b7191ef1bc5c4573c62412dc735d830c023 Mon Sep 17 00:00:00 2001 From: Mike Voronov Date: Thu, 3 Aug 2023 21:07:57 +0300 Subject: [PATCH] feat(execution-engine)!: refactor streams [fixes VM-255] (#621) Refactored stream and stream generation a lot, it introduces the following changes: - no generation in data anymore, AquaVM relies on generation inside data to stay valid and places value accordingly to it - stream is internally divided into previous, current, and new values, before, it was one array for all of them - recursive streams cursors are refactored and rely on new generation values instead - the Generation enum was refactored and now contains the source of the generation --- Cargo.lock | 7 + air/Cargo.toml | 1 + .../boxed_value/iterable/canon_stream.rs | 1 + air/src/execution_step/boxed_value/mod.rs | 9 +- air/src/execution_step/boxed_value/scalar.rs | 41 +- air/src/execution_step/boxed_value/stream.rs | 400 ------------- .../execution_step/boxed_value/stream/mod.rs | 26 + .../boxed_value/stream/recursive_stream.rs | 254 ++++++++ .../boxed_value/stream/stream_definition.rs | 554 ++++++++++++++++++ .../boxed_value/stream/values_matrix.rs | 169 ++++++ .../execution_step/boxed_value/stream_map.rs | 279 ++++----- .../errors/uncatchable_errors.rs | 4 +- .../execution_context/context.rs | 11 +- .../stream_maps_variables.rs | 96 +-- .../execution_context/streams_variables.rs | 133 +---- .../stream_value_descriptor.rs | 11 +- .../streams_variables/utils.rs | 49 -- air/src/execution_step/instructions/ap.rs | 24 +- .../instructions/ap/apply_to_arguments.rs | 5 +- .../execution_step/instructions/ap/utils.rs | 26 +- air/src/execution_step/instructions/ap_map.rs | 20 +- .../instructions/call/call_result_setter.rs | 49 +- .../instructions/call/prev_result_handler.rs | 8 +- air/src/execution_step/instructions/canon.rs | 4 +- .../execution_step/instructions/fold/utils.rs | 25 +- .../instructions/fold_scalar.rs | 1 + .../instructions/fold_stream.rs | 2 - .../instructions/fold_stream/stream_cursor.rs | 42 -- .../fold_stream/stream_execute_helpers.rs | 42 +- air/src/execution_step/instructions/new.rs | 29 +- air/src/farewell_step/errors.rs | 13 +- air/src/farewell_step/outcome.rs | 49 +- air/src/preparation_step/preparation.rs | 4 - .../features/data_merging/data_merge.rs | 11 - .../features/streams/compactification.rs | 164 ++++++ air/tests/test_module/features/streams/mod.rs | 1 + .../features/streams/recursive_streams.rs | 14 +- air/tests/test_module/instructions/new.rs | 38 -- air/tests/test_module/issues/issue_173.rs | 10 - air/tests/test_module/issues/issue_642.rs | 1 - air/tests/test_module/issues/issue_644.rs | 1 - .../uncatchable_trace_related.rs | 2 - .../src/executed_state/impls.rs | 12 +- .../interpreter-data/src/generation_idx.rs | 5 + .../interpreter-data/src/interpreter_data.rs | 20 - crates/air-lib/interpreter-data/src/lib.rs | 2 - .../src/stream_generations.rs | 36 -- crates/air-lib/test-utils/src/lib.rs | 4 - crates/air-lib/trace-handler/src/errors.rs | 8 +- crates/air-lib/trace-handler/src/handler.rs | 6 +- crates/air-lib/trace-handler/src/lib.rs | 2 +- 51 files changed, 1536 insertions(+), 1189 deletions(-) delete mode 100644 air/src/execution_step/boxed_value/stream.rs create mode 100644 air/src/execution_step/boxed_value/stream/mod.rs create mode 100644 air/src/execution_step/boxed_value/stream/recursive_stream.rs create mode 100644 air/src/execution_step/boxed_value/stream/stream_definition.rs create mode 100644 air/src/execution_step/boxed_value/stream/values_matrix.rs delete mode 100644 air/src/execution_step/execution_context/streams_variables/utils.rs delete mode 100644 air/src/execution_step/instructions/fold_stream/stream_cursor.rs create mode 100644 air/tests/test_module/features/streams/compactification.rs delete mode 100644 crates/air-lib/interpreter-data/src/stream_generations.rs diff --git a/Cargo.lock b/Cargo.lock index 201691b0..8fc760be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -333,6 +333,7 @@ dependencies = [ "strum_macros", "thiserror", "tracing", + "typed-index-collections", ] [[package]] @@ -3639,6 +3640,12 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "typed-index-collections" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "183496e014253d15abbe6235677b1392dba2d40524c88938991226baa38ac7c4" + [[package]] name = "typenum" version = "1.16.0" diff --git a/air/Cargo.toml b/air/Cargo.toml index f03014ab..cc8afdfd 100644 --- a/air/Cargo.toml +++ b/air/Cargo.toml @@ -36,6 +36,7 @@ serde_json = "1.0.95" concat-idents = "1.1.4" maplit = "1.0.2" non-empty-vec = "0.2.3" +typed-index-collections = "3.1.0" log = "0.4.17" once_cell = "1.17.1" thiserror = "1.0.40" diff --git a/air/src/execution_step/boxed_value/iterable/canon_stream.rs b/air/src/execution_step/boxed_value/iterable/canon_stream.rs index 2ff4805c..0d5efd2a 100644 --- a/air/src/execution_step/boxed_value/iterable/canon_stream.rs +++ b/air/src/execution_step/boxed_value/iterable/canon_stream.rs @@ -17,6 +17,7 @@ use super::Iterable; use super::IterableItem; use crate::execution_step::boxed_value::CanonStream; +use crate::execution_step::boxed_value::TracePosOperate; use crate::foldable_next; use crate::foldable_prev; diff --git a/air/src/execution_step/boxed_value/mod.rs b/air/src/execution_step/boxed_value/mod.rs index 2d32388e..8d7361e5 100644 --- a/air/src/execution_step/boxed_value/mod.rs +++ b/air/src/execution_step/boxed_value/mod.rs @@ -22,6 +22,8 @@ mod stream; mod stream_map; mod utils; +pub type Stream = stream::Stream; + pub(crate) use canon_stream::*; pub(crate) use iterable::*; pub(crate) use jvaluable::*; @@ -29,10 +31,15 @@ pub(crate) use scalar::CanonResultAggregate; pub(crate) use scalar::LiteralAggregate; pub(crate) use scalar::ScalarRef; pub(crate) use scalar::ServiceResultAggregate; +pub(crate) use scalar::TracePosOperate; pub(crate) use scalar::ValueAggregate; + pub(crate) use stream::Generation; -pub(crate) use stream::Stream; +pub(crate) use stream::IterableValue; +pub(crate) use stream::RecursiveCursorState; +pub(crate) use stream::RecursiveStreamCursor; pub(crate) use stream_map::StreamMap; + pub(crate) use utils::populate_tetraplet_with_lambda; use super::ExecutionResult; diff --git a/air/src/execution_step/boxed_value/scalar.rs b/air/src/execution_step/boxed_value/scalar.rs index 6ce44c8a..4915d95e 100644 --- a/air/src/execution_step/boxed_value/scalar.rs +++ b/air/src/execution_step/boxed_value/scalar.rs @@ -55,6 +55,7 @@ pub enum ValueAggregate { provenance_cid: Rc>, }, } + pub(crate) enum ScalarRef<'i> { Value(&'i ValueAggregate), IterableValue(&'i FoldState<'i>), @@ -176,7 +177,29 @@ impl ValueAggregate { } } - pub fn get_trace_pos(&self) -> TracePos { + pub fn get_provenance(&self) -> Provenance { + match self { + ValueAggregate::Literal(_) => Provenance::Literal, + ValueAggregate::ServiceResult { + result: _, + provenance_cid: cid, + } => Provenance::ServiceResult { cid: cid.clone() }, + ValueAggregate::Canon { + result: _, + provenance_cid: cid, + } => Provenance::Canon { cid: cid.clone() }, + } + } +} + +pub trait TracePosOperate { + fn get_trace_pos(&self) -> TracePos; + + fn set_trace_pos(&mut self, pos: TracePos); +} + +impl TracePosOperate for ValueAggregate { + fn get_trace_pos(&self) -> TracePos { match self { ValueAggregate::Literal(literal) => literal.trace_pos, ValueAggregate::ServiceResult { @@ -190,7 +213,7 @@ impl ValueAggregate { } } - pub fn set_trace_pos(&mut self, trace_pos: TracePos) { + fn set_trace_pos(&mut self, trace_pos: TracePos) { let trace_pos_ref = match self { ValueAggregate::Literal(literal) => &mut literal.trace_pos, ValueAggregate::ServiceResult { @@ -204,20 +227,6 @@ impl ValueAggregate { }; *trace_pos_ref = trace_pos; } - - pub fn get_provenance(&self) -> Provenance { - match self { - ValueAggregate::Literal(_) => Provenance::Literal, - ValueAggregate::ServiceResult { - result: _, - provenance_cid: cid, - } => Provenance::ServiceResult { cid: cid.clone() }, - ValueAggregate::Canon { - result: _, - provenance_cid: cid, - } => Provenance::Canon { cid: cid.clone() }, - } - } } use std::fmt; diff --git a/air/src/execution_step/boxed_value/stream.rs b/air/src/execution_step/boxed_value/stream.rs deleted file mode 100644 index 02ea1699..00000000 --- a/air/src/execution_step/boxed_value/stream.rs +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use super::ExecutionResult; -use super::ValueAggregate; -use crate::ExecutionError; -use crate::UncatchableError; - -use air_interpreter_data::GenerationIdx; -use air_trace_handler::merger::ValueSource; -use air_trace_handler::TraceHandler; - -/// Streams are CRDT-like append only data structures. They are guaranteed to have the same order -/// of values on each peer. -#[derive(Debug, Default, Clone)] -pub struct Stream { - /// The first Vec represents generations, the second values in a generation. Generation is a set - /// of values that interpreter obtained from one particle. It means that number of generation on - /// a peer is equal to number of the interpreter runs in context of one particle. And each set of - /// obtained values from a current_data that were not present in prev_data becomes a new generation. - values: Vec>, - - /// Count of values from previous data. - previous_gens_count: usize, -} - -impl Stream { - pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self { - let last_generation_count = GenerationIdx::from(1); - // TODO: bubble up an overflow error instead of expect - let overall_count = previous_count - .checked_add(current_count) - .and_then(|value| value.checked_add(last_generation_count)) - .expect("it shouldn't overflow"); - Self { - values: vec![vec![]; overall_count.into()], - previous_gens_count: previous_count.into(), - } - } - - // streams created with this ctor assumed to have only one generation, - // for streams that have values in - pub(crate) fn from_value(value: ValueAggregate) -> Self { - Self { - values: vec![vec![value]], - previous_gens_count: 0, - } - } - - // if generation is None, value would be added to the last generation, otherwise it would - // be added to given generation - pub(crate) fn add_value( - &mut self, - value: ValueAggregate, - generation: Generation, - source: ValueSource, - ) -> ExecutionResult { - let generation_number = match (generation, source) { - (Generation::Last, _) => self.values.len() - 1, - (Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen.into(), - (Generation::Nth(current_gen), ValueSource::CurrentData) => { - self.previous_gens_count + usize::from(current_gen) - } - }; - - if generation_number >= self.values.len() { - return Err(UncatchableError::StreamDontHaveSuchGeneration { - stream: self.clone(), - generation, - } - .into()); - } - - self.values[generation_number].push(value); - Ok(generation_number.into()) - } - - // TODO: remove this function - pub(crate) fn generations_count(&self) -> usize { - // the last generation could be empty due to the logic of from_generations_count ctor - if self.values.last().unwrap().is_empty() { - self.values.len() - 1 - } else { - self.values.len() - } - } - - pub(crate) fn last_non_empty_generation(&self) -> GenerationIdx { - self.values - .iter() - .rposition(|generation| !generation.is_empty()) - // it's safe to add + 1 here, because this function is called when - // there is a new state was added with add_new_generation_if_non_empty - .map(|non_empty_gens| non_empty_gens + 1) - .unwrap_or_else(|| self.generations_count()) - .into() - } - - /// Add a new empty generation if the latest isn't empty. - pub(crate) fn add_new_generation_if_non_empty(&mut self) -> bool { - let should_add_generation = match self.values.last() { - Some(last) => !last.is_empty(), - None => true, - }; - - if should_add_generation { - self.values.push(vec![]); - } - should_add_generation - } - - /// Remove a last generation if it's empty. - pub(crate) fn remove_last_generation_if_empty(&mut self) -> bool { - let should_remove_generation = match self.values.last() { - Some(last) => last.is_empty(), - None => false, - }; - - if should_remove_generation { - self.values.pop(); - } - - should_remove_generation - } - - pub(crate) fn generation_elements_count(&self, generation: Generation) -> Option { - match generation { - Generation::Nth(generation) if generation > self.generations_count() => None, - Generation::Nth(generation) => { - let elements_count = generation.into(); - Some(self.values.iter().take(elements_count).map(|v| v.len()).sum()) - } - Generation::Last => Some(self.values.iter().map(|v| v.len()).sum()), - } - } - - pub(crate) fn is_empty(&self) -> bool { - self.values.iter().all(|v| v.is_empty()) - } - - pub(crate) fn iter(&self, generation: Generation) -> Option> { - let iter: Box> = match generation { - Generation::Nth(generation) if generation >= self.generations_count() => return None, - Generation::Nth(generation) => { - Box::new(self.values.iter().take(generation.next().into()).flat_map(|v| v.iter())) - } - Generation::Last => Box::new(self.values.iter().flat_map(|v| v.iter())), - }; - // unwrap is safe here, because generation's been already checked - let len = self.generation_elements_count(generation).unwrap(); - - let iter = StreamIter { iter, len }; - - Some(iter) - } - - pub(crate) fn slice_iter(&self, start: Generation, end: Generation) -> Option> { - if self.is_empty() { - return None; - } - - let generations_count = self.generations_count() - 1; - let (start, end) = match (start, end) { - (Generation::Nth(start), Generation::Nth(end)) => (usize::from(start), usize::from(end)), - (Generation::Nth(start), Generation::Last) => (start.into(), generations_count), - (Generation::Last, Generation::Nth(end)) => (generations_count, end.into()), - (Generation::Last, Generation::Last) => (generations_count, generations_count), - }; - - if start > end || end > generations_count { - return None; - } - - let len = (end - start) + 1; - let iter: Box> = - Box::new(self.values.iter().skip(start).take(len).map(|v| v.as_slice())); - let iter = StreamSliceIter { iter, len }; - - Some(iter) - } - - /// Removes empty generations updating data and returns final generation count. - pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult { - self.remove_empty_generations(); - - for (generation, values) in self.values.iter().enumerate() { - for value in values.iter() { - trace_ctx - .update_generation(value.get_trace_pos(), generation.into()) - .map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?; - } - } - let last_generation_idx = self.values.len(); - Ok(last_generation_idx.into()) - } - - /// Removes empty generations from current values. - fn remove_empty_generations(&mut self) { - self.values.retain(|values| !values.is_empty()); - } -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum Generation { - Last, - Nth(GenerationIdx), -} - -impl Generation { - pub fn last() -> Self { - Self::Last - } - - #[cfg(test)] - pub fn nth(generation_id: u32) -> Self { - use std::convert::TryFrom; - - let generation_id = usize::try_from(generation_id).unwrap(); - let generation_idx = GenerationIdx::from(generation_id); - Self::Nth(generation_idx) - } -} - -pub(crate) struct StreamIter<'result> { - iter: Box + 'result>, - len: usize, -} - -impl<'result> Iterator for StreamIter<'result> { - type Item = &'result ValueAggregate; - - fn next(&mut self) -> Option { - if self.len > 0 { - self.len -= 1; - } - self.iter.next() - } - - fn size_hint(&self) -> (usize, Option) { - (self.len, Some(self.len)) - } -} - -impl<'result> ExactSizeIterator for StreamIter<'result> {} - -pub(crate) struct StreamSliceIter<'slice> { - iter: Box + 'slice>, - pub len: usize, -} - -impl<'slice> Iterator for StreamSliceIter<'slice> { - type Item = &'slice [ValueAggregate]; - - fn next(&mut self) -> Option { - if self.len > 0 { - self.len -= 1; - } - self.iter.next() - } - - fn size_hint(&self) -> (usize, Option) { - (self.len, Some(self.len)) - } -} - -use std::fmt; - -impl fmt::Display for Stream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.values.is_empty() { - return write!(f, "[]"); - } - - writeln!(f, "[")?; - for (id, generation) in self.values.iter().enumerate() { - write!(f, " -- {id}: ")?; - for value in generation.iter() { - write!(f, "{value:?}, ")?; - } - writeln!(f)?; - } - - write!(f, "]") - } -} - -impl fmt::Display for Generation { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Generation::Nth(generation) => write!(f, "{}", generation), - Generation::Last => write!(f, "Last"), - } - } -} - -#[cfg(test)] -mod test { - use super::Generation; - use super::Stream; - use super::ValueAggregate; - use super::ValueSource; - use crate::execution_step::ServiceResultAggregate; - - use air_interpreter_cid::CID; - use serde_json::json; - - use std::rc::Rc; - - #[test] - fn test_slice_iter() { - let value_1 = ValueAggregate::from_service_result( - ServiceResultAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into()), - CID::new("some fake cid").into(), - ); - let value_2 = ValueAggregate::from_service_result( - ServiceResultAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into()), - CID::new("some fake cid").into(), - ); - let mut stream = Stream::from_generations_count(2.into(), 0.into()); - - stream - .add_value(value_1, Generation::nth(0), ValueSource::PreviousData) - .unwrap(); - stream - .add_value(value_2, Generation::nth(1), ValueSource::PreviousData) - .unwrap(); - - let slice = stream.slice_iter(Generation::nth(0), Generation::nth(1)).unwrap(); - assert_eq!(slice.len, 2); - - let slice = stream.slice_iter(Generation::nth(0), Generation::Last).unwrap(); - assert_eq!(slice.len, 2); - - let slice = stream.slice_iter(Generation::nth(0), Generation::nth(0)).unwrap(); - assert_eq!(slice.len, 1); - - let slice = stream.slice_iter(Generation::Last, Generation::Last).unwrap(); - assert_eq!(slice.len, 1); - } - - #[test] - fn test_slice_on_empty_stream() { - let stream = Stream::from_generations_count(2.into(), 0.into()); - - let slice = stream.slice_iter(Generation::nth(0), Generation::nth(1)); - assert!(slice.is_none()); - - let slice = stream.slice_iter(Generation::nth(0), Generation::Last); - assert!(slice.is_none()); - - let slice = stream.slice_iter(Generation::nth(0), Generation::nth(0)); - assert!(slice.is_none()); - - let slice = stream.slice_iter(Generation::Last, Generation::Last); - assert!(slice.is_none()); - } - - #[test] - fn generation_from_current_data() { - let value_1 = ValueAggregate::from_service_result( - ServiceResultAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into()), - CID::new("some fake cid").into(), - ); - let value_2 = ValueAggregate::from_service_result( - ServiceResultAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into()), - CID::new("some fake cid").into(), - ); - let mut stream = Stream::from_generations_count(5.into(), 5.into()); - - stream - .add_value(value_1.clone(), Generation::nth(2), ValueSource::CurrentData) - .unwrap(); - stream - .add_value(value_2.clone(), Generation::nth(4), ValueSource::PreviousData) - .unwrap(); - - let generations_count = stream.generations_count(); - assert_eq!(generations_count, 10); - - let mut iter = stream.iter(Generation::Last).unwrap(); - let stream_value_1 = iter.next().unwrap(); - let stream_value_2 = iter.next().unwrap(); - - assert_eq!(stream_value_1, &value_2); - assert_eq!(stream_value_2, &value_1); - } -} diff --git a/air/src/execution_step/boxed_value/stream/mod.rs b/air/src/execution_step/boxed_value/stream/mod.rs new file mode 100644 index 00000000..301d423a --- /dev/null +++ b/air/src/execution_step/boxed_value/stream/mod.rs @@ -0,0 +1,26 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod recursive_stream; +mod stream_definition; +pub(self) mod values_matrix; + +pub(crate) use recursive_stream::IterableValue; +pub(crate) use recursive_stream::RecursiveCursorState; +pub(crate) use recursive_stream::RecursiveStreamCursor; +pub(crate) use recursive_stream::StreamCursor; +pub(crate) use stream_definition::Generation; +pub(crate) use stream_definition::Stream; diff --git a/air/src/execution_step/boxed_value/stream/recursive_stream.rs b/air/src/execution_step/boxed_value/stream/recursive_stream.rs new file mode 100644 index 00000000..2689f816 --- /dev/null +++ b/air/src/execution_step/boxed_value/stream/recursive_stream.rs @@ -0,0 +1,254 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::Stream; +use crate::execution_step::boxed_value::Iterable; +use crate::execution_step::boxed_value::IterableItem; +use crate::execution_step::boxed_value::IterableVecResolvedCall; +use crate::execution_step::ValueAggregate; + +use air_interpreter_data::GenerationIdx; + +pub(crate) type IterableValue = Box Iterable<'ctx, Item = IterableItem<'ctx>>>; + +/// Tracks a state of a stream by storing last generation of every value type. +#[derive(Debug, Clone, Copy)] +pub struct StreamCursor { + pub previous_start_idx: GenerationIdx, + pub current_start_idx: GenerationIdx, + pub new_start_idx: GenerationIdx, +} + +/// Intended to generate values for recursive stream handling. +/// +/// It could be considered as a simple state machine which should be started with +/// fold_started and then continued with next_iteration: +/// met_fold_start - met_iteration_end - ... met_iteration_end - Exhausted +/// | | +/// Exhausted Exhausted +#[derive(Debug, Clone, Copy)] +pub(crate) struct RecursiveStreamCursor { + cursor: StreamCursor, +} + +pub(crate) enum RecursiveCursorState { + Continue(Vec), + Exhausted, +} + +impl RecursiveStreamCursor { + pub fn new() -> Self { + Self { + cursor: StreamCursor::empty(), + } + } + + pub fn met_fold_start(&mut self, stream: &mut Stream) -> RecursiveCursorState { + let state = self.cursor_state(stream); + self.cursor = stream.cursor(); + + if state.should_continue() { + // add a new generation to made all consequence "new" (meaning that they are just executed on this peer) + // write operation to this stream to write to this new generation + stream.new_values().add_new_empty_generation(); + } + + state + } + + pub fn met_iteration_end(&mut self, stream: &mut Stream) -> RecursiveCursorState { + let state = self.cursor_state(stream); + + // remove last generation if it empty to track cursor state + remove_last_generation_if_empty(stream); + self.cursor = stream.cursor(); + // add new last generation to store new values into this generation + stream.new_values().add_new_empty_generation(); + + state + } + + fn cursor_state(&self, stream: &Stream) -> RecursiveCursorState { + let slice_iter = stream.slice_iter(self.cursor); + let iterable = Self::slice_iter_to_iterable(slice_iter); + + RecursiveCursorState::from_iterable_values(iterable) + } + + fn slice_iter_to_iterable<'value>(iter: impl Iterator) -> Vec { + iter.map(|iterable| { + let foldable = IterableVecResolvedCall::init(iterable.to_vec()); + let foldable: IterableValue = Box::new(foldable); + foldable + }) + .collect::>() + } +} + +fn remove_last_generation_if_empty(stream: &mut Stream) { + if stream.new_values().last_generation_is_empty() { + stream.new_values().remove_last_generation(); + } +} + +impl StreamCursor { + pub(crate) fn empty() -> Self { + Self { + previous_start_idx: GenerationIdx::from(0), + current_start_idx: GenerationIdx::from(0), + new_start_idx: GenerationIdx::from(0), + } + } + + pub(crate) fn new( + previous_start_idx: GenerationIdx, + current_start_idx: GenerationIdx, + new_start_idx: GenerationIdx, + ) -> Self { + Self { + previous_start_idx, + current_start_idx, + new_start_idx, + } + } +} + +impl RecursiveCursorState { + pub(crate) fn from_iterable_values(values: Vec) -> Self { + if values.is_empty() { + Self::Exhausted + } else { + Self::Continue(values) + } + } + + pub(crate) fn should_continue(&self) -> bool { + matches!(self, Self::Continue(_)) + } +} + +#[cfg(test)] +mod test { + use super::IterableValue; + use super::RecursiveCursorState; + use super::RecursiveStreamCursor; + use super::Stream; + use super::ValueAggregate; + use crate::execution_step::Generation; + use crate::execution_step::ServiceResultAggregate; + use crate::JValue; + + use air_interpreter_cid::CID; + use serde_json::json; + + use std::rc::Rc; + + fn create_value(value: JValue) -> ValueAggregate { + ValueAggregate::from_service_result( + ServiceResultAggregate::new(Rc::new(value), <_>::default(), 0.into()), + CID::new("some fake cid").into(), + ) + } + + fn iterables_unwrap(cursor_state: RecursiveCursorState) -> Vec { + match cursor_state { + RecursiveCursorState::Continue(iterables) => iterables, + RecursiveCursorState::Exhausted => panic!("cursor is exhausted"), + } + } + + #[test] + fn fold_started_empty_if_no_values() { + let mut stream = Stream::new(); + let mut recursive_stream = RecursiveStreamCursor::new(); + let cursor_state = recursive_stream.met_fold_start(&mut stream); + + assert!(!cursor_state.should_continue()) + } + + #[test] + fn next_iteration_empty_if_no_values() { + let mut stream = Stream::new(); + let mut recursive_stream = RecursiveStreamCursor::new(); + let cursor_state = recursive_stream.met_iteration_end(&mut stream); + + assert!(!cursor_state.should_continue()) + } + + #[test] + fn next_iteration_empty_if_no_values_added() { + let mut stream = Stream::new(); + let mut recursive_stream = RecursiveStreamCursor::new(); + + let value = create_value(json!("1")); + stream.add_value(value, Generation::Current(0.into())); + + let cursor_state = recursive_stream.met_fold_start(&mut stream); + let iterables = iterables_unwrap(cursor_state); + assert_eq!(iterables.len(), 1); + + let cursor_state = recursive_stream.met_iteration_end(&mut stream); + assert!(!cursor_state.should_continue()); + } + + #[test] + fn one_recursive_iteration() { + let mut stream = Stream::new(); + let mut recursive_stream = RecursiveStreamCursor::new(); + + let value = create_value(json!("1")); + stream.add_value(value.clone(), Generation::Current(0.into())); + + let cursor_state = recursive_stream.met_fold_start(&mut stream); + let iterables = iterables_unwrap(cursor_state); + assert_eq!(iterables.len(), 1); + + stream.add_value(value.clone(), Generation::New); + stream.add_value(value, Generation::New); + + let cursor_state = recursive_stream.met_iteration_end(&mut stream); + let iterables = iterables_unwrap(cursor_state); + assert_eq!(iterables.len(), 1); + + let cursor_state = recursive_stream.met_iteration_end(&mut stream); + assert!(!cursor_state.should_continue()); + } + + #[test] + fn add_value_into_prev_and_current() { + let mut stream = Stream::new(); + let mut recursive_stream = RecursiveStreamCursor::new(); + + let value = create_value(json!("1")); + stream.add_value(value.clone(), Generation::Current(0.into())); + + let cursor_state = recursive_stream.met_fold_start(&mut stream); + assert!(cursor_state.should_continue()); + + stream.add_value(value.clone(), Generation::Previous(0.into())); + + let cursor_state = recursive_stream.met_iteration_end(&mut stream); + assert!(cursor_state.should_continue()); + + stream.add_value(value, Generation::Current(1.into())); + + let cursor_state = recursive_stream.met_iteration_end(&mut stream); + assert!(cursor_state.should_continue()); + + let cursor_state = recursive_stream.met_iteration_end(&mut stream); + assert!(!cursor_state.should_continue()); + } +} diff --git a/air/src/execution_step/boxed_value/stream/stream_definition.rs b/air/src/execution_step/boxed_value/stream/stream_definition.rs new file mode 100644 index 00000000..e82a3141 --- /dev/null +++ b/air/src/execution_step/boxed_value/stream/stream_definition.rs @@ -0,0 +1,554 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::values_matrix::NewValuesMatrix; +use super::values_matrix::ValuesMatrix; +use super::StreamCursor; +use crate::execution_step::boxed_value::TracePosOperate; +use crate::execution_step::ExecutionResult; + +use air_interpreter_data::GenerationIdx; +use air_trace_handler::TraceHandler; + +/// Streams are CRDT-like append only data structures. They are guaranteed to have locally +/// the same order of values on each peer. +#[derive(Debug, Clone)] +pub struct Stream { + /// Values from previous data. + previous_values: ValuesMatrix, + + /// Values from current data. + current_values: ValuesMatrix, + + /// Values from call results or aps executed on a current peer. + new_values: NewValuesMatrix, +} + +impl<'value, T: 'value> Stream { + pub(crate) fn new() -> Self { + Self { + previous_values: ValuesMatrix::new(), + current_values: ValuesMatrix::new(), + new_values: NewValuesMatrix::new(), + } + } + + pub(crate) fn iter(&self) -> impl Iterator { + self.previous_values + .iter() + .chain(self.current_values.iter()) + .chain(self.new_values.iter()) + } + + // Contract: all slices will be non-empty + pub(crate) fn slice_iter(&self, cursor: StreamCursor) -> impl Iterator { + self.previous_values + .slice_iter(cursor.previous_start_idx) + .chain(self.current_values.slice_iter(cursor.current_start_idx)) + .chain(self.new_values.slice_iter(cursor.new_start_idx)) + } + + pub(crate) fn cursor(&self) -> StreamCursor { + StreamCursor::new( + self.previous_values.generations_count(), + self.current_values.generations_count(), + self.new_values.generations_count(), + ) + } + + pub(super) fn new_values(&mut self) -> &mut NewValuesMatrix { + &mut self.new_values + } +} + +impl<'value, T: 'value + Clone + fmt::Display> Stream { + pub(crate) fn add_value(&mut self, value: T, generation: Generation) { + match generation { + Generation::Previous(previous_gen) => self.previous_values.add_value_to_generation(value, previous_gen), + Generation::Current(current_gen) => self.current_values.add_value_to_generation(value, current_gen), + Generation::New => self.new_values.add_to_last_generation(value), + } + } +} + +impl<'value, T: 'value + TracePosOperate + fmt::Display> Stream { + /// Removes empty generations updating data. + pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { + self.previous_values.remove_empty_generations(); + self.current_values.remove_empty_generations(); + self.new_values.remove_empty_generations(); + + let start_idx = 0.into(); + Self::update_generations(self.previous_values.slice_iter(0.into()), start_idx, trace_ctx)?; + + let start_idx = self.previous_values.generations_count(); + Self::update_generations(self.current_values.slice_iter(0.into()), start_idx, trace_ctx)?; + + let start_idx = start_idx.checked_add(self.current_values.generations_count()).unwrap(); + Self::update_generations(self.new_values.slice_iter(0.into()), start_idx, trace_ctx)?; + + Ok(()) + } + + fn update_generations( + values: impl Iterator, + start_idx: GenerationIdx, + trace_ctx: &mut TraceHandler, + ) -> ExecutionResult<()> { + use crate::execution_step::errors::UncatchableError; + use crate::execution_step::ExecutionError; + + for (position, values) in values.enumerate() { + // TODO: replace it with error + let generation = start_idx.checked_add(position.into()).unwrap(); + for value in values.iter() { + trace_ctx + .update_generation(value.get_trace_pos(), generation) + .map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompactificationError(e)))?; + } + } + + Ok(()) + } +} + +impl Default for Stream { + fn default() -> Self { + Self { + previous_values: <_>::default(), + current_values: <_>::default(), + new_values: <_>::default(), + } + } +} + +use air_trace_handler::merger::MetApResult; +use air_trace_handler::merger::ValueSource; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum Generation { + Previous(GenerationIdx), + Current(GenerationIdx), + New, +} + +impl Generation { + #[cfg(test)] + pub fn previous(generation_id: u32) -> Self { + use std::convert::TryFrom; + + let generation_id = usize::try_from(generation_id).unwrap(); + let generation_idx = GenerationIdx::from(generation_id); + Self::Previous(generation_idx) + } + + #[cfg(test)] + pub fn current(generation_id: u32) -> Self { + use std::convert::TryFrom; + + let generation_id = usize::try_from(generation_id).unwrap(); + let generation_idx = GenerationIdx::from(generation_id); + Self::Current(generation_idx) + } + + pub fn from_met_result(result: &MetApResult) -> Self { + Self::from_data(result.value_source, result.generation) + } + + pub fn from_data(data_type: ValueSource, generation: GenerationIdx) -> Self { + match data_type { + ValueSource::PreviousData => Generation::Previous(generation), + ValueSource::CurrentData => Generation::Current(generation), + } + } + + pub fn new() -> Self { + Self::New + } +} + +use std::fmt; + +impl fmt::Display for Stream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "previous values:\n{}", self.previous_values)?; + writeln!(f, "current values:\n{}", self.current_values)?; + writeln!(f, "new values:\n{}", self.new_values) + } +} + +impl fmt::Display for Generation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Generation::Previous(generation) => write!(f, "previous({})", generation), + Generation::Current(generation) => write!(f, "current({})", generation), + Generation::New => write!(f, "new"), + } + } +} + +#[cfg(test)] +mod test { + use super::Generation; + use super::StreamCursor; + use super::TraceHandler; + use crate::execution_step::ServiceResultAggregate; + use crate::execution_step::ValueAggregate; + use crate::ExecutionError; + use crate::JValue; + use crate::UncatchableError; + + use air_interpreter_cid::CID; + use air_interpreter_data::ApResult; + use air_interpreter_data::CanonResult; + use air_interpreter_data::ExecutedState; + use air_interpreter_data::ExecutionTrace; + use air_interpreter_data::TracePos; + use air_trace_handler::GenerationCompactificationError; + use serde_json::json; + + use std::rc::Rc; + + type Stream = super::Stream; + + fn create_value(value: JValue) -> ValueAggregate { + ValueAggregate::from_service_result( + ServiceResultAggregate::new(Rc::new(value), <_>::default(), 1.into()), + CID::new("some fake cid").into(), + ) + } + + fn create_value_with_pos(value: JValue, trace_pos: TracePos) -> ValueAggregate { + ValueAggregate::from_service_result( + ServiceResultAggregate::new(Rc::new(value), <_>::default(), trace_pos), + CID::new("some fake cid").into(), + ) + } + + #[test] + fn test_iter() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::previous(0)); + stream.add_value(value_2.clone(), Generation::previous(1)); + + let mut iter = stream.iter(); + println!(" after getting iter"); + assert_eq!(iter.next(), Some(&value_1)); + assert_eq!(iter.next(), Some(&value_2)); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_slice_iter_prev() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let value_3 = create_value(json!("value_3")); + let value_4 = create_value(json!("value_4")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::previous(0)); + stream.add_value(value_2.clone(), Generation::previous(0)); + stream.add_value(value_3.clone(), Generation::previous(0)); + stream.add_value(value_4.clone(), Generation::previous(0)); + + let mut slice_iter = stream.slice_iter(StreamCursor::empty()); + assert_eq!( + slice_iter.next(), + Some(vec![value_1, value_2, value_3, value_4].as_slice()) + ); + assert_eq!(slice_iter.next(), None); + } + + #[test] + fn test_slice_iter_current() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let value_3 = create_value(json!("value_3")); + let value_4 = create_value(json!("value_4")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::current(0)); + stream.add_value(value_2.clone(), Generation::current(0)); + stream.add_value(value_3.clone(), Generation::current(0)); + stream.add_value(value_4.clone(), Generation::current(0)); + + let mut slice_iter = stream.slice_iter(StreamCursor::empty()); + assert_eq!( + slice_iter.next(), + Some(vec![value_1, value_2, value_3, value_4].as_slice()) + ); + assert_eq!(slice_iter.next(), None); + } + + #[test] + fn test_slice_iter_new() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let value_3 = create_value(json!("value_3")); + let value_4 = create_value(json!("value_4")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::New); + stream.add_value(value_2.clone(), Generation::New); + stream.add_value(value_3.clone(), Generation::New); + stream.add_value(value_4.clone(), Generation::New); + + let mut slice_iter = stream.slice_iter(StreamCursor::empty()); + assert_eq!( + slice_iter.next(), + Some(vec![value_1, value_2, value_3, value_4].as_slice()) + ); + assert_eq!(slice_iter.next(), None); + } + + #[test] + fn test_iter_on_empty_stream() { + let stream = Stream::new(); + + let mut slice = stream.iter(); + assert_eq!(slice.next(), None); + } + + #[test] + fn test_slice_on_empty_stream() { + let stream = Stream::new(); + + let mut slice = stream.slice_iter(StreamCursor::empty()); + assert_eq!(slice.next(), None); + } + + #[test] + fn generation_from_current_data_after_previous() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::current(0)); + stream.add_value(value_2.clone(), Generation::previous(0)); + + let mut iter = stream.iter(); + assert_eq!(iter.next(), Some(&value_2)); + assert_eq!(iter.next(), Some(&value_1)); + assert_eq!(iter.next(), None); + } + + #[test] + fn generation_from_new_data_after_current_and_previous() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let value_3 = create_value(json!("value_3")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::New); + stream.add_value(value_2.clone(), Generation::current(0)); + stream.add_value(value_3.clone(), Generation::previous(0)); + + let mut iter = stream.iter(); + assert_eq!(iter.next(), Some(&value_3)); + assert_eq!(iter.next(), Some(&value_2)); + assert_eq!(iter.next(), Some(&value_1)); + assert_eq!(iter.next(), None); + } + + #[test] + fn empty_generations_skipped_in_slice_iter_prev() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let value_3 = create_value(json!("value_3")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::previous(0)); + stream.add_value(value_2.clone(), Generation::previous(1)); + stream.add_value(value_3.clone(), Generation::previous(3)); + + let mut slice_iter = stream.slice_iter(StreamCursor::empty()); + assert_eq!(slice_iter.next(), Some(vec![value_1].as_slice())); + assert_eq!(slice_iter.next(), Some(vec![value_2].as_slice())); + assert_eq!(slice_iter.next(), Some(vec![value_3].as_slice())); + assert_eq!(slice_iter.next(), None); + } + + #[test] + fn empty_generations_skipped_in_slice_iter_current() { + let value_1 = create_value(json!("value_1")); + let value_2 = create_value(json!("value_2")); + let value_3 = create_value(json!("value_3")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::current(0)); + stream.add_value(value_2.clone(), Generation::current(1)); + stream.add_value(value_3.clone(), Generation::current(3)); + + let mut slice_iter = stream.slice_iter(StreamCursor::empty()); + assert_eq!(slice_iter.next(), Some(vec![value_1].as_slice())); + assert_eq!(slice_iter.next(), Some(vec![value_2].as_slice())); + assert_eq!(slice_iter.next(), Some(vec![value_3].as_slice())); + assert_eq!(slice_iter.next(), None); + } + + #[test] + fn compactification_with_previous_new_generation() { + let value_1 = create_value_with_pos(json!("value_1"), 0.into()); + let value_2 = create_value_with_pos(json!("value_2"), 1.into()); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::previous(0)); + stream.add_value(value_2.clone(), Generation::New); + + let trace = ExecutionTrace::from(vec![]); + let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace); + let ap_result = ApResult::stub(); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result); + + let compactification_result = stream.compactify(&mut trace_ctx); + assert!(compactification_result.is_ok()); + + let actual_trace = trace_ctx.into_result_trace(); + let expected_trace = vec![ + ExecutedState::Ap(ApResult::new(0.into())), + ExecutedState::Ap(ApResult::new(1.into())), + ]; + let expected_trace = ExecutionTrace::from(expected_trace); + + assert_eq!(actual_trace, expected_trace); + } + + #[test] + fn compactification_with_current_generation() { + let value_1 = create_value_with_pos(json!("value_1"), 0.into()); + let value_2 = create_value_with_pos(json!("value_2"), 1.into()); + let value_3 = create_value_with_pos(json!("value_3"), 2.into()); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::current(0)); + stream.add_value(value_2.clone(), Generation::current(2)); + stream.add_value(value_3.clone(), Generation::current(4)); + + let trace = ExecutionTrace::from(vec![]); + let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace); + let ap_result = ApResult::stub(); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result); + + let compactification_result = stream.compactify(&mut trace_ctx); + assert!(compactification_result.is_ok()); + + let actual_trace = trace_ctx.into_result_trace(); + let expected_trace = vec![ + ExecutedState::Ap(ApResult::new(0.into())), + ExecutedState::Ap(ApResult::new(1.into())), + ExecutedState::Ap(ApResult::new(2.into())), + ]; + let expected_trace = ExecutionTrace::from(expected_trace); + + assert_eq!(actual_trace, expected_trace); + } + + #[test] + fn compactification_works_with_mixed_generations() { + let value_1 = create_value_with_pos(json!("value_1"), 0.into()); + let value_2 = create_value_with_pos(json!("value_2"), 1.into()); + let value_3 = create_value_with_pos(json!("value_3"), 2.into()); + let value_4 = create_value_with_pos(json!("value_1"), 3.into()); + let value_5 = create_value_with_pos(json!("value_2"), 4.into()); + let value_6 = create_value_with_pos(json!("value_3"), 5.into()); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::New); + stream.add_value(value_2.clone(), Generation::current(4)); + stream.add_value(value_3.clone(), Generation::current(0)); + stream.add_value(value_4.clone(), Generation::previous(100)); + stream.add_value(value_5.clone(), Generation::New); + stream.add_value(value_6.clone(), Generation::current(2)); + + let trace = ExecutionTrace::from(vec![]); + let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace); + let ap_result = ApResult::stub(); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result.clone()); + trace_ctx.meet_ap_end(ap_result); + + let compactification_result = stream.compactify(&mut trace_ctx); + assert!(compactification_result.is_ok()); + + let actual_trace = trace_ctx.into_result_trace(); + let expected_trace = vec![ + ExecutedState::Ap(ApResult::new(4.into())), + ExecutedState::Ap(ApResult::new(3.into())), + ExecutedState::Ap(ApResult::new(1.into())), + ExecutedState::Ap(ApResult::new(0.into())), + ExecutedState::Ap(ApResult::new(4.into())), + ExecutedState::Ap(ApResult::new(2.into())), + ]; + let expected_trace = ExecutionTrace::from(expected_trace); + + assert_eq!(actual_trace, expected_trace); + } + + #[test] + fn compactification_invalid_state_error() { + let value_1 = create_value(json!("value_1")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::current(0)); + + let trace = ExecutionTrace::from(vec![]); + let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace); + let canon_result = CanonResult(Rc::new(CID::new("fake canon CID"))); + trace_ctx.meet_canon_end(canon_result.clone()); + trace_ctx.meet_canon_end(canon_result.clone()); + trace_ctx.meet_canon_end(canon_result); + + let compactification_result = stream.compactify(&mut trace_ctx); + assert!(matches!( + compactification_result, + Err(ExecutionError::Uncatchable( + UncatchableError::GenerationCompactificationError( + GenerationCompactificationError::TracePosPointsToInvalidState { .. } + ) + )) + )); + } + + #[test] + fn compactification_points_to_nowhere_error() { + let value_1 = create_value(json!("value_1")); + let mut stream = Stream::new(); + + stream.add_value(value_1.clone(), Generation::current(0)); + + let trace = ExecutionTrace::from(vec![]); + let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace); + + let compactification_result = stream.compactify(&mut trace_ctx); + assert!(matches!( + compactification_result, + Err(ExecutionError::Uncatchable( + UncatchableError::GenerationCompactificationError( + GenerationCompactificationError::TracePosPointsToNowhere { .. } + ) + )) + )); + } +} diff --git a/air/src/execution_step/boxed_value/stream/values_matrix.rs b/air/src/execution_step/boxed_value/stream/values_matrix.rs new file mode 100644 index 00000000..73eaf35b --- /dev/null +++ b/air/src/execution_step/boxed_value/stream/values_matrix.rs @@ -0,0 +1,169 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use air_interpreter_data::GenerationIdx; + +use typed_index_collections::TiVec; + +/// It is intended to store values according to generations which stream has. Each generation could +/// contain several values, generation are ordered, meaning that values in elder generations are +/// handled by AIR instructions (such as fold) later. Also, there is an order between values in one +/// generation. And being placed by generations values could be considered as a matrix. +/// +/// This matrix is used for values in previous and current data. +#[derive(Debug, Clone)] +pub(crate) struct ValuesMatrix { + /// The first Vec represents generations, the second values in a generation. Generation is a set + /// of values that interpreter obtained from one particle. It means that number of generation on + /// a peer is equal to number of the interpreter runs in context of one particle. + values: TiVec>, +} + +impl ValuesMatrix { + pub fn new() -> Self { + Self { values: TiVec::new() } + } + + pub fn remove_empty_generations(&mut self) { + self.values.retain(|generation| !generation.is_empty()) + } + + pub fn generations_count(&self) -> GenerationIdx { + self.values.len().into() + } + + pub fn iter(&self) -> impl Iterator { + self.values.iter().flat_map(|generation| generation.iter()) + } + + pub fn slice_iter(&self, skip: GenerationIdx) -> impl Iterator { + self.values + .iter() + .filter(|generation| !generation.is_empty()) + .skip(skip.into()) + .map(|generation| generation.as_ref()) + } +} + +impl ValuesMatrix { + pub fn add_value_to_generation(&mut self, value: T, generation_idx: GenerationIdx) { + if generation_idx >= self.values.len() { + // TODO: replace unwrap with error + let new_size = generation_idx.checked_add(1.into()).unwrap(); + self.values.resize(new_size.into(), Vec::new()); + } + + self.values[generation_idx].push(value); + } +} + +/// It's intended to handle new values from call results. +#[derive(Debug, Clone)] +pub(crate) struct NewValuesMatrix(ValuesMatrix); + +impl NewValuesMatrix { + pub fn new() -> Self { + let values = ValuesMatrix::new(); + Self(values) + } + + pub fn add_new_empty_generation(&mut self) { + self.0.values.push(vec![]); + } + + pub fn remove_empty_generations(&mut self) { + self.0.remove_empty_generations(); + } + + pub fn remove_last_generation(&mut self) { + self.0.values.pop(); + } + + pub fn last_generation_is_empty(&mut self) -> bool { + if self.0.values.is_empty() { + return true; + } + + self.0.values[self.last_non_empty_generation_idx()].is_empty() + } + + pub fn iter(&self) -> impl Iterator { + self.0.iter() + } + + pub fn slice_iter(&self, skip: GenerationIdx) -> impl Iterator { + self.0.slice_iter(skip) + } + + pub fn generations_count(&self) -> GenerationIdx { + self.0.generations_count() + } + + pub fn last_non_empty_generation_idx(&self) -> GenerationIdx { + let values_len = self.0.values.len(); + if values_len == 0 { + return 0.into(); + } + + (values_len - 1).into() + } +} + +impl NewValuesMatrix { + pub fn add_to_last_generation(&mut self, value: T) { + let last_generation_idx = self.last_non_empty_generation_idx(); + self.0.add_value_to_generation(value, last_generation_idx); + } +} + +impl Default for ValuesMatrix { + fn default() -> Self { + Self { values: TiVec::new() } + } +} + +impl Default for NewValuesMatrix { + fn default() -> Self { + Self(<_>::default()) + } +} + +use std::fmt; + +impl fmt::Display for ValuesMatrix { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.values.is_empty() { + return write!(f, "[]"); + } + + writeln!(f, "[")?; + for (idx, generation_values) in self.values.iter_enumerated() { + write!(f, " -- {idx}: ")?; + for value in generation_values.iter() { + write!(f, "{value}, ")?; + } + writeln!(f)?; + } + + write!(f, "]") + } +} + +impl fmt::Display for NewValuesMatrix { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/air/src/execution_step/boxed_value/stream_map.rs b/air/src/execution_step/boxed_value/stream_map.rs index 334471ea..1b7d4c70 100644 --- a/air/src/execution_step/boxed_value/stream_map.rs +++ b/air/src/execution_step/boxed_value/stream_map.rs @@ -14,14 +14,14 @@ * limitations under the License. */ -use super::stream::*; +use super::Generation; +use super::Stream; use super::ValueAggregate; +use crate::execution_step::boxed_value::TracePosOperate; use crate::execution_step::execution_context::stream_map_key::StreamMapKey; use crate::execution_step::ExecutionResult; use crate::JValue; -use air_interpreter_data::GenerationIdx; -use air_trace_handler::merger::ValueSource; use air_trace_handler::TraceHandler; use serde_json::json; @@ -37,13 +37,11 @@ pub struct StreamMap { } impl StreamMap { - pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self { - Self { - stream: Stream::from_generations_count(previous_count, current_count), - } + pub(crate) fn new() -> Self { + Self { stream: Stream::new() } } - pub(crate) fn from_value(key: StreamMapKey<'_>, value: &ValueAggregate) -> Self { + pub(crate) fn insert(&mut self, key: StreamMapKey<'_>, value: &ValueAggregate, generation: Generation) { let obj = from_key_value(key, value.get_result()); let value = ValueAggregate::new( obj, @@ -51,29 +49,10 @@ impl StreamMap { value.get_trace_pos(), value.get_provenance(), ); - Self { - stream: Stream::from_value(value), - } + self.stream.add_value(value, generation) } - pub(crate) fn insert( - &mut self, - key: StreamMapKey<'_>, - value: &ValueAggregate, - generation: Generation, - source: ValueSource, - ) -> ExecutionResult { - let obj = from_key_value(key, value.get_result()); - let value = ValueAggregate::new( - obj, - value.get_tetraplet(), - value.get_trace_pos(), - value.get_provenance(), - ); - self.stream.add_value(value, generation, source) - } - - pub(crate) fn compactify(self, trace_ctx: &mut TraceHandler) -> ExecutionResult { + pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { self.stream.compactify(trace_ctx) } @@ -87,8 +66,7 @@ impl StreamMap { let mut met_keys = HashSet::new(); - // it's always possible to go through all values - self.stream.iter(Generation::Last).unwrap().filter(move |value| { + self.stream.iter().filter(move |value| { StreamMapKey::from_kvpair(value) .map(|key| met_keys.insert(key)) .unwrap_or(false) @@ -111,106 +89,149 @@ mod test { use crate::execution_step::boxed_value::stream_map::from_key_value; use crate::execution_step::execution_context::stream_map_key::StreamMapKey; use crate::execution_step::ValueAggregate; - use air_trace_handler::merger::ValueSource; + use crate::ExecutionError; + use crate::JValue; + use crate::UncatchableError; + + use air_interpreter_cid::CID; + use air_interpreter_data::ExecutionTrace; + use air_trace_handler::GenerationCompactificationError; + use air_trace_handler::TraceHandler; use serde_json::json; + use std::borrow::Cow; use std::rc::Rc; + fn create_value_aggregate(value: Rc) -> ValueAggregate { + ValueAggregate::new( + value, + <_>::default(), + 0.into(), + air_interpreter_data::Provenance::literal(), + ) + } + + fn compare_stream_iter<'value>( + mut iter: impl Iterator, + key: StreamMapKey<'_>, + value: &Rc, + ) -> bool { + let actual_value = iter.next().map(|e| e.get_result()).unwrap(); + let expected_value = from_key_value(key, value); + + actual_value == &expected_value + } + #[test] - fn test_from_value() { - let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]); - let key_str = "some_key"; - let key = StreamMapKey::Str(Cow::Borrowed(key_str)); - let value = Rc::new(obj.clone()); + fn test_from_value_key_str() { + let key = StreamMapKey::Str(Cow::Borrowed("some_key")); + let value = Rc::new(json!("1")); + let value_aggregate = create_value_aggregate(value.clone()); - let generation_idx = 0; - let generation = Generation::Nth(generation_idx.into()); - let value_aggregate: ValueAggregate = ValueAggregate::new( - value.clone(), - <_>::default(), - 0.into(), - air_interpreter_data::Provenance::literal(), - ); - let stream_map = StreamMap::from_value(key.clone(), &value_aggregate); + let mut stream_map = StreamMap::new(); + stream_map.insert(key.clone(), &value_aggregate, Generation::New); + let mut iter = stream_map.stream.iter(); - let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap(); - let v = internal_stream_iter.next().map(|e| e.get_result()).unwrap(); - let examplar = from_key_value(key, value.as_ref()); - assert_eq!(*v, examplar); - assert_eq!(internal_stream_iter.next(), None); + assert!(compare_stream_iter(&mut iter, key, &value)); + assert_eq!(iter.next(), None); + } + #[test] + fn test_from_value_key_int() { let key = StreamMapKey::I64(42.into()); - let value_aggregate = ValueAggregate::new( - value.clone(), - <_>::default(), - 0.into(), - air_interpreter_data::Provenance::literal(), - ); - let stream_map = StreamMap::from_value(key.clone(), &value_aggregate); + let value = Rc::new(json!("1")); + let value_aggregate = create_value_aggregate(value.clone()); - let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap(); - let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap(); - let examplar = from_key_value(key, value.as_ref()); - assert_eq!(*v, *examplar.as_ref()); - assert_eq!(internal_stream_iter.next(), None); + let mut stream_map = StreamMap::new(); + stream_map.insert(key.clone(), &value_aggregate, Generation::New); + let mut iter = stream_map.stream.iter(); + + assert!(compare_stream_iter(&mut iter, key, &value)); + assert_eq!(iter.next(), None); } #[test] fn test_insert() { - let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]); - let key_str = "some_key"; - let key12 = StreamMapKey::Str(Cow::Borrowed(key_str)); - let value = Rc::new(obj.clone()); - let generation_idx = 0; - let value_aggregate: ValueAggregate = ValueAggregate::new( - value.clone(), - <_>::default(), - 0.into(), - air_interpreter_data::Provenance::literal(), - ); - let mut stream_map = StreamMap::from_value(key12.clone(), &value_aggregate); - let generation = Generation::Nth(generation_idx.into()); - let generation_idx_res = stream_map - .insert(key12.clone(), &value_aggregate, generation, ValueSource::CurrentData) - .unwrap(); - assert_eq!(generation_idx_res, generation_idx); + let key_1_2 = StreamMapKey::Str(Cow::Borrowed("some_key")); + let value_1 = Rc::new(json!("1")); + let value_aggregate_1 = create_value_aggregate(value_1.clone()); - let examplar = from_key_value(key12, value.as_ref()); - let s = stream_map - .stream - .iter(generation) - .unwrap() - .all(|e| *e.get_result().as_ref() == *examplar.as_ref()); - assert!(s); + let value_2 = Rc::new(json!("2")); + let value_aggregate_2 = create_value_aggregate(value_2.clone()); - let key_str = "other_key"; - let key3 = StreamMapKey::Str(Cow::Borrowed(key_str)); - let generation_idx = stream_map - .insert(key3.clone(), &value_aggregate, generation, ValueSource::CurrentData) - .unwrap(); - assert_eq!(generation_idx_res, generation_idx); + let mut stream_map = StreamMap::new(); + stream_map.insert(key_1_2.clone(), &value_aggregate_1, Generation::New); + stream_map.insert(key_1_2.clone(), &value_aggregate_2, Generation::Current(0.into())); - let key4 = StreamMapKey::I64(42.into()); - let generation_idx = stream_map - .insert(key4.clone(), &value_aggregate, generation, ValueSource::CurrentData) - .unwrap(); - assert_eq!(generation_idx_res, generation_idx); + let key_3 = StreamMapKey::Str(Cow::Borrowed("other_key")); + let value_3 = Rc::new(json!("3")); + let value_aggregate_3 = create_value_aggregate(value_3.clone()); + stream_map.insert(key_3.clone(), &value_aggregate_3, Generation::Current(0.into())); - let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap(); - let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap(); - assert_eq!(*v, *examplar.as_ref()); + let key_4 = StreamMapKey::I64(42.into()); + let value_4 = Rc::new(json!("4")); + let value_aggregate_4 = create_value_aggregate(value_4.clone()); + stream_map.insert(key_4.clone(), &value_aggregate_4, Generation::Current(0.into())); - let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap(); - assert_eq!(*v, *examplar.as_ref()); + let mut iter = stream_map.stream.iter(); - let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap(); - let examplar = from_key_value(key3, value.as_ref()); - assert_eq!(*v, *examplar.as_ref()); + assert!(compare_stream_iter(&mut iter, key_1_2.clone(), &value_2)); + assert!(compare_stream_iter(&mut iter, key_3, &value_3)); + assert!(compare_stream_iter(&mut iter, key_4, &value_4)); + assert!(compare_stream_iter(&mut iter, key_1_2, &value_1)); + assert_eq!(iter.next(), None); + } - let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap(); - let examplar = from_key_value(key4, value.as_ref()); - assert_eq!(*v, *examplar.as_ref()); - assert_eq!(internal_stream_iter.next(), None); + #[test] + fn compactification_invalid_state_error() { + use air_interpreter_data::CanonResult; + + let key = StreamMapKey::Str(Cow::Borrowed("some_key")); + let value = Rc::new(json!("1")); + let value_aggregate = create_value_aggregate(value.clone()); + let mut stream_map = StreamMap::new(); + + stream_map.insert(key, &value_aggregate, Generation::Current(0.into())); + + let trace = ExecutionTrace::from(vec![]); + let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace); + let canon_result = CanonResult(Rc::new(CID::new("fake canon CID"))); + trace_ctx.meet_canon_end(canon_result.clone()); + trace_ctx.meet_canon_end(canon_result.clone()); + trace_ctx.meet_canon_end(canon_result); + + let compactification_result = stream_map.compactify(&mut trace_ctx); + assert!(matches!( + compactification_result, + Err(ExecutionError::Uncatchable( + UncatchableError::GenerationCompactificationError( + GenerationCompactificationError::TracePosPointsToInvalidState { .. } + ) + )) + )); + } + + #[test] + fn compactification_points_to_nowhere_error() { + let key = StreamMapKey::Str(Cow::Borrowed("some_key")); + let value = Rc::new(json!("1")); + let value_aggregate = create_value_aggregate(value.clone()); + let mut stream_map = StreamMap::new(); + + stream_map.insert(key, &value_aggregate, Generation::current(0)); + + let trace = ExecutionTrace::from(vec![]); + let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace); + + let compactification_result = stream_map.compactify(&mut trace_ctx); + assert!(matches!( + compactification_result, + Err(ExecutionError::Uncatchable( + UncatchableError::GenerationCompactificationError( + GenerationCompactificationError::TracePosPointsToNowhere { .. } + ) + )) + )); } fn generate_key_values(count: usize) -> Vec<(String, ValueAggregate)> { @@ -230,30 +251,18 @@ mod test { .collect() } - fn insert_into_map( - stream_map: &mut StreamMap, - key_value: &(String, ValueAggregate), - generation: Generation, - source: ValueSource, - ) { - stream_map - .insert(key_value.0.as_str().into(), &key_value.1, generation, source) - .unwrap(); + fn insert_into_map(stream_map: &mut StreamMap, key_value: &(String, ValueAggregate), generation: Generation) { + stream_map.insert(key_value.0.as_str().into(), &key_value.1, generation); } #[test] fn get_unique_map_keys_stream_behaves_correct_with_no_duplicates() { const TEST_DATA_SIZE: usize = 3; let key_values = generate_key_values(TEST_DATA_SIZE); - let mut stream_map = StreamMap::from_generations_count(0.into(), TEST_DATA_SIZE.into()); + let mut stream_map = StreamMap::new(); for id in 0..3 { - insert_into_map( - &mut stream_map, - &key_values[id], - Generation::nth(id as u32), - ValueSource::CurrentData, - ); + insert_into_map(&mut stream_map, &key_values[id], Generation::current(id as u32)); } let mut iter = stream_map.iter_unique_key(); @@ -266,19 +275,17 @@ mod test { #[test] fn get_unique_map_keys_stream_removes_duplicates() { - use ValueSource::CurrentData; - const TEST_DATA_SIZE: usize = 5; let key_values = generate_key_values(TEST_DATA_SIZE); - let mut stream_map = StreamMap::from_generations_count(0.into(), TEST_DATA_SIZE.into()); - insert_into_map(&mut stream_map, &key_values[0], Generation::nth(0), CurrentData); - insert_into_map(&mut stream_map, &key_values[0], Generation::nth(1), CurrentData); - insert_into_map(&mut stream_map, &key_values[2], Generation::nth(1), CurrentData); - insert_into_map(&mut stream_map, &key_values[2], Generation::nth(3), CurrentData); - insert_into_map(&mut stream_map, &key_values[2], Generation::nth(4), CurrentData); - insert_into_map(&mut stream_map, &key_values[1], Generation::nth(4), CurrentData); - insert_into_map(&mut stream_map, &key_values[3], Generation::nth(2), CurrentData); + let mut stream_map = StreamMap::new(); + insert_into_map(&mut stream_map, &key_values[0], Generation::current(0)); + insert_into_map(&mut stream_map, &key_values[0], Generation::current(1)); + insert_into_map(&mut stream_map, &key_values[2], Generation::current(1)); + insert_into_map(&mut stream_map, &key_values[2], Generation::current(3)); + insert_into_map(&mut stream_map, &key_values[2], Generation::current(4)); + insert_into_map(&mut stream_map, &key_values[1], Generation::current(4)); + insert_into_map(&mut stream_map, &key_values[3], Generation::current(2)); let mut iter = stream_map.iter_unique_key(); diff --git a/air/src/execution_step/errors/uncatchable_errors.rs b/air/src/execution_step/errors/uncatchable_errors.rs index 9836babb..d5cd97bf 100644 --- a/air/src/execution_step/errors/uncatchable_errors.rs +++ b/air/src/execution_step/errors/uncatchable_errors.rs @@ -20,7 +20,7 @@ use crate::ToErrorCode; use air_interpreter_cid::CidCalculationError; use air_interpreter_data::ValueRef; -use air_trace_handler::GenerationCompatificationError; +use air_trace_handler::GenerationCompactificationError; use air_trace_handler::IntConversionError; use air_trace_handler::TraceHandlerError; @@ -46,7 +46,7 @@ pub enum UncatchableError { /// These errors are related to internal bug in the interpreter when result trace is corrupted. #[error(transparent)] - GenerationCompatificationError(#[from] GenerationCompatificationError), + GenerationCompactificationError(#[from] GenerationCompactificationError), /// Integer casts, e.g. usize(=u64) to u32, might trigger such errors. #[error(transparent)] diff --git a/air/src/execution_step/execution_context/context.rs b/air/src/execution_step/execution_context/context.rs index 44260a12..7d2773cd 100644 --- a/air/src/execution_step/execution_context/context.rs +++ b/air/src/execution_step/execution_context/context.rs @@ -25,8 +25,6 @@ use air_execution_info_collector::InstructionTracker; use air_interpreter_cid::CID; use air_interpreter_data::CanonResultCidAggregate; use air_interpreter_data::CidInfo; -use air_interpreter_data::GlobalStreamGens; -use air_interpreter_data::RestrictedStreamGens; use air_interpreter_data::ServiceResultCidAggregate; use air_interpreter_interface::*; use air_interpreter_signatures::SignatureStore; @@ -99,12 +97,7 @@ impl<'i> ExecutionCtx<'i> { run_parameters: &RunParameters, ) -> Self { let run_parameters = RcRunParameters::from_run_parameters(run_parameters); - let streams = Streams::from_data( - prev_ingredients.global_streams, - current_ingredients.global_streams, - prev_ingredients.restricted_streams, - current_ingredients.restricted_streams, - ); + let streams = Streams::new(); let cid_state = ExecutionCidState::from_cid_info(prev_ingredients.cid_info, current_ingredients.cid_info); // TODO we might keep both stores and merge them only with signature info collected into SignatureTracker @@ -162,9 +155,7 @@ impl ExecutionCtx<'_> { /// Helper struct for ExecCtx construction. #[derive(Debug, Clone)] pub(crate) struct ExecCtxIngredients { - pub(crate) global_streams: GlobalStreamGens, pub(crate) last_call_request_id: u32, - pub(crate) restricted_streams: RestrictedStreamGens, pub(crate) cid_info: CidInfo, pub(crate) signature_store: SignatureStore, } diff --git a/air/src/execution_step/execution_context/stream_maps_variables.rs b/air/src/execution_step/execution_context/stream_maps_variables.rs index d20241cd..55bb36a6 100644 --- a/air/src/execution_step/execution_context/stream_maps_variables.rs +++ b/air/src/execution_step/execution_context/stream_maps_variables.rs @@ -23,12 +23,8 @@ use crate::execution_step::ExecutionResult; use crate::execution_step::Generation; use crate::execution_step::ValueAggregate; -use air_interpreter_data::GenerationIdx; -use air_interpreter_data::RestrictedStreamGens; -use air_interpreter_data::RestrictedStreamMapGens; use air_parser::ast::Span; use air_parser::AirPos; -use air_trace_handler::merger::ValueSource; use air_trace_handler::TraceHandler; use std::collections::hash_map::Entry::{Occupied, Vacant}; @@ -39,23 +35,15 @@ use std::fmt; pub(crate) struct StreamMapValueDescriptor<'stream_name> { pub value: ValueAggregate, pub name: &'stream_name str, - pub source: ValueSource, pub generation: Generation, pub position: AirPos, } impl<'stream_name> StreamMapValueDescriptor<'stream_name> { - pub fn new( - value: ValueAggregate, - name: &'stream_name str, - source: ValueSource, - generation: Generation, - position: AirPos, - ) -> Self { + pub fn new(value: ValueAggregate, name: &'stream_name str, generation: Generation, position: AirPos) -> Self { Self { value, name, - source, generation, position, } @@ -113,18 +101,6 @@ pub(crate) struct StreamMaps { // that a script could have a lot of new. // TODO: use shared string (Rc) to avoid copying. stream_maps: HashMap>, - - /// Contains stream generations from previous data that a restricted stream - /// should have at the scope start. - previous_restricted_stream_maps_gens: RestrictedStreamMapGens, - - /// Contains stream generations from current data that a restricted stream - /// should have at the scope start. - current_restricted_stream_maps_gens: RestrictedStreamMapGens, - - /// Contains stream generations that each private stream had at the scope end. - /// Then it's placed into data - new_restricted_stream_maps_gens: RestrictedStreamMapGens, } impl StreamMaps { @@ -144,17 +120,16 @@ impl StreamMaps { &mut self, key: StreamMapKey<'_>, value_descriptor: StreamMapValueDescriptor<'_>, - ) -> ExecutionResult { + ) { let StreamMapValueDescriptor { value, name, - source, generation, position, } = value_descriptor; match self.get_mut(name, position) { - Some(stream_map) => stream_map.insert(key, &value, generation, source), + Some(stream_map) => stream_map.insert(key, &value, generation), None => { // streams could be created in three ways: // - after met new instruction with stream name that isn't present in streams @@ -163,20 +138,18 @@ impl StreamMaps { // for global streams // - and by this function, and if there is no such a streams in streams, // it means that a new global one should be created. - let stream_map = StreamMap::from_value(key, &value); + let mut stream_map = StreamMap::new(); + stream_map.insert(key, &value, generation); let descriptor = StreamMapDescriptor::global(stream_map); self.stream_maps.insert(name.to_string(), vec![descriptor]); - let generation = 0; - Ok(generation.into()) } } } - pub(crate) fn meet_scope_start(&mut self, name: impl Into, span: Span, iteration: usize) { + pub(crate) fn meet_scope_start(&mut self, name: impl Into, span: Span) { let name = name.into(); - let (prev_gens_count, current_gens_count) = self.stream_generation_from_data(&name, span.left, iteration); - let new_stream_map = StreamMap::from_generations_count(prev_gens_count, current_gens_count); + let new_stream_map = StreamMap::new(); let new_descriptor = StreamMapDescriptor::restricted(new_stream_map, span); match self.stream_maps.entry(name) { Occupied(mut entry) => { @@ -188,64 +161,27 @@ impl StreamMaps { } } - pub(crate) fn meet_scope_end( - &mut self, - name: String, - position: AirPos, - trace_ctx: &mut TraceHandler, - ) -> ExecutionResult<()> { + pub(crate) fn meet_scope_end(&mut self, name: String, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { // unwraps are safe here because met_scope_end must be called after met_scope_start let stream_map_descriptors = self.stream_maps.get_mut(&name).unwrap(); // delete a stream after exit from a scope - let last_descriptor = stream_map_descriptors.pop().unwrap(); + let mut last_descriptor = stream_map_descriptors.pop().unwrap(); if stream_map_descriptors.is_empty() { // streams should contain only non-empty stream embodiments self.stream_maps.remove(&name); } - let gens_count = last_descriptor.stream_map.compactify(trace_ctx)?; - self.collect_stream_generation(name, position, gens_count); - Ok(()) + last_descriptor.stream_map.compactify(trace_ctx) } - fn stream_generation_from_data( - &self, - name: &str, - position: AirPos, - iteration: usize, - ) -> (GenerationIdx, GenerationIdx) { - let previous_generation = - Self::restricted_stream_generation(&self.previous_restricted_stream_maps_gens, name, position, iteration) - .unwrap_or_default(); - let current_generation = - Self::restricted_stream_generation(&self.current_restricted_stream_maps_gens, name, position, iteration) - .unwrap_or_default(); - - (previous_generation, current_generation) - } - - fn restricted_stream_generation( - restricted_stream_maps_gens: &RestrictedStreamGens, - name: &str, - position: AirPos, - iteration: usize, - ) -> Option { - restricted_stream_maps_gens - .get(name) - .and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration))) - .copied() - } - - fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) { - match self.new_restricted_stream_maps_gens.entry(name) { - Occupied(mut streams) => streams.get_mut().entry(position).or_default().push(generation), - Vacant(entry) => { - let iterations = maplit::hashmap! { - position => vec![generation], - }; - entry.insert(iterations); + pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { + for (_, descriptors) in self.stream_maps.iter_mut() { + for descriptor in descriptors.iter_mut() { + descriptor.stream_map.compactify(trace_ctx)?; } } + + Ok(()) } } diff --git a/air/src/execution_step/execution_context/streams_variables.rs b/air/src/execution_step/execution_context/streams_variables.rs index 9f1d1a77..6262719f 100644 --- a/air/src/execution_step/execution_context/streams_variables.rs +++ b/air/src/execution_step/execution_context/streams_variables.rs @@ -16,18 +16,13 @@ mod stream_descriptor; mod stream_value_descriptor; -mod utils; use crate::execution_step::ExecutionResult; use crate::execution_step::Stream; -use crate::ExecutionError; use stream_descriptor::*; pub(crate) use stream_value_descriptor::StreamValueDescriptor; -use air_interpreter_data::GenerationIdx; -use air_interpreter_data::GlobalStreamGens; -use air_interpreter_data::RestrictedStreamGens; use air_parser::ast::Span; use air_parser::AirPos; use air_trace_handler::TraceHandler; @@ -41,34 +36,12 @@ pub(crate) struct Streams { // that a script could have a lot of new. // TODO: use shared string (Rc) to avoid copying. streams: HashMap>, - - /// Contains stream generations from previous data that a restricted stream - /// should have at the scope start. - previous_restricted_stream_gens: RestrictedStreamGens, - - /// Contains stream generations from current data that a restricted stream - /// should have at the scope start. - current_restricted_stream_gens: RestrictedStreamGens, - - /// Contains stream generations that each private stream had at the scope end. - /// Then it's placed into data - new_restricted_stream_gens: RestrictedStreamGens, } impl Streams { - pub(crate) fn from_data( - previous_global_streams: GlobalStreamGens, - current_global_streams: GlobalStreamGens, - previous_restricted_stream_gens: RestrictedStreamGens, - current_restricted_stream_gens: RestrictedStreamGens, - ) -> Self { - let streams = utils::merge_global_streams(previous_global_streams, current_global_streams); - + pub(crate) fn new() -> Self { Self { - streams, - previous_restricted_stream_gens, - current_restricted_stream_gens, - new_restricted_stream_gens: <_>::default(), + streams: <_>::default(), } } @@ -84,20 +57,16 @@ impl Streams { .and_then(|descriptors| find_closest_mut(descriptors.iter_mut(), position)) } - pub(crate) fn add_stream_value( - &mut self, - value_descriptor: StreamValueDescriptor<'_>, - ) -> ExecutionResult { + pub(crate) fn add_stream_value(&mut self, value_descriptor: StreamValueDescriptor<'_>) { let StreamValueDescriptor { value, name, - source, generation, position, } = value_descriptor; match self.get_mut(name, position) { - Some(stream) => stream.add_value(value, generation, source), + Some(stream) => stream.add_value(value, generation), None => { // streams could be created in three ways: // - after met new instruction with stream name that isn't present in streams @@ -106,20 +75,18 @@ impl Streams { // for global streams // - and by this function, and if there is no such a streams in streams, // it means that a new global one should be created. - let stream = Stream::from_value(value); + let mut stream = Stream::new(); + stream.add_value(value, generation); let descriptor = StreamDescriptor::global(stream); self.streams.insert(name.to_string(), vec![descriptor]); - let generation = 0; - Ok(generation.into()) } } } - pub(crate) fn meet_scope_start(&mut self, name: impl Into, span: Span, iteration: usize) { + pub(crate) fn meet_scope_start(&mut self, name: impl Into, span: Span) { let name = name.into(); - let (prev_gens_count, current_gens_count) = self.stream_generation_from_data(&name, span.left, iteration); - let new_stream = Stream::from_generations_count(prev_gens_count, current_gens_count); + let new_stream = Stream::new(); let new_descriptor = StreamDescriptor::restricted(new_stream, span); match self.streams.entry(name) { Occupied(mut entry) => { @@ -131,93 +98,27 @@ impl Streams { } } - pub(crate) fn meet_scope_end( - &mut self, - name: String, - position: AirPos, - trace_ctx: &mut TraceHandler, - ) -> ExecutionResult<()> { + pub(crate) fn meet_scope_end(&mut self, name: String, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { // unwraps are safe here because met_scope_end must be called after met_scope_start let stream_descriptors = self.streams.get_mut(&name).unwrap(); // delete a stream after exit from a scope - let last_descriptor = stream_descriptors.pop().unwrap(); + let mut last_descriptor = stream_descriptors.pop().unwrap(); if stream_descriptors.is_empty() { // streams should contain only non-empty stream embodiments self.streams.remove(&name); } - let gens_count = last_descriptor.stream.compactify(trace_ctx)?; - self.collect_stream_generation(name, position, gens_count); - Ok(()) + last_descriptor.stream.compactify(trace_ctx) } - /// This method must be called at the end of execution, because it contains logic to collect - /// all global streams depending on their presence in a streams field. - pub(crate) fn into_streams_data( - self, - trace_ctx: &mut TraceHandler, - ) -> ExecutionResult<(GlobalStreamGens, RestrictedStreamGens)> { - // since it's called at the end of execution, streams contains only global ones, - // because all private's been deleted after exiting a scope - let global_streams = self - .streams - .into_iter() - .map(|(name, mut descriptors)| -> Result<_, ExecutionError> { - // unwrap is safe here because of invariant that streams contains non-empty vectors, - // moreover it must contain only one value, because this method is called at the end - // of the execution - let stream = descriptors.pop().unwrap().stream; - let gens_count = stream.compactify(trace_ctx)?; - Ok((name, gens_count)) - }) - .collect::>()?; - - Ok((global_streams, self.new_restricted_stream_gens)) - } - - fn stream_generation_from_data( - &self, - name: &str, - position: AirPos, - iteration: usize, - ) -> (GenerationIdx, GenerationIdx) { - let previous_generation = - Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration) - .unwrap_or_default(); - let current_generation = - Self::restricted_stream_generation(&self.current_restricted_stream_gens, name, position, iteration) - .unwrap_or_default(); - - (previous_generation, current_generation) - } - - fn restricted_stream_generation( - restricted_stream_gens: &RestrictedStreamGens, - name: &str, - position: AirPos, - iteration: usize, - ) -> Option { - restricted_stream_gens - .get(name) - .and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration))) - .copied() - } - - fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) { - match self.new_restricted_stream_gens.entry(name) { - Occupied(mut streams) => match streams.get_mut().entry(position) { - Occupied(mut iterations) => iterations.get_mut().push(generation), - Vacant(entry) => { - entry.insert(vec![generation]); - } - }, - Vacant(entry) => { - let iterations = maplit::hashmap! { - position => vec![generation], - }; - entry.insert(iterations); + pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { + for (_, descriptors) in self.streams.iter_mut() { + for descriptor in descriptors { + descriptor.stream.compactify(trace_ctx)?; } } + + Ok(()) } } diff --git a/air/src/execution_step/execution_context/streams_variables/stream_value_descriptor.rs b/air/src/execution_step/execution_context/streams_variables/stream_value_descriptor.rs index ee784216..e0b0c98d 100644 --- a/air/src/execution_step/execution_context/streams_variables/stream_value_descriptor.rs +++ b/air/src/execution_step/execution_context/streams_variables/stream_value_descriptor.rs @@ -18,28 +18,19 @@ use crate::execution_step::Generation; use crate::execution_step::ValueAggregate; use air_parser::AirPos; -use air_trace_handler::merger::ValueSource; pub(crate) struct StreamValueDescriptor<'stream_name> { pub value: ValueAggregate, pub name: &'stream_name str, - pub source: ValueSource, pub generation: Generation, pub position: AirPos, } impl<'stream_name> StreamValueDescriptor<'stream_name> { - pub fn new( - value: ValueAggregate, - name: &'stream_name str, - source: ValueSource, - generation: Generation, - position: AirPos, - ) -> Self { + pub fn new(value: ValueAggregate, name: &'stream_name str, generation: Generation, position: AirPos) -> Self { Self { value, name, - source, generation, position, } diff --git a/air/src/execution_step/execution_context/streams_variables/utils.rs b/air/src/execution_step/execution_context/streams_variables/utils.rs deleted file mode 100644 index 4cf6197f..00000000 --- a/air/src/execution_step/execution_context/streams_variables/utils.rs +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2022 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use super::StreamDescriptor; -use crate::execution_step::Stream; - -use air_interpreter_data::GlobalStreamGens; - -use std::collections::HashMap; - -pub(super) fn merge_global_streams( - previous_global_streams: GlobalStreamGens, - current_global_streams: GlobalStreamGens, -) -> HashMap> { - let mut global_streams = previous_global_streams - .iter() - .map(|(stream_name, &prev_gens_count)| { - let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default(); - let global_stream = Stream::from_generations_count(prev_gens_count, current_gens_count); - let descriptor = StreamDescriptor::global(global_stream); - (stream_name.to_string(), vec![descriptor]) - }) - .collect::>(); - - for (stream_name, current_gens_count) in current_global_streams { - if previous_global_streams.contains_key(&stream_name) { - continue; - } - - let global_stream = Stream::from_generations_count(0.into(), current_gens_count); - let descriptor = StreamDescriptor::global(global_stream); - global_streams.insert(stream_name, vec![descriptor]); - } - - global_streams -} diff --git a/air/src/execution_step/instructions/ap.rs b/air/src/execution_step/instructions/ap.rs index d86ef5a2..2ecf52f2 100644 --- a/air/src/execution_step/instructions/ap.rs +++ b/air/src/execution_step/instructions/ap.rs @@ -30,7 +30,6 @@ use crate::JValue; use apply_to_arguments::apply_to_arg; use utils::*; -use air_interpreter_data::GenerationIdx; use air_parser::ast; use air_parser::ast::Ap; use air_trace_handler::merger::MergerApResult; @@ -52,8 +51,8 @@ impl<'i> super::ExecutableInstruction<'i> for Ap<'i> { )?; let merger_ap_result = to_merger_ap_result(self, trace_ctx)?; - let maybe_generation = populate_context(&self.result, &merger_ap_result, result, exec_ctx)?; - maybe_update_trace(maybe_generation, trace_ctx); + populate_context(&self.result, &merger_ap_result, result, exec_ctx)?; + maybe_update_trace(should_touch_trace, trace_ctx); Ok(()) } @@ -80,21 +79,24 @@ fn populate_context<'ctx>( merger_ap_result: &MergerApResult, result: ValueAggregate, exec_ctx: &mut ExecutionCtx<'ctx>, -) -> ExecutionResult> { +) -> ExecutionResult<()> { match ap_result { - ast::ApResult::Scalar(scalar) => exec_ctx.scalars.set_scalar_value(scalar.name, result).map(|_| None), + ast::ApResult::Scalar(scalar) => { + exec_ctx.scalars.set_scalar_value(scalar.name, result)?; + } ast::ApResult::Stream(stream) => { let value_descriptor = generate_value_descriptor(result, stream, merger_ap_result); - exec_ctx.streams.add_stream_value(value_descriptor).map(Some) + exec_ctx.streams.add_stream_value(value_descriptor); } - } + }; + + Ok(()) } -fn maybe_update_trace(maybe_generation: Option, trace_ctx: &mut TraceHandler) { +fn maybe_update_trace(should_touch_trace: bool, trace_ctx: &mut TraceHandler) { use air_interpreter_data::ApResult; - if let Some(generation) = maybe_generation { - let final_ap_result = ApResult::new(generation); - trace_ctx.meet_ap_end(final_ap_result); + if should_touch_trace { + trace_ctx.meet_ap_end(ApResult::stub()); } } diff --git a/air/src/execution_step/instructions/ap/apply_to_arguments.rs b/air/src/execution_step/instructions/ap/apply_to_arguments.rs index 3a369eff..2481167e 100644 --- a/air/src/execution_step/instructions/ap/apply_to_arguments.rs +++ b/air/src/execution_step/instructions/ap/apply_to_arguments.rs @@ -15,8 +15,11 @@ */ use super::*; +use crate::execution_step::boxed_value::TracePosOperate; use crate::execution_step::resolver::Resolvable; -use crate::execution_step::{CanonResultAggregate, LiteralAggregate, PEEK_ALLOWED_ON_NON_EMPTY}; +use crate::execution_step::CanonResultAggregate; +use crate::execution_step::LiteralAggregate; +use crate::execution_step::PEEK_ALLOWED_ON_NON_EMPTY; use crate::UncatchableError; use air_interpreter_data::Provenance; diff --git a/air/src/execution_step/instructions/ap/utils.rs b/air/src/execution_step/instructions/ap/utils.rs index 00f8cd9d..20b8813e 100644 --- a/air/src/execution_step/instructions/ap/utils.rs +++ b/air/src/execution_step/instructions/ap/utils.rs @@ -27,21 +27,12 @@ pub(super) fn generate_value_descriptor<'stream>( stream: &'stream ast::Stream<'_>, ap_result: &MergerApResult, ) -> StreamValueDescriptor<'stream> { - use air_trace_handler::merger::ValueSource; - match ap_result { - MergerApResult::NotMet => StreamValueDescriptor::new( - value, - stream.name, - ValueSource::PreviousData, - Generation::Last, - stream.position, - ), + MergerApResult::NotMet => StreamValueDescriptor::new(value, stream.name, Generation::New, stream.position), MergerApResult::Met(met_result) => StreamValueDescriptor::new( value, stream.name, - met_result.value_source, - Generation::Nth(met_result.generation), + Generation::from_met_result(met_result), stream.position, ), } @@ -52,21 +43,12 @@ pub(crate) fn generate_map_value_descriptor<'stream>( stream: &'stream ast::StreamMap<'_>, ap_result: &MergerApResult, ) -> StreamMapValueDescriptor<'stream> { - use air_trace_handler::merger::ValueSource; - match ap_result { - MergerApResult::NotMet => StreamMapValueDescriptor::new( - value, - stream.name, - ValueSource::PreviousData, - Generation::Last, - stream.position, - ), + MergerApResult::NotMet => StreamMapValueDescriptor::new(value, stream.name, Generation::New, stream.position), MergerApResult::Met(met_result) => StreamMapValueDescriptor::new( value, stream.name, - met_result.value_source, - Generation::Nth(met_result.generation), + Generation::from_met_result(met_result), stream.position, ), } diff --git a/air/src/execution_step/instructions/ap_map.rs b/air/src/execution_step/instructions/ap_map.rs index 72d8dec4..e98f369e 100644 --- a/air/src/execution_step/instructions/ap_map.rs +++ b/air/src/execution_step/instructions/ap_map.rs @@ -28,10 +28,9 @@ use crate::unsupported_map_key_type; use crate::CatchableError; use crate::ExecutionError; -use air_interpreter_data::GenerationIdx; +use air_interpreter_data::ApResult; use air_parser::ast::ApMap; use air_parser::ast::ApMapKey; -use air_parser::ast::Number; use air_parser::ast::StreamMap; use air_trace_handler::merger::MergerApResult; @@ -46,8 +45,8 @@ impl<'i> super::ExecutableInstruction<'i> for ApMap<'i> { let merger_ap_result = to_merger_ap_map_result(&self, trace_ctx)?; let key = resolve_if_needed(&self.key, exec_ctx, self.map.name)?; - let generation = populate_context(key, &self.map, &merger_ap_result, result, exec_ctx)?; - maybe_update_trace(generation, trace_ctx); + populate_context(key, &self.map, &merger_ap_result, result, exec_ctx); + trace_ctx.meet_ap_end(ApResult::stub()); Ok(()) } @@ -64,9 +63,9 @@ fn populate_context<'ctx>( merger_ap_result: &MergerApResult, result: ValueAggregate, exec_ctx: &mut ExecutionCtx<'ctx>, -) -> ExecutionResult { +) { let value_descriptor = generate_map_value_descriptor(result, ap_map_result, merger_ap_result); - exec_ctx.stream_maps.add_stream_map_value(key, value_descriptor) + exec_ctx.stream_maps.add_stream_map_value(key, value_descriptor); } fn resolve_if_needed<'ctx>( @@ -74,6 +73,8 @@ fn resolve_if_needed<'ctx>( exec_ctx: &mut ExecutionCtx<'ctx>, map_name: &str, ) -> Result, ExecutionError> { + use air_parser::ast::Number; + match key { &ApMapKey::Literal(s) => Ok(s.into()), ApMapKey::Number(n) => match n { @@ -94,10 +95,3 @@ fn resolve<'ctx>( let (value, _, _) = resolvable.resolve(exec_ctx)?; StreamMapKey::from_value(value, map_name) } - -fn maybe_update_trace(generation: GenerationIdx, trace_ctx: &mut TraceHandler) { - use air_interpreter_data::ApResult; - - let final_ap_result = ApResult::new(generation); - trace_ctx.meet_ap_end(final_ap_result); -} diff --git a/air/src/execution_step/instructions/call/call_result_setter.rs b/air/src/execution_step/instructions/call/call_result_setter.rs index 49bc32c2..6cd14bd0 100644 --- a/air/src/execution_step/instructions/call/call_result_setter.rs +++ b/air/src/execution_step/instructions/call/call_result_setter.rs @@ -58,16 +58,11 @@ pub(crate) fn populate_context_from_peer_service_result<'i>( let executed_result = ValueAggregate::from_service_result(executed_result, service_result_agg_cid.clone()); - let value_descriptor = StreamValueDescriptor::new( - executed_result, - stream.name, - ValueSource::PreviousData, - Generation::Last, - stream.position, - ); - let generation = exec_ctx.streams.add_stream_value(value_descriptor)?; + let value_descriptor = + StreamValueDescriptor::new(executed_result, stream.name, Generation::New, stream.position); + exec_ctx.streams.add_stream_value(value_descriptor); exec_ctx.record_call_cid(&*peer_id, &service_result_agg_cid); - Ok(CallResult::executed_stream(service_result_agg_cid, generation)) + Ok(CallResult::executed_stream_stub(service_result_agg_cid)) } CallOutputValue::None => { let value_cid = value_to_json_cid(&*executed_result.result) @@ -86,39 +81,31 @@ pub(crate) fn populate_context_from_data<'i>( value_source: ValueSource, output: &CallOutputValue<'i>, exec_ctx: &mut ExecutionCtx<'i>, -) -> ExecutionResult { +) -> ExecutionResult<()> { match (output, value) { (CallOutputValue::Scalar(scalar), ValueRef::Scalar(cid)) => { let value = exec_ctx.cid_state.resolve_service_value(&cid)?; let result = ServiceResultAggregate::new(value, tetraplet, trace_pos); - let result = ValueAggregate::from_service_result(result, cid.clone()); + let result = ValueAggregate::from_service_result(result, cid); exec_ctx.scalars.set_scalar_value(scalar.name, result)?; - Ok(ValueRef::Scalar(cid)) } (CallOutputValue::Stream(stream), ValueRef::Stream { cid, generation }) => { let value = exec_ctx.cid_state.resolve_service_value(&cid)?; let result = ServiceResultAggregate::new(value, tetraplet, trace_pos); - let result = ValueAggregate::from_service_result(result, cid.clone()); - let value_descriptor = StreamValueDescriptor::new( - result, - stream.name, - value_source, - Generation::Nth(generation), - stream.position, - ); - let resulted_generation = exec_ctx.streams.add_stream_value(value_descriptor)?; - - let result = ValueRef::Stream { - cid, - generation: resulted_generation, - }; - Ok(result) + let result = ValueAggregate::from_service_result(result, cid); + let generation = Generation::from_data(value_source, generation); + let value_descriptor = StreamValueDescriptor::new(result, stream.name, generation, stream.position); + exec_ctx.streams.add_stream_value(value_descriptor); + } + (CallOutputValue::None, ValueRef::Unused(_)) => {} + (_, value) => { + return Err(ExecutionError::Uncatchable( + UncatchableError::CallResultNotCorrespondToInstr(value), + )) } - (CallOutputValue::None, value @ ValueRef::Unused(_)) => Ok(value), - (_, value) => Err(ExecutionError::Uncatchable( - UncatchableError::CallResultNotCorrespondToInstr(value), - )), } + + Ok(()) } /// Writes an executed state of a particle being sent to remote node. diff --git a/air/src/execution_step/instructions/call/prev_result_handler.rs b/air/src/execution_step/instructions/call/prev_result_handler.rs index 6434bf30..7eb96733 100644 --- a/air/src/execution_step/instructions/call/prev_result_handler.rs +++ b/air/src/execution_step/instructions/call/prev_result_handler.rs @@ -101,8 +101,8 @@ pub(super) fn handle_prev_state<'i>( Executed(value) => { use air_interpreter_data::ValueRef; - let resulted_value = populate_context_from_data( - value, + populate_context_from_data( + value.clone(), tetraplet.clone(), met_result.trace_pos, met_result.source, @@ -110,14 +110,14 @@ pub(super) fn handle_prev_state<'i>( exec_ctx, )?; - match &resulted_value { + match &value { ValueRef::Scalar(ref cid) | ValueRef::Stream { ref cid, .. } => { exec_ctx.record_call_cid(&*tetraplet.peer_pk, cid); } ValueRef::Unused(_) => {} } - let call_result = CallResult::Executed(resulted_value); + let call_result = CallResult::Executed(value); trace_ctx.meet_call_end(call_result); Ok(StateDescriptor::executed()) diff --git a/air/src/execution_step/instructions/canon.rs b/air/src/execution_step/instructions/canon.rs index d134a394..8b81aff3 100644 --- a/air/src/execution_step/instructions/canon.rs +++ b/air/src/execution_step/instructions/canon.rs @@ -23,7 +23,6 @@ use super::ExecutionResult; use super::TraceHandler; use crate::execution_step::boxed_value::CanonStream; use crate::execution_step::boxed_value::CanonStreamWithProvenance; -use crate::execution_step::Generation; use crate::log_instruction; use crate::trace_to_exec_err; @@ -86,8 +85,7 @@ fn create_canon_stream_producer<'closure, 'name: 'closure>( .map(Cow::Borrowed) .unwrap_or_default(); - // it's always possible to iter over all generations of a stream - let values = stream.iter(Generation::Last).unwrap().cloned().collect::>(); + let values = stream.iter().cloned().collect::>(); CanonStream::from_values(values, peer_pk) }) } diff --git a/air/src/execution_step/instructions/fold/utils.rs b/air/src/execution_step/instructions/fold/utils.rs index ad4c8d88..2b71a0ee 100644 --- a/air/src/execution_step/instructions/fold/utils.rs +++ b/air/src/execution_step/instructions/fold/utils.rs @@ -16,6 +16,7 @@ use super::*; use crate::execution_step::boxed_value::populate_tetraplet_with_lambda; +use crate::execution_step::boxed_value::IterableValue; use crate::execution_step::CatchableError; use crate::execution_step::PEEK_ALLOWED_ON_NON_EMPTY; use crate::JValue; @@ -31,8 +32,6 @@ use std::rc::Rc; // TODO: refactor this file after switching to boxed value -pub(crate) type IterableValue = Box Iterable<'ctx, Item = IterableItem<'ctx>>>; - pub(crate) enum FoldIterableScalar { Empty, ScalarBased(IterableValue), @@ -95,28 +94,6 @@ pub(crate) fn create_canon_stream_iterable_value<'ctx>( Ok(FoldIterableScalar::ScalarBased(iterable)) } -/// Constructs iterable value for given stream iterable. -pub(crate) fn construct_stream_iterable_values( - stream: &Stream, - start: Generation, - end: Generation, -) -> Vec { - let stream_iter = match stream.slice_iter(start, end) { - Some(stream_iter) => stream_iter, - None => return vec![], - }; - - stream_iter - .filter(|iterable| !iterable.is_empty()) - .map(|iterable| { - let call_results = iterable.to_vec(); - let foldable = IterableVecResolvedCall::init(call_results); - let foldable: IterableValue = Box::new(foldable); - foldable - }) - .collect::>() -} - /// Constructs iterable value from resolved call result. fn from_value(call_result: ValueAggregate, variable_name: &str) -> ExecutionResult { let len = match call_result.get_result().deref() { diff --git a/air/src/execution_step/instructions/fold_scalar.rs b/air/src/execution_step/instructions/fold_scalar.rs index 57451843..fdf9bfad 100644 --- a/air/src/execution_step/instructions/fold_scalar.rs +++ b/air/src/execution_step/instructions/fold_scalar.rs @@ -19,6 +19,7 @@ use super::ExecutableInstruction; use super::ExecutionCtx; use super::ExecutionResult; use super::TraceHandler; +use crate::execution_step::boxed_value::IterableValue; use crate::execution_step::Joinable; use crate::joinable; use crate::log_instruction; diff --git a/air/src/execution_step/instructions/fold_stream.rs b/air/src/execution_step/instructions/fold_stream.rs index a9eeda25..c63dba54 100644 --- a/air/src/execution_step/instructions/fold_stream.rs +++ b/air/src/execution_step/instructions/fold_stream.rs @@ -15,10 +15,8 @@ */ pub(super) mod completeness_updater; -mod stream_cursor; pub(super) mod stream_execute_helpers; -use super::fold::*; use super::ExecutableInstruction; use super::ExecutionCtx; use super::ExecutionResult; diff --git a/air/src/execution_step/instructions/fold_stream/stream_cursor.rs b/air/src/execution_step/instructions/fold_stream/stream_cursor.rs deleted file mode 100644 index ae288881..00000000 --- a/air/src/execution_step/instructions/fold_stream/stream_cursor.rs +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2022 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use air_interpreter_data::GenerationIdx; - -use super::construct_stream_iterable_values; -use crate::execution_step::boxed_value::Generation; -use crate::execution_step::boxed_value::Stream; -use crate::execution_step::instructions::fold::IterableValue; - -pub(super) struct StreamCursor { - last_seen_generation: GenerationIdx, -} - -impl StreamCursor { - pub(super) fn new() -> Self { - Self { - last_seen_generation: GenerationIdx::from(0), - } - } - - pub(super) fn construct_iterables(&mut self, stream: &Stream) -> Vec { - let iterables = - construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last); - self.last_seen_generation = stream.last_non_empty_generation(); - - iterables - } -} diff --git a/air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs b/air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs index 99a1932d..3b220ed5 100644 --- a/air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs +++ b/air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs @@ -15,13 +15,14 @@ */ use super::completeness_updater::FoldGenerationObserver; -use super::stream_cursor::StreamCursor; use super::ExecutionCtx; use super::ExecutionResult; use super::TraceHandler; +use crate::execution_step::boxed_value::IterableValue; +use crate::execution_step::boxed_value::RecursiveCursorState; +use crate::execution_step::boxed_value::RecursiveStreamCursor; use crate::execution_step::boxed_value::Stream; use crate::execution_step::instructions::fold::IterableType; -use crate::execution_step::instructions::fold::IterableValue; use crate::execution_step::instructions::fold_scalar::fold; use crate::trace_to_exec_err; @@ -29,14 +30,14 @@ use air_parser::ast::Instruction; use std::rc::Rc; -struct FoldStreamLikeIngredients<'i> { +struct FoldStreamIngredients<'i> { iterable_name: &'i str, instruction: Rc>, last_instruction: Option>>, fold_id: u32, } -impl<'i> FoldStreamLikeIngredients<'i> { +impl<'i> FoldStreamIngredients<'i> { fn new( iterable_name: &'i str, instruction: Rc>, @@ -65,18 +66,16 @@ pub(crate) fn execute_with_stream<'i>( trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), fold_to_string)?; - let mut stream_cursor = StreamCursor::new(); - let mut stream_iterable = stream_cursor.construct_iterables(get_mut_stream(exec_ctx)); + let mut recursive_stream = RecursiveStreamCursor::new(); + let mut cursor_state = recursive_stream.met_fold_start(get_mut_stream(exec_ctx)); let mut observer = FoldGenerationObserver::new(); + // this cycle manages recursive streams - while !stream_iterable.is_empty() { - // add a new generation to made all consequence "new" (meaning that they are just executed on this peer) - // write operation to this stream to write to this new generation - add_new_generation_if_non_empty(get_mut_stream(exec_ctx)); + while let RecursiveCursorState::Continue(iterables) = cursor_state { let ingredients = - FoldStreamLikeIngredients::new(iterable_name, instruction.clone(), last_instruction.clone(), fold_id); + FoldStreamIngredients::new(iterable_name, instruction.clone(), last_instruction.clone(), fold_id); execute_iterations( - stream_iterable, + iterables, fold_to_string, ingredients, &mut observer, @@ -84,11 +83,7 @@ pub(crate) fn execute_with_stream<'i>( trace_ctx, )?; - // it's needed to get stream again, because RefCell allows only one mutable borrowing at time, - // and likely that stream could be mutably borrowed in execute_iterations - let stream = remove_new_generation_if_non_empty(get_mut_stream(exec_ctx)); - - stream_iterable = stream_cursor.construct_iterables(stream) + cursor_state = recursive_stream.met_iteration_end(get_mut_stream(exec_ctx)); } observer.update_completeness(exec_ctx); @@ -104,7 +99,7 @@ pub(crate) fn execute_with_stream<'i>( fn execute_iterations<'i>( iterables: Vec, fold_to_string: &impl ToString, - ingredients: FoldStreamLikeIngredients<'i>, + ingredients: FoldStreamIngredients<'i>, generation_observer: &mut FoldGenerationObserver, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler, @@ -139,17 +134,6 @@ fn execute_iterations<'i>( Ok(()) } -/// Safety: this function should be called iff stream is present in context -pub(super) fn add_new_generation_if_non_empty(stream: &mut Stream) { - stream.add_new_generation_if_non_empty(); -} - -/// Safety: this function should be called iff stream is present in context -pub(super) fn remove_new_generation_if_non_empty(stream: &mut Stream) -> &Stream { - stream.remove_last_generation_if_empty(); - stream -} - /// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be /// not deterministic. pub(super) fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> { diff --git a/air/src/execution_step/instructions/new.rs b/air/src/execution_step/instructions/new.rs index 86f8fcd5..3755b27a 100644 --- a/air/src/execution_step/instructions/new.rs +++ b/air/src/execution_step/instructions/new.rs @@ -22,8 +22,6 @@ use crate::log_instruction; use air_parser::ast::New; use air_parser::ast::NewArgument; -use std::convert::TryFrom; - impl<'i> super::ExecutableInstruction<'i> for New<'i> { fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { log_instruction!(new, exec_ctx, trace_ctx); @@ -51,18 +49,8 @@ impl<'i> super::ExecutableInstruction<'i> for New<'i> { fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) { let position = new.span.left; match &new.argument { - NewArgument::Stream(stream) => { - let iteration = exec_ctx.tracker.new_tracker.get_iteration(position); - let iteration = usize::try_from(iteration).unwrap(); - exec_ctx.streams.meet_scope_start(stream.name, new.span, iteration); - } - NewArgument::StreamMap(stream_map) => { - let iteration = exec_ctx.tracker.new_tracker.get_iteration(position); - let iteration = usize::try_from(iteration).unwrap(); - exec_ctx - .stream_maps - .meet_scope_start(stream_map.name, new.span, iteration); - } + NewArgument::Stream(stream) => exec_ctx.streams.meet_scope_start(stream.name, new.span), + NewArgument::StreamMap(stream_map) => exec_ctx.stream_maps.meet_scope_start(stream_map.name, new.span), NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_start_scalar(scalar.name.to_string()), NewArgument::CanonStream(canon_stream) => exec_ctx .scalars @@ -73,16 +61,11 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) { } fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { - let position = new.span.left; match &new.argument { - NewArgument::Stream(stream) => exec_ctx - .streams - .meet_scope_end(stream.name.to_string(), position, trace_ctx), - NewArgument::StreamMap(stream_map) => { - exec_ctx - .stream_maps - .meet_scope_end(stream_map.name.to_string(), position, trace_ctx) - } + NewArgument::Stream(stream) => exec_ctx.streams.meet_scope_end(stream.name.to_string(), trace_ctx), + NewArgument::StreamMap(stream_map) => exec_ctx + .stream_maps + .meet_scope_end(stream_map.name.to_string(), trace_ctx), NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name), NewArgument::CanonStream(canon_stream) => exec_ctx.scalars.meet_new_end_canon_stream(canon_stream.name), } diff --git a/air/src/farewell_step/errors.rs b/air/src/farewell_step/errors.rs index 65a40887..e84f8d1a 100644 --- a/air/src/farewell_step/errors.rs +++ b/air/src/farewell_step/errors.rs @@ -15,15 +15,18 @@ */ use crate::ToErrorCode; -use air_interpreter_interface::CallResults; +use air_interpreter_interface::CallResults; +use fluence_keypair::error::SigningError; +use strum::EnumCount; use strum::IntoEnumIterator; +use strum_macros::EnumCount as EnumCountMacro; use strum_macros::EnumDiscriminants; use strum_macros::EnumIter; use thiserror::Error as ThisError; /// Errors happened during the interpreter farewell step. -#[derive(Debug, EnumDiscriminants, ThisError)] +#[derive(Debug, EnumDiscriminants, EnumCountMacro, ThisError)] #[strum_discriminants(derive(EnumIter))] pub enum FarewellError { /// Call results should be empty at the end of execution thanks to a execution invariant. @@ -39,3 +42,9 @@ impl ToErrorCode for FarewellError { crate::generate_to_error_code!(self, FarewellError, FAREWELL_ERRORS_START_ID) } } + +impl ToErrorCode for SigningError { + fn to_error_code(&self) -> i64 { + crate::utils::FAREWELL_ERRORS_START_ID + FarewellError::COUNT as i64 + } +} diff --git a/air/src/farewell_step/outcome.rs b/air/src/farewell_step/outcome.rs index 39077c4d..beef9e74 100644 --- a/air/src/farewell_step/outcome.rs +++ b/air/src/farewell_step/outcome.rs @@ -25,6 +25,7 @@ use crate::INTERPRETER_SUCCESS; use air_interpreter_data::InterpreterData; use air_interpreter_interface::CallRequests; use air_utils::measure; +use fluence_keypair::error::SigningError; use fluence_keypair::KeyPair; use std::fmt::Debug; @@ -90,26 +91,18 @@ fn populate_outcome_from_contexts( error_message: String, keypair: &KeyPair, ) -> InterpreterOutcome { - let maybe_gens = exec_ctx - .streams - .into_streams_data(&mut trace_handler) - .map_err(execution_error_into_outcome); - let (global_streams, restricted_streams) = match maybe_gens { - Ok(gens) => gens, + match compactify_streams(&mut exec_ctx, &mut trace_handler) { + Ok(()) => {} Err(outcome) => return outcome, }; - let current_signature = exec_ctx - .signature_tracker - .into_signature(&exec_ctx.run_parameters.current_peer_id, keypair) - .expect("siging shouldn't fail"); - let current_pubkey = keypair.public(); - exec_ctx.signature_store.put(current_pubkey.into(), current_signature); + match sign_result(&mut exec_ctx, keypair) { + Ok(()) => {} + Err(outcome) => return outcome, + }; let data = InterpreterData::from_execution_result( trace_handler.into_result_trace(), - global_streams, - restricted_streams, exec_ctx.cid_state.into(), exec_ctx.signature_store, exec_ctx.last_call_request_id, @@ -120,22 +113,46 @@ fn populate_outcome_from_contexts( tracing::Level::TRACE, "serde_json::to_vec(data)" ); + let next_peer_pks = dedup(exec_ctx.next_peer_pks); let call_requests = measure!( serde_json::to_vec(&exec_ctx.call_requests).expect("default serializer shouldn't fail"), tracing::Level::TRACE, "serde_json::to_vec(call_results)", ); - InterpreterOutcome::new(ret_code, error_message, data, next_peer_pks, call_requests) } -// this method is called only if there is an internal error in the interpreter and +fn compactify_streams(exec_ctx: &mut ExecutionCtx<'_>, trace_ctx: &mut TraceHandler) -> Result<(), InterpreterOutcome> { + exec_ctx + .streams + .compactify(trace_ctx) + .and_then(|_| exec_ctx.stream_maps.compactify(trace_ctx)) + .map_err(execution_error_into_outcome) +} + +fn sign_result(exec_ctx: &mut ExecutionCtx<'_>, keypair: &KeyPair) -> Result<(), InterpreterOutcome> { + let current_signature = exec_ctx + .signature_tracker + .into_signature(&exec_ctx.run_parameters.current_peer_id, keypair) + .map_err(signing_error_into_outcome)?; + + let current_pubkey = keypair.public(); + exec_ctx.signature_store.put(current_pubkey.into(), current_signature); + + Ok(()) +} + +// these methods are called only if there is an internal error in the interpreter and // new execution trace was corrupted fn execution_error_into_outcome(error: ExecutionError) -> InterpreterOutcome { InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![]) } +fn signing_error_into_outcome(error: SigningError) -> InterpreterOutcome { + InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![]) +} + /// Deduplicate values in a supplied vector. fn dedup(mut vec: Vec) -> Vec { use std::collections::HashSet; diff --git a/air/src/preparation_step/preparation.rs b/air/src/preparation_step/preparation.rs index b1e1416c..99d03955 100644 --- a/air/src/preparation_step/preparation.rs +++ b/air/src/preparation_step/preparation.rs @@ -54,17 +54,13 @@ pub(crate) fn prepare<'i>( let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?; let prev_ingredients = ExecCtxIngredients { - global_streams: prev_data.global_streams, last_call_request_id: prev_data.last_call_request_id, - restricted_streams: prev_data.restricted_streams, cid_info: prev_data.cid_info, signature_store: prev_data.signatures, }; let current_ingredients = ExecCtxIngredients { - global_streams: current_data.global_streams, last_call_request_id: current_data.last_call_request_id, - restricted_streams: current_data.restricted_streams, cid_info: current_data.cid_info, signature_store: current_data.signatures, }; diff --git a/air/tests/test_module/features/data_merging/data_merge.rs b/air/tests/test_module/features/data_merging/data_merge.rs index 48f6d551..2aeed98b 100644 --- a/air/tests/test_module/features/data_merging/data_merge.rs +++ b/air/tests/test_module/features/data_merging/data_merge.rs @@ -305,17 +305,6 @@ fn fold_merge() { ); let data = InterpreterData::try_from_slice(&result_7.data).expect("data should be well-formed"); - let stream_1_generations = data - .global_streams - .get("$stream_1") - .expect("$stream_1 should be present in data"); - let stream_2_generations = data - .global_streams - .get("$stream_2") - .expect("$stream_2 should be present in data"); - - assert_eq!(*stream_1_generations, 8); - assert_eq!(*stream_2_generations, 6); let mut fold_states_count = 0; let mut calls_count = HashMap::new(); diff --git a/air/tests/test_module/features/streams/compactification.rs b/air/tests/test_module/features/streams/compactification.rs new file mode 100644 index 00000000..b2ea8b97 --- /dev/null +++ b/air/tests/test_module/features/streams/compactification.rs @@ -0,0 +1,164 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use air::ExecutionCidState; +use air_test_framework::AirScriptExecutor; +use air_test_utils::prelude::TestRunParameters; +use air_test_utils::*; + +#[test] +fn global_streams_are_compactified() { + let peer_id = "peer_id"; + let service_result = "service_result"; + let script = format!( + r#" + (seq + (ap 1 $stream) + (call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}" + ) + "# + ); + + let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap(); + let result = executor.execute_all(peer_id).unwrap(); + let actual_trace = trace_from_result(result.last().unwrap()); + + let mut cid_state = ExecutionCidState::new(); + let expected_trace = vec![ + executed_state::ap(0), + stream_tracked!( + service_result, + 1, + cid_state, + peer = peer_id, + service = "..0", + function = "" + ), + ]; + + assert_eq!(&actual_trace, &expected_trace); +} + +#[test] +fn global_stream_maps_are_compactified() { + let peer_id = "peer_id"; + let service_result = "service_result"; + let script = format!( + r#" + (seq + (ap (1 1) %stream_map) + (seq + (call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}" + (ap (1 1) %stream_map) + ) + ) + "# + ); + + let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap(); + let result = executor.execute_all(peer_id).unwrap(); + let actual_trace = trace_from_result(result.last().unwrap()); + + let mut cid_state = ExecutionCidState::new(); + let expected_trace = vec![ + executed_state::ap(0), + stream_tracked!( + service_result, + 0, + cid_state, + peer = peer_id, + service = "..0", + function = "" + ), + executed_state::ap(1), + ]; + + assert_eq!(&actual_trace, &expected_trace); +} + +#[test] +fn local_streams_are_compactified() { + let peer_id = "peer_id"; + let service_result = "service_result"; + let script = format!( + r#" + (new $stream + (seq + (ap 1 $stream) + (call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}" + ) + ) + "# + ); + + let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap(); + let result = executor.execute_all(peer_id).unwrap(); + let actual_trace = trace_from_result(result.last().unwrap()); + + let mut cid_state = ExecutionCidState::new(); + let expected_trace = vec![ + executed_state::ap(0), + stream_tracked!( + service_result, + 1, + cid_state, + peer = peer_id, + service = "..0", + function = "" + ), + ]; + + assert_eq!(actual_trace, expected_trace); +} + +#[test] +fn local_stream_maps_are_compactified() { + let peer_id = "peer_id"; + let service_result = "service_result"; + let script = format!( + r#" + (new $stream + (seq + (ap (1 1) %stream_map) + (seq + (call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}" + (ap (1 1) %stream_map) + ) + ) + ) + "# + ); + + let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap(); + let result = executor.execute_all(peer_id).unwrap(); + let actual_trace = trace_from_result(result.last().unwrap()); + + let mut cid_state = ExecutionCidState::new(); + let expected_trace = vec![ + executed_state::ap(0), + stream_tracked!( + service_result, + 0, + cid_state, + peer = peer_id, + service = "..0", + function = "" + ), + executed_state::ap(1), + ]; + + assert_eq!(actual_trace, expected_trace); +} diff --git a/air/tests/test_module/features/streams/mod.rs b/air/tests/test_module/features/streams/mod.rs index 79829d52..84720a34 100644 --- a/air/tests/test_module/features/streams/mod.rs +++ b/air/tests/test_module/features/streams/mod.rs @@ -15,6 +15,7 @@ */ mod ap_with_fold; +mod compactification; mod merging; mod recursive_streams; mod streams; diff --git a/air/tests/test_module/features/streams/recursive_streams.rs b/air/tests/test_module/features/streams/recursive_streams.rs index f807d0bf..ccb55f3a 100644 --- a/air/tests/test_module/features/streams/recursive_streams.rs +++ b/air/tests/test_module/features/streams/recursive_streams.rs @@ -131,11 +131,12 @@ fn recursive_stream_many_iterations() { executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)), executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)), executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)), - executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(15, 0)), + executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(13, 0)), executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)), - executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(19, 0)), + executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(17, 0)), executed_state::subtrace_lore(14, subtrace_desc(17, 2), subtrace_desc(19, 0)), executed_state::subtrace_lore(16, subtrace_desc(19, 1), subtrace_desc(20, 0)), + executed_state::subtrace_lore(18, subtrace_desc(20, 1), subtrace_desc(21, 0)), ]); let expected_fold_v2 = executed_state::fold(vec![ @@ -143,15 +144,18 @@ fn recursive_stream_many_iterations() { executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)), executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)), executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)), - executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(15, 0)), + executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(13, 0)), executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)), - executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(18, 0)), + executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(17, 0)), executed_state::subtrace_lore(14, subtrace_desc(17, 1), subtrace_desc(18, 0)), executed_state::subtrace_lore(16, subtrace_desc(18, 2), subtrace_desc(20, 0)), executed_state::subtrace_lore(19, subtrace_desc(20, 1), subtrace_desc(21, 0)), ]); let test_passed = (actual_fold == &expected_fold_v1) || (actual_fold == &expected_fold_v2); + if !test_passed { + print_trace(&result, ""); + } assert!(test_passed); let actual_last_state = actual_trace.last().unwrap(); @@ -378,7 +382,7 @@ fn recursive_stream_inner_fold() { "# ); - let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); + let result = call_vm!(vm_1, <_>::default(), &script, "", ""); let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data); let actual_trace = trace_from_result(&result); diff --git a/air/tests/test_module/instructions/new.rs b/air/tests/test_module/instructions/new.rs index 43d8f9df..bf72e468 100644 --- a/air/tests/test_module/instructions/new.rs +++ b/air/tests/test_module/instructions/new.rs @@ -15,7 +15,6 @@ */ use air_interpreter_data::ExecutionTrace; -use air_parser::AirPos; use air_test_utils::prelude::*; use pretty_assertions::assert_eq; @@ -92,15 +91,6 @@ fn new_with_global_streams_seq() { scalar!(json!([1, 2]), peer = local_vm_peer_id_1, args = [json!([1, 2])]), ]; assert_eq!(actual_trace, expected_trace); - - let data = data_from_result(&vm_2_result); - let actual_restricted_streams = data.restricted_streams; - let expected_restricted_streams = maplit::hashmap! { - "$stream".to_string() => maplit::hashmap! { - AirPos::from(282) => vec![1.into(), 1.into()] - } - }; - assert_eq!(actual_restricted_streams, expected_restricted_streams); } #[test] @@ -368,15 +358,6 @@ fn new_in_fold_with_ap() { scalar!(json!(["none"]), peer = vm_peer_id, args = [json!(["none"])]), ]; assert_eq!(actual_trace, expected_trace); - - let data = data_from_result(&result); - let actual_restricted_streams = data.restricted_streams; - let expected_restricted_streams = maplit::hashmap! { - "$s1".to_string() => maplit::hashmap! { - AirPos::from(146) => vec![1.into(), 1.into(), 1.into(), 1.into(), 1.into()] - } - }; - assert_eq!(actual_restricted_streams, expected_restricted_streams); } #[test] @@ -421,25 +402,6 @@ fn new_with_streams_with_errors() { ), ]; assert_eq!(actual_trace, expected_trace); - - let data = data_from_result(&result); - - let actual_restricted_streams = data.restricted_streams; - let expected_restricted_streams = maplit::hashmap! { - "$restricted_stream_2".to_string() => maplit::hashmap! { - AirPos::from(216) => vec![1.into()] - }, - "$restricted_stream_1".to_string() => maplit::hashmap! { - AirPos::from(141) => vec![0.into()] - } - }; - assert_eq!(actual_restricted_streams, expected_restricted_streams); - - let actual_global_streams = data.global_streams; - let expected_global_streams = maplit::hashmap! { - "$global_stream".to_string() => 1.into(), - }; - assert_eq!(actual_global_streams, expected_global_streams); } #[test] diff --git a/air/tests/test_module/issues/issue_173.rs b/air/tests/test_module/issues/issue_173.rs index 3133e010..def19dfd 100644 --- a/air/tests/test_module/issues/issue_173.rs +++ b/air/tests/test_module/issues/issue_173.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use air_parser::AirPos; use air_test_utils::prelude::*; #[test] @@ -92,13 +91,4 @@ fn issue_173() { scalar!(json!([1, 2])), ]; assert_eq!(actual_trace, expected_trace); - - let data = data_from_result(&vm_2_result); - let actual_restricted_streams = data.restricted_streams; - let expected_restricted_streams = maplit::hashmap! { - "$stream".to_string() => maplit::hashmap! { - AirPos::from(282) => vec![1.into(), 1.into()] - } - }; - assert_eq!(actual_restricted_streams, expected_restricted_streams); } diff --git a/air/tests/test_module/issues/issue_642.rs b/air/tests/test_module/issues/issue_642.rs index a2fbbe45..19ec1e1b 100644 --- a/air/tests/test_module/issues/issue_642.rs +++ b/air/tests/test_module/issues/issue_642.rs @@ -17,7 +17,6 @@ use air_test_utils::prelude::*; #[test] -#[ignore] // will be resolved in github.com/fluencelabs/aquavm/pull/621 fn issue_642() { let peer_id_1 = "peer_id_1"; let peer_id_2 = "peer_id_2"; diff --git a/air/tests/test_module/issues/issue_644.rs b/air/tests/test_module/issues/issue_644.rs index cd3edf3b..384a8fa2 100644 --- a/air/tests/test_module/issues/issue_644.rs +++ b/air/tests/test_module/issues/issue_644.rs @@ -17,7 +17,6 @@ use air_test_utils::prelude::*; #[test] -#[ignore] // will be resolved in github.com/fluencelabs/aquavm/pull/621 fn issue_644() { let peer_id_1 = "peer_id_1"; let peer_id_2 = "peer_id_2"; diff --git a/air/tests/test_module/negative_tests/uncatchable_trace_related.rs b/air/tests/test_module/negative_tests/uncatchable_trace_related.rs index a6ba74a6..28e37596 100644 --- a/air/tests/test_module/negative_tests/uncatchable_trace_related.rs +++ b/air/tests/test_module/negative_tests/uncatchable_trace_related.rs @@ -316,8 +316,6 @@ fn invalid_dst_generations() { <_>::default(), <_>::default(), <_>::default(), - <_>::default(), - <_>::default(), semver::Version::new(1, 1, 1), ); let mut data_value = serde_json::to_value(&empty_data).unwrap(); diff --git a/crates/air-lib/interpreter-data/src/executed_state/impls.rs b/crates/air-lib/interpreter-data/src/executed_state/impls.rs index da5f2fc2..6833dc81 100644 --- a/crates/air-lib/interpreter-data/src/executed_state/impls.rs +++ b/crates/air-lib/interpreter-data/src/executed_state/impls.rs @@ -47,10 +47,8 @@ impl CallResult { Self::executed_service_result(ValueRef::Scalar(service_result_agg_cid)) } - pub fn executed_stream( - cid: Rc>, - generation: GenerationIdx, - ) -> CallResult { + pub fn executed_stream_stub(cid: Rc>) -> CallResult { + let generation = GenerationIdx::stub(); let value = ValueRef::Stream { cid, generation }; CallResult::Executed(value) } @@ -102,6 +100,12 @@ impl ApResult { res_generations: vec![res_generation], } } + + pub fn stub() -> Self { + Self { + res_generations: vec![GenerationIdx::stub()], + } + } } impl CanonResult { diff --git a/crates/air-lib/interpreter-data/src/generation_idx.rs b/crates/air-lib/interpreter-data/src/generation_idx.rs index d76896ab..efd1de3b 100644 --- a/crates/air-lib/interpreter-data/src/generation_idx.rs +++ b/crates/air-lib/interpreter-data/src/generation_idx.rs @@ -46,6 +46,11 @@ impl GenerationIdx { // TODO: check for overflow Self::from(self.0 as usize - 1) } + + pub fn stub() -> Self { + const GENERATION_STUB: GenerationIdxType = 0xCAFEBABE; + Self(GENERATION_STUB) + } } impl PartialOrd for GenerationIdx { diff --git a/crates/air-lib/interpreter-data/src/interpreter_data.rs b/crates/air-lib/interpreter-data/src/interpreter_data.rs index 8c4068f6..5cb462d7 100644 --- a/crates/air-lib/interpreter-data/src/interpreter_data.rs +++ b/crates/air-lib/interpreter-data/src/interpreter_data.rs @@ -14,8 +14,6 @@ * limitations under the License. */ -use super::GlobalStreamGens; -use super::RestrictedStreamGens; use crate::cid_store::CidStore; use crate::CanonCidAggregate; use crate::CanonResultCidAggregate; @@ -43,18 +41,6 @@ pub struct InterpreterData { /// Trace of AIR execution, which contains executed call, par, fold, and ap states. pub trace: ExecutionTrace, - /// Contains maximum generation for each global stream. This info will be used while merging - /// values in streams. This field is also needed for backward compatibility with - /// <= 0.2.1 versions. - #[serde(rename = "streams")] // for compatibility with versions <= 0.2.1 - pub global_streams: GlobalStreamGens, - - /// Contains maximum generation for each private stream. This info will be used while merging - /// values in streams. - #[serde(default)] - #[serde(rename = "r_streams")] - pub restricted_streams: RestrictedStreamGens, - /// Last exposed to a peer call request id. All next call request ids will be bigger than this. #[serde(default)] #[serde(rename = "lcid")] @@ -87,9 +73,7 @@ impl InterpreterData { Self { versions, trace: ExecutionTrace::default(), - global_streams: GlobalStreamGens::new(), last_call_request_id: 0, - restricted_streams: RestrictedStreamGens::new(), cid_info: <_>::default(), signatures: <_>::default(), } @@ -98,8 +82,6 @@ impl InterpreterData { #[allow(clippy::too_many_arguments)] pub fn from_execution_result( trace: ExecutionTrace, - streams: GlobalStreamGens, - restricted_streams: RestrictedStreamGens, cid_info: CidInfo, signatures: SignatureStore, last_call_request_id: u32, @@ -110,9 +92,7 @@ impl InterpreterData { Self { versions, trace, - global_streams: streams, last_call_request_id, - restricted_streams, cid_info, signatures, } diff --git a/crates/air-lib/interpreter-data/src/lib.rs b/crates/air-lib/interpreter-data/src/lib.rs index 8cd2c48d..07f9faa0 100644 --- a/crates/air-lib/interpreter-data/src/lib.rs +++ b/crates/air-lib/interpreter-data/src/lib.rs @@ -30,7 +30,6 @@ mod cid_store; mod executed_state; mod generation_idx; mod interpreter_data; -mod stream_generations; mod trace; mod trace_pos; @@ -38,7 +37,6 @@ pub use cid_store::*; pub use executed_state::*; pub use generation_idx::*; pub use interpreter_data::*; -pub use stream_generations::*; pub use trace::*; pub use trace_pos::*; diff --git a/crates/air-lib/interpreter-data/src/stream_generations.rs b/crates/air-lib/interpreter-data/src/stream_generations.rs deleted file mode 100644 index 35848f1e..00000000 --- a/crates/air-lib/interpreter-data/src/stream_generations.rs +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::GenerationIdx; -use air_parser::AirPos; - -use std::collections::HashMap; - -/// Mapping from a stream name to it's generation count. -/// Similar to pi-calculus non-restricted names/channels. -pub type GlobalStreamGens = HashMap; -pub type GlobalStreamMapGens = GlobalStreamGens; - -/// Mapping from a stream name to -/// position of a new instruction in a script that creates a scope for a stream -/// to vector where each position represents a corresponding iteration. -/// -/// Vec is needed because a new instruction could be placed into a fold instruction, -/// so it could be met several times during script execution. This field anchors iteration -/// where it was met. -/// Similar to pi-calculus restricted names/channels. -pub type RestrictedStreamGens = HashMap>>; -pub type RestrictedStreamMapGens = RestrictedStreamGens; diff --git a/crates/air-lib/test-utils/src/lib.rs b/crates/air-lib/test-utils/src/lib.rs index 2e069e8d..0676816f 100644 --- a/crates/air-lib/test-utils/src/lib.rs +++ b/crates/air-lib/test-utils/src/lib.rs @@ -93,8 +93,6 @@ pub fn raw_data_from_trace( ) -> Vec { let data = InterpreterData::from_execution_result( trace.into(), - <_>::default(), - <_>::default(), cid_state.into(), <_>::default(), 0, @@ -109,8 +107,6 @@ pub fn raw_data_from_trace_with_canon( ) -> Vec { let data = InterpreterData::from_execution_result( trace.into(), - <_>::default(), - <_>::default(), CidInfo { value_store: cid_state.value_tracker.into(), tetraplet_store: cid_state.tetraplet_tracker.into(), diff --git a/crates/air-lib/trace-handler/src/errors.rs b/crates/air-lib/trace-handler/src/errors.rs index e662c409..09759d10 100644 --- a/crates/air-lib/trace-handler/src/errors.rs +++ b/crates/air-lib/trace-handler/src/errors.rs @@ -40,7 +40,7 @@ pub enum TraceHandlerError { #[derive(ThisError, Debug)] #[allow(clippy::enum_variant_names)] -pub enum GenerationCompatificationError { +pub enum GenerationCompactificationError { #[error("trying to change generation of an invalid trace position {0}")] TracePosPointsToNowhere(TracePos), @@ -50,13 +50,13 @@ pub enum GenerationCompatificationError { TracePosPointsToInvalidState { position: TracePos, state: ExecutedState }, } -impl GenerationCompatificationError { +impl GenerationCompactificationError { pub fn points_to_nowhere(position: TracePos) -> Self { - GenerationCompatificationError::TracePosPointsToNowhere(position) + GenerationCompactificationError::TracePosPointsToNowhere(position) } pub fn points_to_invalid_state(position: TracePos, state: ExecutedState) -> Self { - GenerationCompatificationError::TracePosPointsToInvalidState { position, state } + GenerationCompactificationError::TracePosPointsToInvalidState { position, state } } } diff --git a/crates/air-lib/trace-handler/src/handler.rs b/crates/air-lib/trace-handler/src/handler.rs index e5f6ab59..8f7262ed 100644 --- a/crates/air-lib/trace-handler/src/handler.rs +++ b/crates/air-lib/trace-handler/src/handler.rs @@ -64,12 +64,12 @@ impl TraceHandler { &mut self, trace_pos: TracePos, generation: GenerationIdx, - ) -> Result<(), GenerationCompatificationError> { + ) -> Result<(), GenerationCompactificationError> { let state = self .data_keeper .result_trace .get_mut(trace_pos) - .ok_or_else(|| GenerationCompatificationError::points_to_nowhere(trace_pos))?; + .ok_or_else(|| GenerationCompactificationError::points_to_nowhere(trace_pos))?; match state { ExecutedState::Ap(ap_result) => ap_result.res_generations = vec![generation], @@ -78,7 +78,7 @@ impl TraceHandler { .. })) => *call_generation = generation, state => { - return Err(GenerationCompatificationError::points_to_invalid_state( + return Err(GenerationCompactificationError::points_to_invalid_state( trace_pos, state.clone(), )) diff --git a/crates/air-lib/trace-handler/src/lib.rs b/crates/air-lib/trace-handler/src/lib.rs index 3e7f99cd..c587f28a 100644 --- a/crates/air-lib/trace-handler/src/lib.rs +++ b/crates/air-lib/trace-handler/src/lib.rs @@ -33,7 +33,7 @@ pub mod merger; mod state_automata; pub use data_keeper::KeeperError; -pub use errors::GenerationCompatificationError; +pub use errors::GenerationCompactificationError; pub use errors::IntConversionError; pub use errors::TraceHandlerError; pub use handler::TraceHandler;