feat(execution-engine)!: refactor streams [fixes VM-255] (#621)

Refactored stream and stream generation a lot, it introduces the following changes:
- no generation in data anymore, AquaVM relies on generation inside data to stay valid and places value accordingly to it
- stream is internally divided into previous, current, and new values, before, it was one array for all of them
- recursive streams cursors are refactored and rely on new generation values instead
- the Generation enum was refactored and now contains the source of the generation
This commit is contained in:
Mike Voronov 2023-08-03 21:07:57 +03:00 committed by GitHub
parent 3843da5cb9
commit eca52b7191
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 1536 additions and 1189 deletions

7
Cargo.lock generated
View File

@ -333,6 +333,7 @@ dependencies = [
"strum_macros",
"thiserror",
"tracing",
"typed-index-collections",
]
[[package]]
@ -3639,6 +3640,12 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "typed-index-collections"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183496e014253d15abbe6235677b1392dba2d40524c88938991226baa38ac7c4"
[[package]]
name = "typenum"
version = "1.16.0"

View File

@ -36,6 +36,7 @@ serde_json = "1.0.95"
concat-idents = "1.1.4"
maplit = "1.0.2"
non-empty-vec = "0.2.3"
typed-index-collections = "3.1.0"
log = "0.4.17"
once_cell = "1.17.1"
thiserror = "1.0.40"

View File

@ -17,6 +17,7 @@
use super::Iterable;
use super::IterableItem;
use crate::execution_step::boxed_value::CanonStream;
use crate::execution_step::boxed_value::TracePosOperate;
use crate::foldable_next;
use crate::foldable_prev;

View File

@ -22,6 +22,8 @@ mod stream;
mod stream_map;
mod utils;
pub type Stream = stream::Stream<ValueAggregate>;
pub(crate) use canon_stream::*;
pub(crate) use iterable::*;
pub(crate) use jvaluable::*;
@ -29,10 +31,15 @@ pub(crate) use scalar::CanonResultAggregate;
pub(crate) use scalar::LiteralAggregate;
pub(crate) use scalar::ScalarRef;
pub(crate) use scalar::ServiceResultAggregate;
pub(crate) use scalar::TracePosOperate;
pub(crate) use scalar::ValueAggregate;
pub(crate) use stream::Generation;
pub(crate) use stream::Stream;
pub(crate) use stream::IterableValue;
pub(crate) use stream::RecursiveCursorState;
pub(crate) use stream::RecursiveStreamCursor;
pub(crate) use stream_map::StreamMap;
pub(crate) use utils::populate_tetraplet_with_lambda;
use super::ExecutionResult;

View File

@ -55,6 +55,7 @@ pub enum ValueAggregate {
provenance_cid: Rc<CID<CanonResultCidAggregate>>,
},
}
pub(crate) enum ScalarRef<'i> {
Value(&'i ValueAggregate),
IterableValue(&'i FoldState<'i>),
@ -176,7 +177,29 @@ impl ValueAggregate {
}
}
pub fn get_trace_pos(&self) -> TracePos {
pub fn get_provenance(&self) -> Provenance {
match self {
ValueAggregate::Literal(_) => Provenance::Literal,
ValueAggregate::ServiceResult {
result: _,
provenance_cid: cid,
} => Provenance::ServiceResult { cid: cid.clone() },
ValueAggregate::Canon {
result: _,
provenance_cid: cid,
} => Provenance::Canon { cid: cid.clone() },
}
}
}
pub trait TracePosOperate {
fn get_trace_pos(&self) -> TracePos;
fn set_trace_pos(&mut self, pos: TracePos);
}
impl TracePosOperate for ValueAggregate {
fn get_trace_pos(&self) -> TracePos {
match self {
ValueAggregate::Literal(literal) => literal.trace_pos,
ValueAggregate::ServiceResult {
@ -190,7 +213,7 @@ impl ValueAggregate {
}
}
pub fn set_trace_pos(&mut self, trace_pos: TracePos) {
fn set_trace_pos(&mut self, trace_pos: TracePos) {
let trace_pos_ref = match self {
ValueAggregate::Literal(literal) => &mut literal.trace_pos,
ValueAggregate::ServiceResult {
@ -204,20 +227,6 @@ impl ValueAggregate {
};
*trace_pos_ref = trace_pos;
}
pub fn get_provenance(&self) -> Provenance {
match self {
ValueAggregate::Literal(_) => Provenance::Literal,
ValueAggregate::ServiceResult {
result: _,
provenance_cid: cid,
} => Provenance::ServiceResult { cid: cid.clone() },
ValueAggregate::Canon {
result: _,
provenance_cid: cid,
} => Provenance::Canon { cid: cid.clone() },
}
}
}
use std::fmt;

View File

@ -1,400 +0,0 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionResult;
use super::ValueAggregate;
use crate::ExecutionError;
use crate::UncatchableError;
use air_interpreter_data::GenerationIdx;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
/// Streams are CRDT-like append only data structures. They are guaranteed to have the same order
/// of values on each peer.
#[derive(Debug, Default, Clone)]
pub struct Stream {
/// The first Vec represents generations, the second values in a generation. Generation is a set
/// of values that interpreter obtained from one particle. It means that number of generation on
/// a peer is equal to number of the interpreter runs in context of one particle. And each set of
/// obtained values from a current_data that were not present in prev_data becomes a new generation.
values: Vec<Vec<ValueAggregate>>,
/// Count of values from previous data.
previous_gens_count: usize,
}
impl Stream {
pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self {
let last_generation_count = GenerationIdx::from(1);
// TODO: bubble up an overflow error instead of expect
let overall_count = previous_count
.checked_add(current_count)
.and_then(|value| value.checked_add(last_generation_count))
.expect("it shouldn't overflow");
Self {
values: vec![vec![]; overall_count.into()],
previous_gens_count: previous_count.into(),
}
}
// streams created with this ctor assumed to have only one generation,
// for streams that have values in
pub(crate) fn from_value(value: ValueAggregate) -> Self {
Self {
values: vec![vec![value]],
previous_gens_count: 0,
}
}
// if generation is None, value would be added to the last generation, otherwise it would
// be added to given generation
pub(crate) fn add_value(
&mut self,
value: ValueAggregate,
generation: Generation,
source: ValueSource,
) -> ExecutionResult<GenerationIdx> {
let generation_number = match (generation, source) {
(Generation::Last, _) => self.values.len() - 1,
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen.into(),
(Generation::Nth(current_gen), ValueSource::CurrentData) => {
self.previous_gens_count + usize::from(current_gen)
}
};
if generation_number >= self.values.len() {
return Err(UncatchableError::StreamDontHaveSuchGeneration {
stream: self.clone(),
generation,
}
.into());
}
self.values[generation_number].push(value);
Ok(generation_number.into())
}
// TODO: remove this function
pub(crate) fn generations_count(&self) -> usize {
// the last generation could be empty due to the logic of from_generations_count ctor
if self.values.last().unwrap().is_empty() {
self.values.len() - 1
} else {
self.values.len()
}
}
pub(crate) fn last_non_empty_generation(&self) -> GenerationIdx {
self.values
.iter()
.rposition(|generation| !generation.is_empty())
// it's safe to add + 1 here, because this function is called when
// there is a new state was added with add_new_generation_if_non_empty
.map(|non_empty_gens| non_empty_gens + 1)
.unwrap_or_else(|| self.generations_count())
.into()
}
/// Add a new empty generation if the latest isn't empty.
pub(crate) fn add_new_generation_if_non_empty(&mut self) -> bool {
let should_add_generation = match self.values.last() {
Some(last) => !last.is_empty(),
None => true,
};
if should_add_generation {
self.values.push(vec![]);
}
should_add_generation
}
/// Remove a last generation if it's empty.
pub(crate) fn remove_last_generation_if_empty(&mut self) -> bool {
let should_remove_generation = match self.values.last() {
Some(last) => last.is_empty(),
None => false,
};
if should_remove_generation {
self.values.pop();
}
should_remove_generation
}
pub(crate) fn generation_elements_count(&self, generation: Generation) -> Option<usize> {
match generation {
Generation::Nth(generation) if generation > self.generations_count() => None,
Generation::Nth(generation) => {
let elements_count = generation.into();
Some(self.values.iter().take(elements_count).map(|v| v.len()).sum())
}
Generation::Last => Some(self.values.iter().map(|v| v.len()).sum()),
}
}
pub(crate) fn is_empty(&self) -> bool {
self.values.iter().all(|v| v.is_empty())
}
pub(crate) fn iter(&self, generation: Generation) -> Option<StreamIter<'_>> {
let iter: Box<dyn Iterator<Item = &ValueAggregate>> = match generation {
Generation::Nth(generation) if generation >= self.generations_count() => return None,
Generation::Nth(generation) => {
Box::new(self.values.iter().take(generation.next().into()).flat_map(|v| v.iter()))
}
Generation::Last => Box::new(self.values.iter().flat_map(|v| v.iter())),
};
// unwrap is safe here, because generation's been already checked
let len = self.generation_elements_count(generation).unwrap();
let iter = StreamIter { iter, len };
Some(iter)
}
pub(crate) fn slice_iter(&self, start: Generation, end: Generation) -> Option<StreamSliceIter<'_>> {
if self.is_empty() {
return None;
}
let generations_count = self.generations_count() - 1;
let (start, end) = match (start, end) {
(Generation::Nth(start), Generation::Nth(end)) => (usize::from(start), usize::from(end)),
(Generation::Nth(start), Generation::Last) => (start.into(), generations_count),
(Generation::Last, Generation::Nth(end)) => (generations_count, end.into()),
(Generation::Last, Generation::Last) => (generations_count, generations_count),
};
if start > end || end > generations_count {
return None;
}
let len = (end - start) + 1;
let iter: Box<dyn Iterator<Item = &[ValueAggregate]>> =
Box::new(self.values.iter().skip(start).take(len).map(|v| v.as_slice()));
let iter = StreamSliceIter { iter, len };
Some(iter)
}
/// Removes empty generations updating data and returns final generation count.
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<GenerationIdx> {
self.remove_empty_generations();
for (generation, values) in self.values.iter().enumerate() {
for value in values.iter() {
trace_ctx
.update_generation(value.get_trace_pos(), generation.into())
.map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?;
}
}
let last_generation_idx = self.values.len();
Ok(last_generation_idx.into())
}
/// Removes empty generations from current values.
fn remove_empty_generations(&mut self) {
self.values.retain(|values| !values.is_empty());
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Generation {
Last,
Nth(GenerationIdx),
}
impl Generation {
pub fn last() -> Self {
Self::Last
}
#[cfg(test)]
pub fn nth(generation_id: u32) -> Self {
use std::convert::TryFrom;
let generation_id = usize::try_from(generation_id).unwrap();
let generation_idx = GenerationIdx::from(generation_id);
Self::Nth(generation_idx)
}
}
pub(crate) struct StreamIter<'result> {
iter: Box<dyn Iterator<Item = &'result ValueAggregate> + 'result>,
len: usize,
}
impl<'result> Iterator for StreamIter<'result> {
type Item = &'result ValueAggregate;
fn next(&mut self) -> Option<Self::Item> {
if self.len > 0 {
self.len -= 1;
}
self.iter.next()
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}
impl<'result> ExactSizeIterator for StreamIter<'result> {}
pub(crate) struct StreamSliceIter<'slice> {
iter: Box<dyn Iterator<Item = &'slice [ValueAggregate]> + 'slice>,
pub len: usize,
}
impl<'slice> Iterator for StreamSliceIter<'slice> {
type Item = &'slice [ValueAggregate];
fn next(&mut self) -> Option<Self::Item> {
if self.len > 0 {
self.len -= 1;
}
self.iter.next()
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}
use std::fmt;
impl fmt::Display for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.values.is_empty() {
return write!(f, "[]");
}
writeln!(f, "[")?;
for (id, generation) in self.values.iter().enumerate() {
write!(f, " -- {id}: ")?;
for value in generation.iter() {
write!(f, "{value:?}, ")?;
}
writeln!(f)?;
}
write!(f, "]")
}
}
impl fmt::Display for Generation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Generation::Nth(generation) => write!(f, "{}", generation),
Generation::Last => write!(f, "Last"),
}
}
}
#[cfg(test)]
mod test {
use super::Generation;
use super::Stream;
use super::ValueAggregate;
use super::ValueSource;
use crate::execution_step::ServiceResultAggregate;
use air_interpreter_cid::CID;
use serde_json::json;
use std::rc::Rc;
#[test]
fn test_slice_iter() {
let value_1 = ValueAggregate::from_service_result(
ServiceResultAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into()),
CID::new("some fake cid").into(),
);
let value_2 = ValueAggregate::from_service_result(
ServiceResultAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into()),
CID::new("some fake cid").into(),
);
let mut stream = Stream::from_generations_count(2.into(), 0.into());
stream
.add_value(value_1, Generation::nth(0), ValueSource::PreviousData)
.unwrap();
stream
.add_value(value_2, Generation::nth(1), ValueSource::PreviousData)
.unwrap();
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(1)).unwrap();
assert_eq!(slice.len, 2);
let slice = stream.slice_iter(Generation::nth(0), Generation::Last).unwrap();
assert_eq!(slice.len, 2);
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(0)).unwrap();
assert_eq!(slice.len, 1);
let slice = stream.slice_iter(Generation::Last, Generation::Last).unwrap();
assert_eq!(slice.len, 1);
}
#[test]
fn test_slice_on_empty_stream() {
let stream = Stream::from_generations_count(2.into(), 0.into());
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(1));
assert!(slice.is_none());
let slice = stream.slice_iter(Generation::nth(0), Generation::Last);
assert!(slice.is_none());
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(0));
assert!(slice.is_none());
let slice = stream.slice_iter(Generation::Last, Generation::Last);
assert!(slice.is_none());
}
#[test]
fn generation_from_current_data() {
let value_1 = ValueAggregate::from_service_result(
ServiceResultAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into()),
CID::new("some fake cid").into(),
);
let value_2 = ValueAggregate::from_service_result(
ServiceResultAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into()),
CID::new("some fake cid").into(),
);
let mut stream = Stream::from_generations_count(5.into(), 5.into());
stream
.add_value(value_1.clone(), Generation::nth(2), ValueSource::CurrentData)
.unwrap();
stream
.add_value(value_2.clone(), Generation::nth(4), ValueSource::PreviousData)
.unwrap();
let generations_count = stream.generations_count();
assert_eq!(generations_count, 10);
let mut iter = stream.iter(Generation::Last).unwrap();
let stream_value_1 = iter.next().unwrap();
let stream_value_2 = iter.next().unwrap();
assert_eq!(stream_value_1, &value_2);
assert_eq!(stream_value_2, &value_1);
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod recursive_stream;
mod stream_definition;
pub(self) mod values_matrix;
pub(crate) use recursive_stream::IterableValue;
pub(crate) use recursive_stream::RecursiveCursorState;
pub(crate) use recursive_stream::RecursiveStreamCursor;
pub(crate) use recursive_stream::StreamCursor;
pub(crate) use stream_definition::Generation;
pub(crate) use stream_definition::Stream;

View File

@ -0,0 +1,254 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::Stream;
use crate::execution_step::boxed_value::Iterable;
use crate::execution_step::boxed_value::IterableItem;
use crate::execution_step::boxed_value::IterableVecResolvedCall;
use crate::execution_step::ValueAggregate;
use air_interpreter_data::GenerationIdx;
pub(crate) type IterableValue = Box<dyn for<'ctx> Iterable<'ctx, Item = IterableItem<'ctx>>>;
/// Tracks a state of a stream by storing last generation of every value type.
#[derive(Debug, Clone, Copy)]
pub struct StreamCursor {
pub previous_start_idx: GenerationIdx,
pub current_start_idx: GenerationIdx,
pub new_start_idx: GenerationIdx,
}
/// Intended to generate values for recursive stream handling.
///
/// It could be considered as a simple state machine which should be started with
/// fold_started and then continued with next_iteration:
/// met_fold_start - met_iteration_end - ... met_iteration_end - Exhausted
/// | |
/// Exhausted Exhausted
#[derive(Debug, Clone, Copy)]
pub(crate) struct RecursiveStreamCursor {
cursor: StreamCursor,
}
pub(crate) enum RecursiveCursorState {
Continue(Vec<IterableValue>),
Exhausted,
}
impl RecursiveStreamCursor {
pub fn new() -> Self {
Self {
cursor: StreamCursor::empty(),
}
}
pub fn met_fold_start(&mut self, stream: &mut Stream<ValueAggregate>) -> RecursiveCursorState {
let state = self.cursor_state(stream);
self.cursor = stream.cursor();
if state.should_continue() {
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
// write operation to this stream to write to this new generation
stream.new_values().add_new_empty_generation();
}
state
}
pub fn met_iteration_end(&mut self, stream: &mut Stream<ValueAggregate>) -> RecursiveCursorState {
let state = self.cursor_state(stream);
// remove last generation if it empty to track cursor state
remove_last_generation_if_empty(stream);
self.cursor = stream.cursor();
// add new last generation to store new values into this generation
stream.new_values().add_new_empty_generation();
state
}
fn cursor_state(&self, stream: &Stream<ValueAggregate>) -> RecursiveCursorState {
let slice_iter = stream.slice_iter(self.cursor);
let iterable = Self::slice_iter_to_iterable(slice_iter);
RecursiveCursorState::from_iterable_values(iterable)
}
fn slice_iter_to_iterable<'value>(iter: impl Iterator<Item = &'value [ValueAggregate]>) -> Vec<IterableValue> {
iter.map(|iterable| {
let foldable = IterableVecResolvedCall::init(iterable.to_vec());
let foldable: IterableValue = Box::new(foldable);
foldable
})
.collect::<Vec<_>>()
}
}
fn remove_last_generation_if_empty(stream: &mut Stream<ValueAggregate>) {
if stream.new_values().last_generation_is_empty() {
stream.new_values().remove_last_generation();
}
}
impl StreamCursor {
pub(crate) fn empty() -> Self {
Self {
previous_start_idx: GenerationIdx::from(0),
current_start_idx: GenerationIdx::from(0),
new_start_idx: GenerationIdx::from(0),
}
}
pub(crate) fn new(
previous_start_idx: GenerationIdx,
current_start_idx: GenerationIdx,
new_start_idx: GenerationIdx,
) -> Self {
Self {
previous_start_idx,
current_start_idx,
new_start_idx,
}
}
}
impl RecursiveCursorState {
pub(crate) fn from_iterable_values(values: Vec<IterableValue>) -> Self {
if values.is_empty() {
Self::Exhausted
} else {
Self::Continue(values)
}
}
pub(crate) fn should_continue(&self) -> bool {
matches!(self, Self::Continue(_))
}
}
#[cfg(test)]
mod test {
use super::IterableValue;
use super::RecursiveCursorState;
use super::RecursiveStreamCursor;
use super::Stream;
use super::ValueAggregate;
use crate::execution_step::Generation;
use crate::execution_step::ServiceResultAggregate;
use crate::JValue;
use air_interpreter_cid::CID;
use serde_json::json;
use std::rc::Rc;
fn create_value(value: JValue) -> ValueAggregate {
ValueAggregate::from_service_result(
ServiceResultAggregate::new(Rc::new(value), <_>::default(), 0.into()),
CID::new("some fake cid").into(),
)
}
fn iterables_unwrap(cursor_state: RecursiveCursorState) -> Vec<IterableValue> {
match cursor_state {
RecursiveCursorState::Continue(iterables) => iterables,
RecursiveCursorState::Exhausted => panic!("cursor is exhausted"),
}
}
#[test]
fn fold_started_empty_if_no_values() {
let mut stream = Stream::new();
let mut recursive_stream = RecursiveStreamCursor::new();
let cursor_state = recursive_stream.met_fold_start(&mut stream);
assert!(!cursor_state.should_continue())
}
#[test]
fn next_iteration_empty_if_no_values() {
let mut stream = Stream::new();
let mut recursive_stream = RecursiveStreamCursor::new();
let cursor_state = recursive_stream.met_iteration_end(&mut stream);
assert!(!cursor_state.should_continue())
}
#[test]
fn next_iteration_empty_if_no_values_added() {
let mut stream = Stream::new();
let mut recursive_stream = RecursiveStreamCursor::new();
let value = create_value(json!("1"));
stream.add_value(value, Generation::Current(0.into()));
let cursor_state = recursive_stream.met_fold_start(&mut stream);
let iterables = iterables_unwrap(cursor_state);
assert_eq!(iterables.len(), 1);
let cursor_state = recursive_stream.met_iteration_end(&mut stream);
assert!(!cursor_state.should_continue());
}
#[test]
fn one_recursive_iteration() {
let mut stream = Stream::new();
let mut recursive_stream = RecursiveStreamCursor::new();
let value = create_value(json!("1"));
stream.add_value(value.clone(), Generation::Current(0.into()));
let cursor_state = recursive_stream.met_fold_start(&mut stream);
let iterables = iterables_unwrap(cursor_state);
assert_eq!(iterables.len(), 1);
stream.add_value(value.clone(), Generation::New);
stream.add_value(value, Generation::New);
let cursor_state = recursive_stream.met_iteration_end(&mut stream);
let iterables = iterables_unwrap(cursor_state);
assert_eq!(iterables.len(), 1);
let cursor_state = recursive_stream.met_iteration_end(&mut stream);
assert!(!cursor_state.should_continue());
}
#[test]
fn add_value_into_prev_and_current() {
let mut stream = Stream::new();
let mut recursive_stream = RecursiveStreamCursor::new();
let value = create_value(json!("1"));
stream.add_value(value.clone(), Generation::Current(0.into()));
let cursor_state = recursive_stream.met_fold_start(&mut stream);
assert!(cursor_state.should_continue());
stream.add_value(value.clone(), Generation::Previous(0.into()));
let cursor_state = recursive_stream.met_iteration_end(&mut stream);
assert!(cursor_state.should_continue());
stream.add_value(value, Generation::Current(1.into()));
let cursor_state = recursive_stream.met_iteration_end(&mut stream);
assert!(cursor_state.should_continue());
let cursor_state = recursive_stream.met_iteration_end(&mut stream);
assert!(!cursor_state.should_continue());
}
}

View File

@ -0,0 +1,554 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::values_matrix::NewValuesMatrix;
use super::values_matrix::ValuesMatrix;
use super::StreamCursor;
use crate::execution_step::boxed_value::TracePosOperate;
use crate::execution_step::ExecutionResult;
use air_interpreter_data::GenerationIdx;
use air_trace_handler::TraceHandler;
/// Streams are CRDT-like append only data structures. They are guaranteed to have locally
/// the same order of values on each peer.
#[derive(Debug, Clone)]
pub struct Stream<T> {
/// Values from previous data.
previous_values: ValuesMatrix<T>,
/// Values from current data.
current_values: ValuesMatrix<T>,
/// Values from call results or aps executed on a current peer.
new_values: NewValuesMatrix<T>,
}
impl<'value, T: 'value> Stream<T> {
pub(crate) fn new() -> Self {
Self {
previous_values: ValuesMatrix::new(),
current_values: ValuesMatrix::new(),
new_values: NewValuesMatrix::new(),
}
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &T> {
self.previous_values
.iter()
.chain(self.current_values.iter())
.chain(self.new_values.iter())
}
// Contract: all slices will be non-empty
pub(crate) fn slice_iter(&self, cursor: StreamCursor) -> impl Iterator<Item = &[T]> {
self.previous_values
.slice_iter(cursor.previous_start_idx)
.chain(self.current_values.slice_iter(cursor.current_start_idx))
.chain(self.new_values.slice_iter(cursor.new_start_idx))
}
pub(crate) fn cursor(&self) -> StreamCursor {
StreamCursor::new(
self.previous_values.generations_count(),
self.current_values.generations_count(),
self.new_values.generations_count(),
)
}
pub(super) fn new_values(&mut self) -> &mut NewValuesMatrix<T> {
&mut self.new_values
}
}
impl<'value, T: 'value + Clone + fmt::Display> Stream<T> {
pub(crate) fn add_value(&mut self, value: T, generation: Generation) {
match generation {
Generation::Previous(previous_gen) => self.previous_values.add_value_to_generation(value, previous_gen),
Generation::Current(current_gen) => self.current_values.add_value_to_generation(value, current_gen),
Generation::New => self.new_values.add_to_last_generation(value),
}
}
}
impl<'value, T: 'value + TracePosOperate + fmt::Display> Stream<T> {
/// Removes empty generations updating data.
pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
self.previous_values.remove_empty_generations();
self.current_values.remove_empty_generations();
self.new_values.remove_empty_generations();
let start_idx = 0.into();
Self::update_generations(self.previous_values.slice_iter(0.into()), start_idx, trace_ctx)?;
let start_idx = self.previous_values.generations_count();
Self::update_generations(self.current_values.slice_iter(0.into()), start_idx, trace_ctx)?;
let start_idx = start_idx.checked_add(self.current_values.generations_count()).unwrap();
Self::update_generations(self.new_values.slice_iter(0.into()), start_idx, trace_ctx)?;
Ok(())
}
fn update_generations(
values: impl Iterator<Item = &'value [T]>,
start_idx: GenerationIdx,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
use crate::execution_step::errors::UncatchableError;
use crate::execution_step::ExecutionError;
for (position, values) in values.enumerate() {
// TODO: replace it with error
let generation = start_idx.checked_add(position.into()).unwrap();
for value in values.iter() {
trace_ctx
.update_generation(value.get_trace_pos(), generation)
.map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompactificationError(e)))?;
}
}
Ok(())
}
}
impl<T> Default for Stream<T> {
fn default() -> Self {
Self {
previous_values: <_>::default(),
current_values: <_>::default(),
new_values: <_>::default(),
}
}
}
use air_trace_handler::merger::MetApResult;
use air_trace_handler::merger::ValueSource;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Generation {
Previous(GenerationIdx),
Current(GenerationIdx),
New,
}
impl Generation {
#[cfg(test)]
pub fn previous(generation_id: u32) -> Self {
use std::convert::TryFrom;
let generation_id = usize::try_from(generation_id).unwrap();
let generation_idx = GenerationIdx::from(generation_id);
Self::Previous(generation_idx)
}
#[cfg(test)]
pub fn current(generation_id: u32) -> Self {
use std::convert::TryFrom;
let generation_id = usize::try_from(generation_id).unwrap();
let generation_idx = GenerationIdx::from(generation_id);
Self::Current(generation_idx)
}
pub fn from_met_result(result: &MetApResult) -> Self {
Self::from_data(result.value_source, result.generation)
}
pub fn from_data(data_type: ValueSource, generation: GenerationIdx) -> Self {
match data_type {
ValueSource::PreviousData => Generation::Previous(generation),
ValueSource::CurrentData => Generation::Current(generation),
}
}
pub fn new() -> Self {
Self::New
}
}
use std::fmt;
impl<T: fmt::Display> fmt::Display for Stream<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "previous values:\n{}", self.previous_values)?;
writeln!(f, "current values:\n{}", self.current_values)?;
writeln!(f, "new values:\n{}", self.new_values)
}
}
impl fmt::Display for Generation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Generation::Previous(generation) => write!(f, "previous({})", generation),
Generation::Current(generation) => write!(f, "current({})", generation),
Generation::New => write!(f, "new"),
}
}
}
#[cfg(test)]
mod test {
use super::Generation;
use super::StreamCursor;
use super::TraceHandler;
use crate::execution_step::ServiceResultAggregate;
use crate::execution_step::ValueAggregate;
use crate::ExecutionError;
use crate::JValue;
use crate::UncatchableError;
use air_interpreter_cid::CID;
use air_interpreter_data::ApResult;
use air_interpreter_data::CanonResult;
use air_interpreter_data::ExecutedState;
use air_interpreter_data::ExecutionTrace;
use air_interpreter_data::TracePos;
use air_trace_handler::GenerationCompactificationError;
use serde_json::json;
use std::rc::Rc;
type Stream = super::Stream<ValueAggregate>;
fn create_value(value: JValue) -> ValueAggregate {
ValueAggregate::from_service_result(
ServiceResultAggregate::new(Rc::new(value), <_>::default(), 1.into()),
CID::new("some fake cid").into(),
)
}
fn create_value_with_pos(value: JValue, trace_pos: TracePos) -> ValueAggregate {
ValueAggregate::from_service_result(
ServiceResultAggregate::new(Rc::new(value), <_>::default(), trace_pos),
CID::new("some fake cid").into(),
)
}
#[test]
fn test_iter() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::previous(0));
stream.add_value(value_2.clone(), Generation::previous(1));
let mut iter = stream.iter();
println!(" after getting iter");
assert_eq!(iter.next(), Some(&value_1));
assert_eq!(iter.next(), Some(&value_2));
assert_eq!(iter.next(), None);
}
#[test]
fn test_slice_iter_prev() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let value_3 = create_value(json!("value_3"));
let value_4 = create_value(json!("value_4"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::previous(0));
stream.add_value(value_2.clone(), Generation::previous(0));
stream.add_value(value_3.clone(), Generation::previous(0));
stream.add_value(value_4.clone(), Generation::previous(0));
let mut slice_iter = stream.slice_iter(StreamCursor::empty());
assert_eq!(
slice_iter.next(),
Some(vec![value_1, value_2, value_3, value_4].as_slice())
);
assert_eq!(slice_iter.next(), None);
}
#[test]
fn test_slice_iter_current() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let value_3 = create_value(json!("value_3"));
let value_4 = create_value(json!("value_4"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::current(0));
stream.add_value(value_2.clone(), Generation::current(0));
stream.add_value(value_3.clone(), Generation::current(0));
stream.add_value(value_4.clone(), Generation::current(0));
let mut slice_iter = stream.slice_iter(StreamCursor::empty());
assert_eq!(
slice_iter.next(),
Some(vec![value_1, value_2, value_3, value_4].as_slice())
);
assert_eq!(slice_iter.next(), None);
}
#[test]
fn test_slice_iter_new() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let value_3 = create_value(json!("value_3"));
let value_4 = create_value(json!("value_4"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::New);
stream.add_value(value_2.clone(), Generation::New);
stream.add_value(value_3.clone(), Generation::New);
stream.add_value(value_4.clone(), Generation::New);
let mut slice_iter = stream.slice_iter(StreamCursor::empty());
assert_eq!(
slice_iter.next(),
Some(vec![value_1, value_2, value_3, value_4].as_slice())
);
assert_eq!(slice_iter.next(), None);
}
#[test]
fn test_iter_on_empty_stream() {
let stream = Stream::new();
let mut slice = stream.iter();
assert_eq!(slice.next(), None);
}
#[test]
fn test_slice_on_empty_stream() {
let stream = Stream::new();
let mut slice = stream.slice_iter(StreamCursor::empty());
assert_eq!(slice.next(), None);
}
#[test]
fn generation_from_current_data_after_previous() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::current(0));
stream.add_value(value_2.clone(), Generation::previous(0));
let mut iter = stream.iter();
assert_eq!(iter.next(), Some(&value_2));
assert_eq!(iter.next(), Some(&value_1));
assert_eq!(iter.next(), None);
}
#[test]
fn generation_from_new_data_after_current_and_previous() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let value_3 = create_value(json!("value_3"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::New);
stream.add_value(value_2.clone(), Generation::current(0));
stream.add_value(value_3.clone(), Generation::previous(0));
let mut iter = stream.iter();
assert_eq!(iter.next(), Some(&value_3));
assert_eq!(iter.next(), Some(&value_2));
assert_eq!(iter.next(), Some(&value_1));
assert_eq!(iter.next(), None);
}
#[test]
fn empty_generations_skipped_in_slice_iter_prev() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let value_3 = create_value(json!("value_3"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::previous(0));
stream.add_value(value_2.clone(), Generation::previous(1));
stream.add_value(value_3.clone(), Generation::previous(3));
let mut slice_iter = stream.slice_iter(StreamCursor::empty());
assert_eq!(slice_iter.next(), Some(vec![value_1].as_slice()));
assert_eq!(slice_iter.next(), Some(vec![value_2].as_slice()));
assert_eq!(slice_iter.next(), Some(vec![value_3].as_slice()));
assert_eq!(slice_iter.next(), None);
}
#[test]
fn empty_generations_skipped_in_slice_iter_current() {
let value_1 = create_value(json!("value_1"));
let value_2 = create_value(json!("value_2"));
let value_3 = create_value(json!("value_3"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::current(0));
stream.add_value(value_2.clone(), Generation::current(1));
stream.add_value(value_3.clone(), Generation::current(3));
let mut slice_iter = stream.slice_iter(StreamCursor::empty());
assert_eq!(slice_iter.next(), Some(vec![value_1].as_slice()));
assert_eq!(slice_iter.next(), Some(vec![value_2].as_slice()));
assert_eq!(slice_iter.next(), Some(vec![value_3].as_slice()));
assert_eq!(slice_iter.next(), None);
}
#[test]
fn compactification_with_previous_new_generation() {
let value_1 = create_value_with_pos(json!("value_1"), 0.into());
let value_2 = create_value_with_pos(json!("value_2"), 1.into());
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::previous(0));
stream.add_value(value_2.clone(), Generation::New);
let trace = ExecutionTrace::from(vec![]);
let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace);
let ap_result = ApResult::stub();
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result);
let compactification_result = stream.compactify(&mut trace_ctx);
assert!(compactification_result.is_ok());
let actual_trace = trace_ctx.into_result_trace();
let expected_trace = vec![
ExecutedState::Ap(ApResult::new(0.into())),
ExecutedState::Ap(ApResult::new(1.into())),
];
let expected_trace = ExecutionTrace::from(expected_trace);
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn compactification_with_current_generation() {
let value_1 = create_value_with_pos(json!("value_1"), 0.into());
let value_2 = create_value_with_pos(json!("value_2"), 1.into());
let value_3 = create_value_with_pos(json!("value_3"), 2.into());
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::current(0));
stream.add_value(value_2.clone(), Generation::current(2));
stream.add_value(value_3.clone(), Generation::current(4));
let trace = ExecutionTrace::from(vec![]);
let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace);
let ap_result = ApResult::stub();
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result);
let compactification_result = stream.compactify(&mut trace_ctx);
assert!(compactification_result.is_ok());
let actual_trace = trace_ctx.into_result_trace();
let expected_trace = vec![
ExecutedState::Ap(ApResult::new(0.into())),
ExecutedState::Ap(ApResult::new(1.into())),
ExecutedState::Ap(ApResult::new(2.into())),
];
let expected_trace = ExecutionTrace::from(expected_trace);
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn compactification_works_with_mixed_generations() {
let value_1 = create_value_with_pos(json!("value_1"), 0.into());
let value_2 = create_value_with_pos(json!("value_2"), 1.into());
let value_3 = create_value_with_pos(json!("value_3"), 2.into());
let value_4 = create_value_with_pos(json!("value_1"), 3.into());
let value_5 = create_value_with_pos(json!("value_2"), 4.into());
let value_6 = create_value_with_pos(json!("value_3"), 5.into());
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::New);
stream.add_value(value_2.clone(), Generation::current(4));
stream.add_value(value_3.clone(), Generation::current(0));
stream.add_value(value_4.clone(), Generation::previous(100));
stream.add_value(value_5.clone(), Generation::New);
stream.add_value(value_6.clone(), Generation::current(2));
let trace = ExecutionTrace::from(vec![]);
let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace);
let ap_result = ApResult::stub();
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result.clone());
trace_ctx.meet_ap_end(ap_result);
let compactification_result = stream.compactify(&mut trace_ctx);
assert!(compactification_result.is_ok());
let actual_trace = trace_ctx.into_result_trace();
let expected_trace = vec![
ExecutedState::Ap(ApResult::new(4.into())),
ExecutedState::Ap(ApResult::new(3.into())),
ExecutedState::Ap(ApResult::new(1.into())),
ExecutedState::Ap(ApResult::new(0.into())),
ExecutedState::Ap(ApResult::new(4.into())),
ExecutedState::Ap(ApResult::new(2.into())),
];
let expected_trace = ExecutionTrace::from(expected_trace);
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn compactification_invalid_state_error() {
let value_1 = create_value(json!("value_1"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::current(0));
let trace = ExecutionTrace::from(vec![]);
let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace);
let canon_result = CanonResult(Rc::new(CID::new("fake canon CID")));
trace_ctx.meet_canon_end(canon_result.clone());
trace_ctx.meet_canon_end(canon_result.clone());
trace_ctx.meet_canon_end(canon_result);
let compactification_result = stream.compactify(&mut trace_ctx);
assert!(matches!(
compactification_result,
Err(ExecutionError::Uncatchable(
UncatchableError::GenerationCompactificationError(
GenerationCompactificationError::TracePosPointsToInvalidState { .. }
)
))
));
}
#[test]
fn compactification_points_to_nowhere_error() {
let value_1 = create_value(json!("value_1"));
let mut stream = Stream::new();
stream.add_value(value_1.clone(), Generation::current(0));
let trace = ExecutionTrace::from(vec![]);
let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace);
let compactification_result = stream.compactify(&mut trace_ctx);
assert!(matches!(
compactification_result,
Err(ExecutionError::Uncatchable(
UncatchableError::GenerationCompactificationError(
GenerationCompactificationError::TracePosPointsToNowhere { .. }
)
))
));
}
}

View File

@ -0,0 +1,169 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_interpreter_data::GenerationIdx;
use typed_index_collections::TiVec;
/// It is intended to store values according to generations which stream has. Each generation could
/// contain several values, generation are ordered, meaning that values in elder generations are
/// handled by AIR instructions (such as fold) later. Also, there is an order between values in one
/// generation. And being placed by generations values could be considered as a matrix.
///
/// This matrix is used for values in previous and current data.
#[derive(Debug, Clone)]
pub(crate) struct ValuesMatrix<T> {
/// The first Vec represents generations, the second values in a generation. Generation is a set
/// of values that interpreter obtained from one particle. It means that number of generation on
/// a peer is equal to number of the interpreter runs in context of one particle.
values: TiVec<GenerationIdx, Vec<T>>,
}
impl<T> ValuesMatrix<T> {
pub fn new() -> Self {
Self { values: TiVec::new() }
}
pub fn remove_empty_generations(&mut self) {
self.values.retain(|generation| !generation.is_empty())
}
pub fn generations_count(&self) -> GenerationIdx {
self.values.len().into()
}
pub fn iter(&self) -> impl Iterator<Item = &T> {
self.values.iter().flat_map(|generation| generation.iter())
}
pub fn slice_iter(&self, skip: GenerationIdx) -> impl Iterator<Item = &[T]> {
self.values
.iter()
.filter(|generation| !generation.is_empty())
.skip(skip.into())
.map(|generation| generation.as_ref())
}
}
impl<T: Clone> ValuesMatrix<T> {
pub fn add_value_to_generation(&mut self, value: T, generation_idx: GenerationIdx) {
if generation_idx >= self.values.len() {
// TODO: replace unwrap with error
let new_size = generation_idx.checked_add(1.into()).unwrap();
self.values.resize(new_size.into(), Vec::new());
}
self.values[generation_idx].push(value);
}
}
/// It's intended to handle new values from call results.
#[derive(Debug, Clone)]
pub(crate) struct NewValuesMatrix<T>(ValuesMatrix<T>);
impl<T> NewValuesMatrix<T> {
pub fn new() -> Self {
let values = ValuesMatrix::new();
Self(values)
}
pub fn add_new_empty_generation(&mut self) {
self.0.values.push(vec![]);
}
pub fn remove_empty_generations(&mut self) {
self.0.remove_empty_generations();
}
pub fn remove_last_generation(&mut self) {
self.0.values.pop();
}
pub fn last_generation_is_empty(&mut self) -> bool {
if self.0.values.is_empty() {
return true;
}
self.0.values[self.last_non_empty_generation_idx()].is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = &T> {
self.0.iter()
}
pub fn slice_iter(&self, skip: GenerationIdx) -> impl Iterator<Item = &[T]> {
self.0.slice_iter(skip)
}
pub fn generations_count(&self) -> GenerationIdx {
self.0.generations_count()
}
pub fn last_non_empty_generation_idx(&self) -> GenerationIdx {
let values_len = self.0.values.len();
if values_len == 0 {
return 0.into();
}
(values_len - 1).into()
}
}
impl<T: Clone> NewValuesMatrix<T> {
pub fn add_to_last_generation(&mut self, value: T) {
let last_generation_idx = self.last_non_empty_generation_idx();
self.0.add_value_to_generation(value, last_generation_idx);
}
}
impl<T> Default for ValuesMatrix<T> {
fn default() -> Self {
Self { values: TiVec::new() }
}
}
impl<T> Default for NewValuesMatrix<T> {
fn default() -> Self {
Self(<_>::default())
}
}
use std::fmt;
impl<T: fmt::Display> fmt::Display for ValuesMatrix<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.values.is_empty() {
return write!(f, "[]");
}
writeln!(f, "[")?;
for (idx, generation_values) in self.values.iter_enumerated() {
write!(f, " -- {idx}: ")?;
for value in generation_values.iter() {
write!(f, "{value}, ")?;
}
writeln!(f)?;
}
write!(f, "]")
}
}
impl<T: fmt::Display> fmt::Display for NewValuesMatrix<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

View File

@ -14,14 +14,14 @@
* limitations under the License.
*/
use super::stream::*;
use super::Generation;
use super::Stream;
use super::ValueAggregate;
use crate::execution_step::boxed_value::TracePosOperate;
use crate::execution_step::execution_context::stream_map_key::StreamMapKey;
use crate::execution_step::ExecutionResult;
use crate::JValue;
use air_interpreter_data::GenerationIdx;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
use serde_json::json;
@ -37,13 +37,11 @@ pub struct StreamMap {
}
impl StreamMap {
pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self {
Self {
stream: Stream::from_generations_count(previous_count, current_count),
}
pub(crate) fn new() -> Self {
Self { stream: Stream::new() }
}
pub(crate) fn from_value(key: StreamMapKey<'_>, value: &ValueAggregate) -> Self {
pub(crate) fn insert(&mut self, key: StreamMapKey<'_>, value: &ValueAggregate, generation: Generation) {
let obj = from_key_value(key, value.get_result());
let value = ValueAggregate::new(
obj,
@ -51,29 +49,10 @@ impl StreamMap {
value.get_trace_pos(),
value.get_provenance(),
);
Self {
stream: Stream::from_value(value),
}
self.stream.add_value(value, generation)
}
pub(crate) fn insert(
&mut self,
key: StreamMapKey<'_>,
value: &ValueAggregate,
generation: Generation,
source: ValueSource,
) -> ExecutionResult<GenerationIdx> {
let obj = from_key_value(key, value.get_result());
let value = ValueAggregate::new(
obj,
value.get_tetraplet(),
value.get_trace_pos(),
value.get_provenance(),
);
self.stream.add_value(value, generation, source)
}
pub(crate) fn compactify(self, trace_ctx: &mut TraceHandler) -> ExecutionResult<GenerationIdx> {
pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
self.stream.compactify(trace_ctx)
}
@ -87,8 +66,7 @@ impl StreamMap {
let mut met_keys = HashSet::new();
// it's always possible to go through all values
self.stream.iter(Generation::Last).unwrap().filter(move |value| {
self.stream.iter().filter(move |value| {
StreamMapKey::from_kvpair(value)
.map(|key| met_keys.insert(key))
.unwrap_or(false)
@ -111,106 +89,149 @@ mod test {
use crate::execution_step::boxed_value::stream_map::from_key_value;
use crate::execution_step::execution_context::stream_map_key::StreamMapKey;
use crate::execution_step::ValueAggregate;
use air_trace_handler::merger::ValueSource;
use crate::ExecutionError;
use crate::JValue;
use crate::UncatchableError;
use air_interpreter_cid::CID;
use air_interpreter_data::ExecutionTrace;
use air_trace_handler::GenerationCompactificationError;
use air_trace_handler::TraceHandler;
use serde_json::json;
use std::borrow::Cow;
use std::rc::Rc;
fn create_value_aggregate(value: Rc<JValue>) -> ValueAggregate {
ValueAggregate::new(
value,
<_>::default(),
0.into(),
air_interpreter_data::Provenance::literal(),
)
}
fn compare_stream_iter<'value>(
mut iter: impl Iterator<Item = &'value ValueAggregate>,
key: StreamMapKey<'_>,
value: &Rc<JValue>,
) -> bool {
let actual_value = iter.next().map(|e| e.get_result()).unwrap();
let expected_value = from_key_value(key, value);
actual_value == &expected_value
}
#[test]
fn test_from_value() {
let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]);
let key_str = "some_key";
let key = StreamMapKey::Str(Cow::Borrowed(key_str));
let value = Rc::new(obj.clone());
fn test_from_value_key_str() {
let key = StreamMapKey::Str(Cow::Borrowed("some_key"));
let value = Rc::new(json!("1"));
let value_aggregate = create_value_aggregate(value.clone());
let generation_idx = 0;
let generation = Generation::Nth(generation_idx.into());
let value_aggregate: ValueAggregate = ValueAggregate::new(
value.clone(),
<_>::default(),
0.into(),
air_interpreter_data::Provenance::literal(),
);
let stream_map = StreamMap::from_value(key.clone(), &value_aggregate);
let mut stream_map = StreamMap::new();
stream_map.insert(key.clone(), &value_aggregate, Generation::New);
let mut iter = stream_map.stream.iter();
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
let v = internal_stream_iter.next().map(|e| e.get_result()).unwrap();
let examplar = from_key_value(key, value.as_ref());
assert_eq!(*v, examplar);
assert_eq!(internal_stream_iter.next(), None);
assert!(compare_stream_iter(&mut iter, key, &value));
assert_eq!(iter.next(), None);
}
#[test]
fn test_from_value_key_int() {
let key = StreamMapKey::I64(42.into());
let value_aggregate = ValueAggregate::new(
value.clone(),
<_>::default(),
0.into(),
air_interpreter_data::Provenance::literal(),
);
let stream_map = StreamMap::from_value(key.clone(), &value_aggregate);
let value = Rc::new(json!("1"));
let value_aggregate = create_value_aggregate(value.clone());
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
let examplar = from_key_value(key, value.as_ref());
assert_eq!(*v, *examplar.as_ref());
assert_eq!(internal_stream_iter.next(), None);
let mut stream_map = StreamMap::new();
stream_map.insert(key.clone(), &value_aggregate, Generation::New);
let mut iter = stream_map.stream.iter();
assert!(compare_stream_iter(&mut iter, key, &value));
assert_eq!(iter.next(), None);
}
#[test]
fn test_insert() {
let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]);
let key_str = "some_key";
let key12 = StreamMapKey::Str(Cow::Borrowed(key_str));
let value = Rc::new(obj.clone());
let generation_idx = 0;
let value_aggregate: ValueAggregate = ValueAggregate::new(
value.clone(),
<_>::default(),
0.into(),
air_interpreter_data::Provenance::literal(),
);
let mut stream_map = StreamMap::from_value(key12.clone(), &value_aggregate);
let generation = Generation::Nth(generation_idx.into());
let generation_idx_res = stream_map
.insert(key12.clone(), &value_aggregate, generation, ValueSource::CurrentData)
.unwrap();
assert_eq!(generation_idx_res, generation_idx);
let key_1_2 = StreamMapKey::Str(Cow::Borrowed("some_key"));
let value_1 = Rc::new(json!("1"));
let value_aggregate_1 = create_value_aggregate(value_1.clone());
let examplar = from_key_value(key12, value.as_ref());
let s = stream_map
.stream
.iter(generation)
.unwrap()
.all(|e| *e.get_result().as_ref() == *examplar.as_ref());
assert!(s);
let value_2 = Rc::new(json!("2"));
let value_aggregate_2 = create_value_aggregate(value_2.clone());
let key_str = "other_key";
let key3 = StreamMapKey::Str(Cow::Borrowed(key_str));
let generation_idx = stream_map
.insert(key3.clone(), &value_aggregate, generation, ValueSource::CurrentData)
.unwrap();
assert_eq!(generation_idx_res, generation_idx);
let mut stream_map = StreamMap::new();
stream_map.insert(key_1_2.clone(), &value_aggregate_1, Generation::New);
stream_map.insert(key_1_2.clone(), &value_aggregate_2, Generation::Current(0.into()));
let key4 = StreamMapKey::I64(42.into());
let generation_idx = stream_map
.insert(key4.clone(), &value_aggregate, generation, ValueSource::CurrentData)
.unwrap();
assert_eq!(generation_idx_res, generation_idx);
let key_3 = StreamMapKey::Str(Cow::Borrowed("other_key"));
let value_3 = Rc::new(json!("3"));
let value_aggregate_3 = create_value_aggregate(value_3.clone());
stream_map.insert(key_3.clone(), &value_aggregate_3, Generation::Current(0.into()));
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
assert_eq!(*v, *examplar.as_ref());
let key_4 = StreamMapKey::I64(42.into());
let value_4 = Rc::new(json!("4"));
let value_aggregate_4 = create_value_aggregate(value_4.clone());
stream_map.insert(key_4.clone(), &value_aggregate_4, Generation::Current(0.into()));
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
assert_eq!(*v, *examplar.as_ref());
let mut iter = stream_map.stream.iter();
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
let examplar = from_key_value(key3, value.as_ref());
assert_eq!(*v, *examplar.as_ref());
assert!(compare_stream_iter(&mut iter, key_1_2.clone(), &value_2));
assert!(compare_stream_iter(&mut iter, key_3, &value_3));
assert!(compare_stream_iter(&mut iter, key_4, &value_4));
assert!(compare_stream_iter(&mut iter, key_1_2, &value_1));
assert_eq!(iter.next(), None);
}
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
let examplar = from_key_value(key4, value.as_ref());
assert_eq!(*v, *examplar.as_ref());
assert_eq!(internal_stream_iter.next(), None);
#[test]
fn compactification_invalid_state_error() {
use air_interpreter_data::CanonResult;
let key = StreamMapKey::Str(Cow::Borrowed("some_key"));
let value = Rc::new(json!("1"));
let value_aggregate = create_value_aggregate(value.clone());
let mut stream_map = StreamMap::new();
stream_map.insert(key, &value_aggregate, Generation::Current(0.into()));
let trace = ExecutionTrace::from(vec![]);
let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace);
let canon_result = CanonResult(Rc::new(CID::new("fake canon CID")));
trace_ctx.meet_canon_end(canon_result.clone());
trace_ctx.meet_canon_end(canon_result.clone());
trace_ctx.meet_canon_end(canon_result);
let compactification_result = stream_map.compactify(&mut trace_ctx);
assert!(matches!(
compactification_result,
Err(ExecutionError::Uncatchable(
UncatchableError::GenerationCompactificationError(
GenerationCompactificationError::TracePosPointsToInvalidState { .. }
)
))
));
}
#[test]
fn compactification_points_to_nowhere_error() {
let key = StreamMapKey::Str(Cow::Borrowed("some_key"));
let value = Rc::new(json!("1"));
let value_aggregate = create_value_aggregate(value.clone());
let mut stream_map = StreamMap::new();
stream_map.insert(key, &value_aggregate, Generation::current(0));
let trace = ExecutionTrace::from(vec![]);
let mut trace_ctx = TraceHandler::from_trace(trace.clone(), trace);
let compactification_result = stream_map.compactify(&mut trace_ctx);
assert!(matches!(
compactification_result,
Err(ExecutionError::Uncatchable(
UncatchableError::GenerationCompactificationError(
GenerationCompactificationError::TracePosPointsToNowhere { .. }
)
))
));
}
fn generate_key_values(count: usize) -> Vec<(String, ValueAggregate)> {
@ -230,30 +251,18 @@ mod test {
.collect()
}
fn insert_into_map(
stream_map: &mut StreamMap,
key_value: &(String, ValueAggregate),
generation: Generation,
source: ValueSource,
) {
stream_map
.insert(key_value.0.as_str().into(), &key_value.1, generation, source)
.unwrap();
fn insert_into_map(stream_map: &mut StreamMap, key_value: &(String, ValueAggregate), generation: Generation) {
stream_map.insert(key_value.0.as_str().into(), &key_value.1, generation);
}
#[test]
fn get_unique_map_keys_stream_behaves_correct_with_no_duplicates() {
const TEST_DATA_SIZE: usize = 3;
let key_values = generate_key_values(TEST_DATA_SIZE);
let mut stream_map = StreamMap::from_generations_count(0.into(), TEST_DATA_SIZE.into());
let mut stream_map = StreamMap::new();
for id in 0..3 {
insert_into_map(
&mut stream_map,
&key_values[id],
Generation::nth(id as u32),
ValueSource::CurrentData,
);
insert_into_map(&mut stream_map, &key_values[id], Generation::current(id as u32));
}
let mut iter = stream_map.iter_unique_key();
@ -266,19 +275,17 @@ mod test {
#[test]
fn get_unique_map_keys_stream_removes_duplicates() {
use ValueSource::CurrentData;
const TEST_DATA_SIZE: usize = 5;
let key_values = generate_key_values(TEST_DATA_SIZE);
let mut stream_map = StreamMap::from_generations_count(0.into(), TEST_DATA_SIZE.into());
insert_into_map(&mut stream_map, &key_values[0], Generation::nth(0), CurrentData);
insert_into_map(&mut stream_map, &key_values[0], Generation::nth(1), CurrentData);
insert_into_map(&mut stream_map, &key_values[2], Generation::nth(1), CurrentData);
insert_into_map(&mut stream_map, &key_values[2], Generation::nth(3), CurrentData);
insert_into_map(&mut stream_map, &key_values[2], Generation::nth(4), CurrentData);
insert_into_map(&mut stream_map, &key_values[1], Generation::nth(4), CurrentData);
insert_into_map(&mut stream_map, &key_values[3], Generation::nth(2), CurrentData);
let mut stream_map = StreamMap::new();
insert_into_map(&mut stream_map, &key_values[0], Generation::current(0));
insert_into_map(&mut stream_map, &key_values[0], Generation::current(1));
insert_into_map(&mut stream_map, &key_values[2], Generation::current(1));
insert_into_map(&mut stream_map, &key_values[2], Generation::current(3));
insert_into_map(&mut stream_map, &key_values[2], Generation::current(4));
insert_into_map(&mut stream_map, &key_values[1], Generation::current(4));
insert_into_map(&mut stream_map, &key_values[3], Generation::current(2));
let mut iter = stream_map.iter_unique_key();

View File

@ -20,7 +20,7 @@ use crate::ToErrorCode;
use air_interpreter_cid::CidCalculationError;
use air_interpreter_data::ValueRef;
use air_trace_handler::GenerationCompatificationError;
use air_trace_handler::GenerationCompactificationError;
use air_trace_handler::IntConversionError;
use air_trace_handler::TraceHandlerError;
@ -46,7 +46,7 @@ pub enum UncatchableError {
/// These errors are related to internal bug in the interpreter when result trace is corrupted.
#[error(transparent)]
GenerationCompatificationError(#[from] GenerationCompatificationError),
GenerationCompactificationError(#[from] GenerationCompactificationError),
/// Integer casts, e.g. usize(=u64) to u32, might trigger such errors.
#[error(transparent)]

View File

@ -25,8 +25,6 @@ use air_execution_info_collector::InstructionTracker;
use air_interpreter_cid::CID;
use air_interpreter_data::CanonResultCidAggregate;
use air_interpreter_data::CidInfo;
use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_interpreter_data::ServiceResultCidAggregate;
use air_interpreter_interface::*;
use air_interpreter_signatures::SignatureStore;
@ -99,12 +97,7 @@ impl<'i> ExecutionCtx<'i> {
run_parameters: &RunParameters,
) -> Self {
let run_parameters = RcRunParameters::from_run_parameters(run_parameters);
let streams = Streams::from_data(
prev_ingredients.global_streams,
current_ingredients.global_streams,
prev_ingredients.restricted_streams,
current_ingredients.restricted_streams,
);
let streams = Streams::new();
let cid_state = ExecutionCidState::from_cid_info(prev_ingredients.cid_info, current_ingredients.cid_info);
// TODO we might keep both stores and merge them only with signature info collected into SignatureTracker
@ -162,9 +155,7 @@ impl ExecutionCtx<'_> {
/// Helper struct for ExecCtx construction.
#[derive(Debug, Clone)]
pub(crate) struct ExecCtxIngredients {
pub(crate) global_streams: GlobalStreamGens,
pub(crate) last_call_request_id: u32,
pub(crate) restricted_streams: RestrictedStreamGens,
pub(crate) cid_info: CidInfo,
pub(crate) signature_store: SignatureStore,
}

View File

@ -23,12 +23,8 @@ use crate::execution_step::ExecutionResult;
use crate::execution_step::Generation;
use crate::execution_step::ValueAggregate;
use air_interpreter_data::GenerationIdx;
use air_interpreter_data::RestrictedStreamGens;
use air_interpreter_data::RestrictedStreamMapGens;
use air_parser::ast::Span;
use air_parser::AirPos;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
use std::collections::hash_map::Entry::{Occupied, Vacant};
@ -39,23 +35,15 @@ use std::fmt;
pub(crate) struct StreamMapValueDescriptor<'stream_name> {
pub value: ValueAggregate,
pub name: &'stream_name str,
pub source: ValueSource,
pub generation: Generation,
pub position: AirPos,
}
impl<'stream_name> StreamMapValueDescriptor<'stream_name> {
pub fn new(
value: ValueAggregate,
name: &'stream_name str,
source: ValueSource,
generation: Generation,
position: AirPos,
) -> Self {
pub fn new(value: ValueAggregate, name: &'stream_name str, generation: Generation, position: AirPos) -> Self {
Self {
value,
name,
source,
generation,
position,
}
@ -113,18 +101,6 @@ pub(crate) struct StreamMaps {
// that a script could have a lot of new.
// TODO: use shared string (Rc<String>) to avoid copying.
stream_maps: HashMap<String, Vec<StreamMapDescriptor>>,
/// Contains stream generations from previous data that a restricted stream
/// should have at the scope start.
previous_restricted_stream_maps_gens: RestrictedStreamMapGens,
/// Contains stream generations from current data that a restricted stream
/// should have at the scope start.
current_restricted_stream_maps_gens: RestrictedStreamMapGens,
/// Contains stream generations that each private stream had at the scope end.
/// Then it's placed into data
new_restricted_stream_maps_gens: RestrictedStreamMapGens,
}
impl StreamMaps {
@ -144,17 +120,16 @@ impl StreamMaps {
&mut self,
key: StreamMapKey<'_>,
value_descriptor: StreamMapValueDescriptor<'_>,
) -> ExecutionResult<GenerationIdx> {
) {
let StreamMapValueDescriptor {
value,
name,
source,
generation,
position,
} = value_descriptor;
match self.get_mut(name, position) {
Some(stream_map) => stream_map.insert(key, &value, generation, source),
Some(stream_map) => stream_map.insert(key, &value, generation),
None => {
// streams could be created in three ways:
// - after met new instruction with stream name that isn't present in streams
@ -163,20 +138,18 @@ impl StreamMaps {
// for global streams
// - and by this function, and if there is no such a streams in streams,
// it means that a new global one should be created.
let stream_map = StreamMap::from_value(key, &value);
let mut stream_map = StreamMap::new();
stream_map.insert(key, &value, generation);
let descriptor = StreamMapDescriptor::global(stream_map);
self.stream_maps.insert(name.to_string(), vec![descriptor]);
let generation = 0;
Ok(generation.into())
}
}
}
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: usize) {
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span) {
let name = name.into();
let (prev_gens_count, current_gens_count) = self.stream_generation_from_data(&name, span.left, iteration);
let new_stream_map = StreamMap::from_generations_count(prev_gens_count, current_gens_count);
let new_stream_map = StreamMap::new();
let new_descriptor = StreamMapDescriptor::restricted(new_stream_map, span);
match self.stream_maps.entry(name) {
Occupied(mut entry) => {
@ -188,65 +161,28 @@ impl StreamMaps {
}
}
pub(crate) fn meet_scope_end(
&mut self,
name: String,
position: AirPos,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
pub(crate) fn meet_scope_end(&mut self, name: String, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
// unwraps are safe here because met_scope_end must be called after met_scope_start
let stream_map_descriptors = self.stream_maps.get_mut(&name).unwrap();
// delete a stream after exit from a scope
let last_descriptor = stream_map_descriptors.pop().unwrap();
let mut last_descriptor = stream_map_descriptors.pop().unwrap();
if stream_map_descriptors.is_empty() {
// streams should contain only non-empty stream embodiments
self.stream_maps.remove(&name);
}
let gens_count = last_descriptor.stream_map.compactify(trace_ctx)?;
self.collect_stream_generation(name, position, gens_count);
last_descriptor.stream_map.compactify(trace_ctx)
}
pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
for (_, descriptors) in self.stream_maps.iter_mut() {
for descriptor in descriptors.iter_mut() {
descriptor.stream_map.compactify(trace_ctx)?;
}
}
Ok(())
}
fn stream_generation_from_data(
&self,
name: &str,
position: AirPos,
iteration: usize,
) -> (GenerationIdx, GenerationIdx) {
let previous_generation =
Self::restricted_stream_generation(&self.previous_restricted_stream_maps_gens, name, position, iteration)
.unwrap_or_default();
let current_generation =
Self::restricted_stream_generation(&self.current_restricted_stream_maps_gens, name, position, iteration)
.unwrap_or_default();
(previous_generation, current_generation)
}
fn restricted_stream_generation(
restricted_stream_maps_gens: &RestrictedStreamGens,
name: &str,
position: AirPos,
iteration: usize,
) -> Option<GenerationIdx> {
restricted_stream_maps_gens
.get(name)
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
.copied()
}
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) {
match self.new_restricted_stream_maps_gens.entry(name) {
Occupied(mut streams) => streams.get_mut().entry(position).or_default().push(generation),
Vacant(entry) => {
let iterations = maplit::hashmap! {
position => vec![generation],
};
entry.insert(iterations);
}
}
}
}
impl fmt::Display for StreamMaps {

View File

@ -16,18 +16,13 @@
mod stream_descriptor;
mod stream_value_descriptor;
mod utils;
use crate::execution_step::ExecutionResult;
use crate::execution_step::Stream;
use crate::ExecutionError;
use stream_descriptor::*;
pub(crate) use stream_value_descriptor::StreamValueDescriptor;
use air_interpreter_data::GenerationIdx;
use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_parser::ast::Span;
use air_parser::AirPos;
use air_trace_handler::TraceHandler;
@ -41,34 +36,12 @@ pub(crate) struct Streams {
// that a script could have a lot of new.
// TODO: use shared string (Rc<String>) to avoid copying.
streams: HashMap<String, Vec<StreamDescriptor>>,
/// Contains stream generations from previous data that a restricted stream
/// should have at the scope start.
previous_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations from current data that a restricted stream
/// should have at the scope start.
current_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations that each private stream had at the scope end.
/// Then it's placed into data
new_restricted_stream_gens: RestrictedStreamGens,
}
impl Streams {
pub(crate) fn from_data(
previous_global_streams: GlobalStreamGens,
current_global_streams: GlobalStreamGens,
previous_restricted_stream_gens: RestrictedStreamGens,
current_restricted_stream_gens: RestrictedStreamGens,
) -> Self {
let streams = utils::merge_global_streams(previous_global_streams, current_global_streams);
pub(crate) fn new() -> Self {
Self {
streams,
previous_restricted_stream_gens,
current_restricted_stream_gens,
new_restricted_stream_gens: <_>::default(),
streams: <_>::default(),
}
}
@ -84,20 +57,16 @@ impl Streams {
.and_then(|descriptors| find_closest_mut(descriptors.iter_mut(), position))
}
pub(crate) fn add_stream_value(
&mut self,
value_descriptor: StreamValueDescriptor<'_>,
) -> ExecutionResult<GenerationIdx> {
pub(crate) fn add_stream_value(&mut self, value_descriptor: StreamValueDescriptor<'_>) {
let StreamValueDescriptor {
value,
name,
source,
generation,
position,
} = value_descriptor;
match self.get_mut(name, position) {
Some(stream) => stream.add_value(value, generation, source),
Some(stream) => stream.add_value(value, generation),
None => {
// streams could be created in three ways:
// - after met new instruction with stream name that isn't present in streams
@ -106,20 +75,18 @@ impl Streams {
// for global streams
// - and by this function, and if there is no such a streams in streams,
// it means that a new global one should be created.
let stream = Stream::from_value(value);
let mut stream = Stream::new();
stream.add_value(value, generation);
let descriptor = StreamDescriptor::global(stream);
self.streams.insert(name.to_string(), vec![descriptor]);
let generation = 0;
Ok(generation.into())
}
}
}
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: usize) {
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span) {
let name = name.into();
let (prev_gens_count, current_gens_count) = self.stream_generation_from_data(&name, span.left, iteration);
let new_stream = Stream::from_generations_count(prev_gens_count, current_gens_count);
let new_stream = Stream::new();
let new_descriptor = StreamDescriptor::restricted(new_stream, span);
match self.streams.entry(name) {
Occupied(mut entry) => {
@ -131,94 +98,28 @@ impl Streams {
}
}
pub(crate) fn meet_scope_end(
&mut self,
name: String,
position: AirPos,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
pub(crate) fn meet_scope_end(&mut self, name: String, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
// unwraps are safe here because met_scope_end must be called after met_scope_start
let stream_descriptors = self.streams.get_mut(&name).unwrap();
// delete a stream after exit from a scope
let last_descriptor = stream_descriptors.pop().unwrap();
let mut last_descriptor = stream_descriptors.pop().unwrap();
if stream_descriptors.is_empty() {
// streams should contain only non-empty stream embodiments
self.streams.remove(&name);
}
let gens_count = last_descriptor.stream.compactify(trace_ctx)?;
self.collect_stream_generation(name, position, gens_count);
last_descriptor.stream.compactify(trace_ctx)
}
pub(crate) fn compactify(&mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
for (_, descriptors) in self.streams.iter_mut() {
for descriptor in descriptors {
descriptor.stream.compactify(trace_ctx)?;
}
}
Ok(())
}
/// This method must be called at the end of execution, because it contains logic to collect
/// all global streams depending on their presence in a streams field.
pub(crate) fn into_streams_data(
self,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<(GlobalStreamGens, RestrictedStreamGens)> {
// since it's called at the end of execution, streams contains only global ones,
// because all private's been deleted after exiting a scope
let global_streams = self
.streams
.into_iter()
.map(|(name, mut descriptors)| -> Result<_, ExecutionError> {
// unwrap is safe here because of invariant that streams contains non-empty vectors,
// moreover it must contain only one value, because this method is called at the end
// of the execution
let stream = descriptors.pop().unwrap().stream;
let gens_count = stream.compactify(trace_ctx)?;
Ok((name, gens_count))
})
.collect::<Result<GlobalStreamGens, _>>()?;
Ok((global_streams, self.new_restricted_stream_gens))
}
fn stream_generation_from_data(
&self,
name: &str,
position: AirPos,
iteration: usize,
) -> (GenerationIdx, GenerationIdx) {
let previous_generation =
Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration)
.unwrap_or_default();
let current_generation =
Self::restricted_stream_generation(&self.current_restricted_stream_gens, name, position, iteration)
.unwrap_or_default();
(previous_generation, current_generation)
}
fn restricted_stream_generation(
restricted_stream_gens: &RestrictedStreamGens,
name: &str,
position: AirPos,
iteration: usize,
) -> Option<GenerationIdx> {
restricted_stream_gens
.get(name)
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
.copied()
}
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) {
match self.new_restricted_stream_gens.entry(name) {
Occupied(mut streams) => match streams.get_mut().entry(position) {
Occupied(mut iterations) => iterations.get_mut().push(generation),
Vacant(entry) => {
entry.insert(vec![generation]);
}
},
Vacant(entry) => {
let iterations = maplit::hashmap! {
position => vec![generation],
};
entry.insert(iterations);
}
}
}
}
use std::fmt;

View File

@ -18,28 +18,19 @@ use crate::execution_step::Generation;
use crate::execution_step::ValueAggregate;
use air_parser::AirPos;
use air_trace_handler::merger::ValueSource;
pub(crate) struct StreamValueDescriptor<'stream_name> {
pub value: ValueAggregate,
pub name: &'stream_name str,
pub source: ValueSource,
pub generation: Generation,
pub position: AirPos,
}
impl<'stream_name> StreamValueDescriptor<'stream_name> {
pub fn new(
value: ValueAggregate,
name: &'stream_name str,
source: ValueSource,
generation: Generation,
position: AirPos,
) -> Self {
pub fn new(value: ValueAggregate, name: &'stream_name str, generation: Generation, position: AirPos) -> Self {
Self {
value,
name,
source,
generation,
position,
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::StreamDescriptor;
use crate::execution_step::Stream;
use air_interpreter_data::GlobalStreamGens;
use std::collections::HashMap;
pub(super) fn merge_global_streams(
previous_global_streams: GlobalStreamGens,
current_global_streams: GlobalStreamGens,
) -> HashMap<String, Vec<StreamDescriptor>> {
let mut global_streams = previous_global_streams
.iter()
.map(|(stream_name, &prev_gens_count)| {
let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default();
let global_stream = Stream::from_generations_count(prev_gens_count, current_gens_count);
let descriptor = StreamDescriptor::global(global_stream);
(stream_name.to_string(), vec![descriptor])
})
.collect::<HashMap<_, _>>();
for (stream_name, current_gens_count) in current_global_streams {
if previous_global_streams.contains_key(&stream_name) {
continue;
}
let global_stream = Stream::from_generations_count(0.into(), current_gens_count);
let descriptor = StreamDescriptor::global(global_stream);
global_streams.insert(stream_name, vec![descriptor]);
}
global_streams
}

View File

@ -30,7 +30,6 @@ use crate::JValue;
use apply_to_arguments::apply_to_arg;
use utils::*;
use air_interpreter_data::GenerationIdx;
use air_parser::ast;
use air_parser::ast::Ap;
use air_trace_handler::merger::MergerApResult;
@ -52,8 +51,8 @@ impl<'i> super::ExecutableInstruction<'i> for Ap<'i> {
)?;
let merger_ap_result = to_merger_ap_result(self, trace_ctx)?;
let maybe_generation = populate_context(&self.result, &merger_ap_result, result, exec_ctx)?;
maybe_update_trace(maybe_generation, trace_ctx);
populate_context(&self.result, &merger_ap_result, result, exec_ctx)?;
maybe_update_trace(should_touch_trace, trace_ctx);
Ok(())
}
@ -80,21 +79,24 @@ fn populate_context<'ctx>(
merger_ap_result: &MergerApResult,
result: ValueAggregate,
exec_ctx: &mut ExecutionCtx<'ctx>,
) -> ExecutionResult<Option<GenerationIdx>> {
) -> ExecutionResult<()> {
match ap_result {
ast::ApResult::Scalar(scalar) => exec_ctx.scalars.set_scalar_value(scalar.name, result).map(|_| None),
ast::ApResult::Scalar(scalar) => {
exec_ctx.scalars.set_scalar_value(scalar.name, result)?;
}
ast::ApResult::Stream(stream) => {
let value_descriptor = generate_value_descriptor(result, stream, merger_ap_result);
exec_ctx.streams.add_stream_value(value_descriptor).map(Some)
}
exec_ctx.streams.add_stream_value(value_descriptor);
}
};
Ok(())
}
fn maybe_update_trace(maybe_generation: Option<GenerationIdx>, trace_ctx: &mut TraceHandler) {
fn maybe_update_trace(should_touch_trace: bool, trace_ctx: &mut TraceHandler) {
use air_interpreter_data::ApResult;
if let Some(generation) = maybe_generation {
let final_ap_result = ApResult::new(generation);
trace_ctx.meet_ap_end(final_ap_result);
if should_touch_trace {
trace_ctx.meet_ap_end(ApResult::stub());
}
}

View File

@ -15,8 +15,11 @@
*/
use super::*;
use crate::execution_step::boxed_value::TracePosOperate;
use crate::execution_step::resolver::Resolvable;
use crate::execution_step::{CanonResultAggregate, LiteralAggregate, PEEK_ALLOWED_ON_NON_EMPTY};
use crate::execution_step::CanonResultAggregate;
use crate::execution_step::LiteralAggregate;
use crate::execution_step::PEEK_ALLOWED_ON_NON_EMPTY;
use crate::UncatchableError;
use air_interpreter_data::Provenance;

View File

@ -27,21 +27,12 @@ pub(super) fn generate_value_descriptor<'stream>(
stream: &'stream ast::Stream<'_>,
ap_result: &MergerApResult,
) -> StreamValueDescriptor<'stream> {
use air_trace_handler::merger::ValueSource;
match ap_result {
MergerApResult::NotMet => StreamValueDescriptor::new(
value,
stream.name,
ValueSource::PreviousData,
Generation::Last,
stream.position,
),
MergerApResult::NotMet => StreamValueDescriptor::new(value, stream.name, Generation::New, stream.position),
MergerApResult::Met(met_result) => StreamValueDescriptor::new(
value,
stream.name,
met_result.value_source,
Generation::Nth(met_result.generation),
Generation::from_met_result(met_result),
stream.position,
),
}
@ -52,21 +43,12 @@ pub(crate) fn generate_map_value_descriptor<'stream>(
stream: &'stream ast::StreamMap<'_>,
ap_result: &MergerApResult,
) -> StreamMapValueDescriptor<'stream> {
use air_trace_handler::merger::ValueSource;
match ap_result {
MergerApResult::NotMet => StreamMapValueDescriptor::new(
value,
stream.name,
ValueSource::PreviousData,
Generation::Last,
stream.position,
),
MergerApResult::NotMet => StreamMapValueDescriptor::new(value, stream.name, Generation::New, stream.position),
MergerApResult::Met(met_result) => StreamMapValueDescriptor::new(
value,
stream.name,
met_result.value_source,
Generation::Nth(met_result.generation),
Generation::from_met_result(met_result),
stream.position,
),
}

View File

@ -28,10 +28,9 @@ use crate::unsupported_map_key_type;
use crate::CatchableError;
use crate::ExecutionError;
use air_interpreter_data::GenerationIdx;
use air_interpreter_data::ApResult;
use air_parser::ast::ApMap;
use air_parser::ast::ApMapKey;
use air_parser::ast::Number;
use air_parser::ast::StreamMap;
use air_trace_handler::merger::MergerApResult;
@ -46,8 +45,8 @@ impl<'i> super::ExecutableInstruction<'i> for ApMap<'i> {
let merger_ap_result = to_merger_ap_map_result(&self, trace_ctx)?;
let key = resolve_if_needed(&self.key, exec_ctx, self.map.name)?;
let generation = populate_context(key, &self.map, &merger_ap_result, result, exec_ctx)?;
maybe_update_trace(generation, trace_ctx);
populate_context(key, &self.map, &merger_ap_result, result, exec_ctx);
trace_ctx.meet_ap_end(ApResult::stub());
Ok(())
}
@ -64,9 +63,9 @@ fn populate_context<'ctx>(
merger_ap_result: &MergerApResult,
result: ValueAggregate,
exec_ctx: &mut ExecutionCtx<'ctx>,
) -> ExecutionResult<GenerationIdx> {
) {
let value_descriptor = generate_map_value_descriptor(result, ap_map_result, merger_ap_result);
exec_ctx.stream_maps.add_stream_map_value(key, value_descriptor)
exec_ctx.stream_maps.add_stream_map_value(key, value_descriptor);
}
fn resolve_if_needed<'ctx>(
@ -74,6 +73,8 @@ fn resolve_if_needed<'ctx>(
exec_ctx: &mut ExecutionCtx<'ctx>,
map_name: &str,
) -> Result<StreamMapKey<'ctx>, ExecutionError> {
use air_parser::ast::Number;
match key {
&ApMapKey::Literal(s) => Ok(s.into()),
ApMapKey::Number(n) => match n {
@ -94,10 +95,3 @@ fn resolve<'ctx>(
let (value, _, _) = resolvable.resolve(exec_ctx)?;
StreamMapKey::from_value(value, map_name)
}
fn maybe_update_trace(generation: GenerationIdx, trace_ctx: &mut TraceHandler) {
use air_interpreter_data::ApResult;
let final_ap_result = ApResult::new(generation);
trace_ctx.meet_ap_end(final_ap_result);
}

View File

@ -58,16 +58,11 @@ pub(crate) fn populate_context_from_peer_service_result<'i>(
let executed_result = ValueAggregate::from_service_result(executed_result, service_result_agg_cid.clone());
let value_descriptor = StreamValueDescriptor::new(
executed_result,
stream.name,
ValueSource::PreviousData,
Generation::Last,
stream.position,
);
let generation = exec_ctx.streams.add_stream_value(value_descriptor)?;
let value_descriptor =
StreamValueDescriptor::new(executed_result, stream.name, Generation::New, stream.position);
exec_ctx.streams.add_stream_value(value_descriptor);
exec_ctx.record_call_cid(&*peer_id, &service_result_agg_cid);
Ok(CallResult::executed_stream(service_result_agg_cid, generation))
Ok(CallResult::executed_stream_stub(service_result_agg_cid))
}
CallOutputValue::None => {
let value_cid = value_to_json_cid(&*executed_result.result)
@ -86,41 +81,33 @@ pub(crate) fn populate_context_from_data<'i>(
value_source: ValueSource,
output: &CallOutputValue<'i>,
exec_ctx: &mut ExecutionCtx<'i>,
) -> ExecutionResult<ValueRef> {
) -> ExecutionResult<()> {
match (output, value) {
(CallOutputValue::Scalar(scalar), ValueRef::Scalar(cid)) => {
let value = exec_ctx.cid_state.resolve_service_value(&cid)?;
let result = ServiceResultAggregate::new(value, tetraplet, trace_pos);
let result = ValueAggregate::from_service_result(result, cid.clone());
let result = ValueAggregate::from_service_result(result, cid);
exec_ctx.scalars.set_scalar_value(scalar.name, result)?;
Ok(ValueRef::Scalar(cid))
}
(CallOutputValue::Stream(stream), ValueRef::Stream { cid, generation }) => {
let value = exec_ctx.cid_state.resolve_service_value(&cid)?;
let result = ServiceResultAggregate::new(value, tetraplet, trace_pos);
let result = ValueAggregate::from_service_result(result, cid.clone());
let value_descriptor = StreamValueDescriptor::new(
result,
stream.name,
value_source,
Generation::Nth(generation),
stream.position,
);
let resulted_generation = exec_ctx.streams.add_stream_value(value_descriptor)?;
let result = ValueRef::Stream {
cid,
generation: resulted_generation,
};
Ok(result)
let result = ValueAggregate::from_service_result(result, cid);
let generation = Generation::from_data(value_source, generation);
let value_descriptor = StreamValueDescriptor::new(result, stream.name, generation, stream.position);
exec_ctx.streams.add_stream_value(value_descriptor);
}
(CallOutputValue::None, value @ ValueRef::Unused(_)) => Ok(value),
(_, value) => Err(ExecutionError::Uncatchable(
(CallOutputValue::None, ValueRef::Unused(_)) => {}
(_, value) => {
return Err(ExecutionError::Uncatchable(
UncatchableError::CallResultNotCorrespondToInstr(value),
)),
))
}
}
Ok(())
}
/// Writes an executed state of a particle being sent to remote node.
pub(crate) fn handle_remote_call(peer_pk: String, exec_ctx: &mut ExecutionCtx<'_>, trace_ctx: &mut TraceHandler) {
exec_ctx.next_peer_pks.push(peer_pk);

View File

@ -101,8 +101,8 @@ pub(super) fn handle_prev_state<'i>(
Executed(value) => {
use air_interpreter_data::ValueRef;
let resulted_value = populate_context_from_data(
value,
populate_context_from_data(
value.clone(),
tetraplet.clone(),
met_result.trace_pos,
met_result.source,
@ -110,14 +110,14 @@ pub(super) fn handle_prev_state<'i>(
exec_ctx,
)?;
match &resulted_value {
match &value {
ValueRef::Scalar(ref cid) | ValueRef::Stream { ref cid, .. } => {
exec_ctx.record_call_cid(&*tetraplet.peer_pk, cid);
}
ValueRef::Unused(_) => {}
}
let call_result = CallResult::Executed(resulted_value);
let call_result = CallResult::Executed(value);
trace_ctx.meet_call_end(call_result);
Ok(StateDescriptor::executed())

View File

@ -23,7 +23,6 @@ use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::boxed_value::CanonStream;
use crate::execution_step::boxed_value::CanonStreamWithProvenance;
use crate::execution_step::Generation;
use crate::log_instruction;
use crate::trace_to_exec_err;
@ -86,8 +85,7 @@ fn create_canon_stream_producer<'closure, 'name: 'closure>(
.map(Cow::Borrowed)
.unwrap_or_default();
// it's always possible to iter over all generations of a stream
let values = stream.iter(Generation::Last).unwrap().cloned().collect::<Vec<_>>();
let values = stream.iter().cloned().collect::<Vec<_>>();
CanonStream::from_values(values, peer_pk)
})
}

View File

@ -16,6 +16,7 @@
use super::*;
use crate::execution_step::boxed_value::populate_tetraplet_with_lambda;
use crate::execution_step::boxed_value::IterableValue;
use crate::execution_step::CatchableError;
use crate::execution_step::PEEK_ALLOWED_ON_NON_EMPTY;
use crate::JValue;
@ -31,8 +32,6 @@ use std::rc::Rc;
// TODO: refactor this file after switching to boxed value
pub(crate) type IterableValue = Box<dyn for<'ctx> Iterable<'ctx, Item = IterableItem<'ctx>>>;
pub(crate) enum FoldIterableScalar {
Empty,
ScalarBased(IterableValue),
@ -95,28 +94,6 @@ pub(crate) fn create_canon_stream_iterable_value<'ctx>(
Ok(FoldIterableScalar::ScalarBased(iterable))
}
/// Constructs iterable value for given stream iterable.
pub(crate) fn construct_stream_iterable_values(
stream: &Stream,
start: Generation,
end: Generation,
) -> Vec<IterableValue> {
let stream_iter = match stream.slice_iter(start, end) {
Some(stream_iter) => stream_iter,
None => return vec![],
};
stream_iter
.filter(|iterable| !iterable.is_empty())
.map(|iterable| {
let call_results = iterable.to_vec();
let foldable = IterableVecResolvedCall::init(call_results);
let foldable: IterableValue = Box::new(foldable);
foldable
})
.collect::<Vec<_>>()
}
/// Constructs iterable value from resolved call result.
fn from_value(call_result: ValueAggregate, variable_name: &str) -> ExecutionResult<FoldIterableScalar> {
let len = match call_result.get_result().deref() {

View File

@ -19,6 +19,7 @@ use super::ExecutableInstruction;
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::boxed_value::IterableValue;
use crate::execution_step::Joinable;
use crate::joinable;
use crate::log_instruction;

View File

@ -15,10 +15,8 @@
*/
pub(super) mod completeness_updater;
mod stream_cursor;
pub(super) mod stream_execute_helpers;
use super::fold::*;
use super::ExecutableInstruction;
use super::ExecutionCtx;
use super::ExecutionResult;

View File

@ -1,42 +0,0 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_interpreter_data::GenerationIdx;
use super::construct_stream_iterable_values;
use crate::execution_step::boxed_value::Generation;
use crate::execution_step::boxed_value::Stream;
use crate::execution_step::instructions::fold::IterableValue;
pub(super) struct StreamCursor {
last_seen_generation: GenerationIdx,
}
impl StreamCursor {
pub(super) fn new() -> Self {
Self {
last_seen_generation: GenerationIdx::from(0),
}
}
pub(super) fn construct_iterables(&mut self, stream: &Stream) -> Vec<IterableValue> {
let iterables =
construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last);
self.last_seen_generation = stream.last_non_empty_generation();
iterables
}
}

View File

@ -15,13 +15,14 @@
*/
use super::completeness_updater::FoldGenerationObserver;
use super::stream_cursor::StreamCursor;
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::boxed_value::IterableValue;
use crate::execution_step::boxed_value::RecursiveCursorState;
use crate::execution_step::boxed_value::RecursiveStreamCursor;
use crate::execution_step::boxed_value::Stream;
use crate::execution_step::instructions::fold::IterableType;
use crate::execution_step::instructions::fold::IterableValue;
use crate::execution_step::instructions::fold_scalar::fold;
use crate::trace_to_exec_err;
@ -29,14 +30,14 @@ use air_parser::ast::Instruction;
use std::rc::Rc;
struct FoldStreamLikeIngredients<'i> {
struct FoldStreamIngredients<'i> {
iterable_name: &'i str,
instruction: Rc<Instruction<'i>>,
last_instruction: Option<Rc<Instruction<'i>>>,
fold_id: u32,
}
impl<'i> FoldStreamLikeIngredients<'i> {
impl<'i> FoldStreamIngredients<'i> {
fn new(
iterable_name: &'i str,
instruction: Rc<Instruction<'i>>,
@ -65,18 +66,16 @@ pub(crate) fn execute_with_stream<'i>(
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), fold_to_string)?;
let mut stream_cursor = StreamCursor::new();
let mut stream_iterable = stream_cursor.construct_iterables(get_mut_stream(exec_ctx));
let mut recursive_stream = RecursiveStreamCursor::new();
let mut cursor_state = recursive_stream.met_fold_start(get_mut_stream(exec_ctx));
let mut observer = FoldGenerationObserver::new();
// this cycle manages recursive streams
while !stream_iterable.is_empty() {
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
// write operation to this stream to write to this new generation
add_new_generation_if_non_empty(get_mut_stream(exec_ctx));
while let RecursiveCursorState::Continue(iterables) = cursor_state {
let ingredients =
FoldStreamLikeIngredients::new(iterable_name, instruction.clone(), last_instruction.clone(), fold_id);
FoldStreamIngredients::new(iterable_name, instruction.clone(), last_instruction.clone(), fold_id);
execute_iterations(
stream_iterable,
iterables,
fold_to_string,
ingredients,
&mut observer,
@ -84,11 +83,7 @@ pub(crate) fn execute_with_stream<'i>(
trace_ctx,
)?;
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
// and likely that stream could be mutably borrowed in execute_iterations
let stream = remove_new_generation_if_non_empty(get_mut_stream(exec_ctx));
stream_iterable = stream_cursor.construct_iterables(stream)
cursor_state = recursive_stream.met_iteration_end(get_mut_stream(exec_ctx));
}
observer.update_completeness(exec_ctx);
@ -104,7 +99,7 @@ pub(crate) fn execute_with_stream<'i>(
fn execute_iterations<'i>(
iterables: Vec<IterableValue>,
fold_to_string: &impl ToString,
ingredients: FoldStreamLikeIngredients<'i>,
ingredients: FoldStreamIngredients<'i>,
generation_observer: &mut FoldGenerationObserver,
exec_ctx: &mut ExecutionCtx<'i>,
trace_ctx: &mut TraceHandler,
@ -139,17 +134,6 @@ fn execute_iterations<'i>(
Ok(())
}
/// Safety: this function should be called iff stream is present in context
pub(super) fn add_new_generation_if_non_empty(stream: &mut Stream) {
stream.add_new_generation_if_non_empty();
}
/// Safety: this function should be called iff stream is present in context
pub(super) fn remove_new_generation_if_non_empty(stream: &mut Stream) -> &Stream {
stream.remove_last_generation_if_empty();
stream
}
/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be
/// not deterministic.
pub(super) fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> {

View File

@ -22,8 +22,6 @@ use crate::log_instruction;
use air_parser::ast::New;
use air_parser::ast::NewArgument;
use std::convert::TryFrom;
impl<'i> super::ExecutableInstruction<'i> for New<'i> {
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(new, exec_ctx, trace_ctx);
@ -51,18 +49,8 @@ impl<'i> super::ExecutableInstruction<'i> for New<'i> {
fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
let position = new.span.left;
match &new.argument {
NewArgument::Stream(stream) => {
let iteration = exec_ctx.tracker.new_tracker.get_iteration(position);
let iteration = usize::try_from(iteration).unwrap();
exec_ctx.streams.meet_scope_start(stream.name, new.span, iteration);
}
NewArgument::StreamMap(stream_map) => {
let iteration = exec_ctx.tracker.new_tracker.get_iteration(position);
let iteration = usize::try_from(iteration).unwrap();
exec_ctx
.stream_maps
.meet_scope_start(stream_map.name, new.span, iteration);
}
NewArgument::Stream(stream) => exec_ctx.streams.meet_scope_start(stream.name, new.span),
NewArgument::StreamMap(stream_map) => exec_ctx.stream_maps.meet_scope_start(stream_map.name, new.span),
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_start_scalar(scalar.name.to_string()),
NewArgument::CanonStream(canon_stream) => exec_ctx
.scalars
@ -73,16 +61,11 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
}
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
let position = new.span.left;
match &new.argument {
NewArgument::Stream(stream) => exec_ctx
.streams
.meet_scope_end(stream.name.to_string(), position, trace_ctx),
NewArgument::StreamMap(stream_map) => {
exec_ctx
NewArgument::Stream(stream) => exec_ctx.streams.meet_scope_end(stream.name.to_string(), trace_ctx),
NewArgument::StreamMap(stream_map) => exec_ctx
.stream_maps
.meet_scope_end(stream_map.name.to_string(), position, trace_ctx)
}
.meet_scope_end(stream_map.name.to_string(), trace_ctx),
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name),
NewArgument::CanonStream(canon_stream) => exec_ctx.scalars.meet_new_end_canon_stream(canon_stream.name),
}

View File

@ -15,15 +15,18 @@
*/
use crate::ToErrorCode;
use air_interpreter_interface::CallResults;
use air_interpreter_interface::CallResults;
use fluence_keypair::error::SigningError;
use strum::EnumCount;
use strum::IntoEnumIterator;
use strum_macros::EnumCount as EnumCountMacro;
use strum_macros::EnumDiscriminants;
use strum_macros::EnumIter;
use thiserror::Error as ThisError;
/// Errors happened during the interpreter farewell step.
#[derive(Debug, EnumDiscriminants, ThisError)]
#[derive(Debug, EnumDiscriminants, EnumCountMacro, ThisError)]
#[strum_discriminants(derive(EnumIter))]
pub enum FarewellError {
/// Call results should be empty at the end of execution thanks to a execution invariant.
@ -39,3 +42,9 @@ impl ToErrorCode for FarewellError {
crate::generate_to_error_code!(self, FarewellError, FAREWELL_ERRORS_START_ID)
}
}
impl ToErrorCode for SigningError {
fn to_error_code(&self) -> i64 {
crate::utils::FAREWELL_ERRORS_START_ID + FarewellError::COUNT as i64
}
}

View File

@ -25,6 +25,7 @@ use crate::INTERPRETER_SUCCESS;
use air_interpreter_data::InterpreterData;
use air_interpreter_interface::CallRequests;
use air_utils::measure;
use fluence_keypair::error::SigningError;
use fluence_keypair::KeyPair;
use std::fmt::Debug;
@ -90,26 +91,18 @@ fn populate_outcome_from_contexts(
error_message: String,
keypair: &KeyPair,
) -> InterpreterOutcome {
let maybe_gens = exec_ctx
.streams
.into_streams_data(&mut trace_handler)
.map_err(execution_error_into_outcome);
let (global_streams, restricted_streams) = match maybe_gens {
Ok(gens) => gens,
match compactify_streams(&mut exec_ctx, &mut trace_handler) {
Ok(()) => {}
Err(outcome) => return outcome,
};
let current_signature = exec_ctx
.signature_tracker
.into_signature(&exec_ctx.run_parameters.current_peer_id, keypair)
.expect("siging shouldn't fail");
let current_pubkey = keypair.public();
exec_ctx.signature_store.put(current_pubkey.into(), current_signature);
match sign_result(&mut exec_ctx, keypair) {
Ok(()) => {}
Err(outcome) => return outcome,
};
let data = InterpreterData::from_execution_result(
trace_handler.into_result_trace(),
global_streams,
restricted_streams,
exec_ctx.cid_state.into(),
exec_ctx.signature_store,
exec_ctx.last_call_request_id,
@ -120,22 +113,46 @@ fn populate_outcome_from_contexts(
tracing::Level::TRACE,
"serde_json::to_vec(data)"
);
let next_peer_pks = dedup(exec_ctx.next_peer_pks);
let call_requests = measure!(
serde_json::to_vec(&exec_ctx.call_requests).expect("default serializer shouldn't fail"),
tracing::Level::TRACE,
"serde_json::to_vec(call_results)",
);
InterpreterOutcome::new(ret_code, error_message, data, next_peer_pks, call_requests)
}
// this method is called only if there is an internal error in the interpreter and
fn compactify_streams(exec_ctx: &mut ExecutionCtx<'_>, trace_ctx: &mut TraceHandler) -> Result<(), InterpreterOutcome> {
exec_ctx
.streams
.compactify(trace_ctx)
.and_then(|_| exec_ctx.stream_maps.compactify(trace_ctx))
.map_err(execution_error_into_outcome)
}
fn sign_result(exec_ctx: &mut ExecutionCtx<'_>, keypair: &KeyPair) -> Result<(), InterpreterOutcome> {
let current_signature = exec_ctx
.signature_tracker
.into_signature(&exec_ctx.run_parameters.current_peer_id, keypair)
.map_err(signing_error_into_outcome)?;
let current_pubkey = keypair.public();
exec_ctx.signature_store.put(current_pubkey.into(), current_signature);
Ok(())
}
// these methods are called only if there is an internal error in the interpreter and
// new execution trace was corrupted
fn execution_error_into_outcome(error: ExecutionError) -> InterpreterOutcome {
InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![])
}
fn signing_error_into_outcome(error: SigningError) -> InterpreterOutcome {
InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![])
}
/// Deduplicate values in a supplied vector.
fn dedup<T: Eq + Hash + Debug>(mut vec: Vec<T>) -> Vec<T> {
use std::collections::HashSet;

View File

@ -54,17 +54,13 @@ pub(crate) fn prepare<'i>(
let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?;
let prev_ingredients = ExecCtxIngredients {
global_streams: prev_data.global_streams,
last_call_request_id: prev_data.last_call_request_id,
restricted_streams: prev_data.restricted_streams,
cid_info: prev_data.cid_info,
signature_store: prev_data.signatures,
};
let current_ingredients = ExecCtxIngredients {
global_streams: current_data.global_streams,
last_call_request_id: current_data.last_call_request_id,
restricted_streams: current_data.restricted_streams,
cid_info: current_data.cid_info,
signature_store: current_data.signatures,
};

View File

@ -305,17 +305,6 @@ fn fold_merge() {
);
let data = InterpreterData::try_from_slice(&result_7.data).expect("data should be well-formed");
let stream_1_generations = data
.global_streams
.get("$stream_1")
.expect("$stream_1 should be present in data");
let stream_2_generations = data
.global_streams
.get("$stream_2")
.expect("$stream_2 should be present in data");
assert_eq!(*stream_1_generations, 8);
assert_eq!(*stream_2_generations, 6);
let mut fold_states_count = 0;
let mut calls_count = HashMap::new();

View File

@ -0,0 +1,164 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air::ExecutionCidState;
use air_test_framework::AirScriptExecutor;
use air_test_utils::prelude::TestRunParameters;
use air_test_utils::*;
#[test]
fn global_streams_are_compactified() {
let peer_id = "peer_id";
let service_result = "service_result";
let script = format!(
r#"
(seq
(ap 1 $stream)
(call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}"
)
"#
);
let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap();
let result = executor.execute_all(peer_id).unwrap();
let actual_trace = trace_from_result(result.last().unwrap());
let mut cid_state = ExecutionCidState::new();
let expected_trace = vec![
executed_state::ap(0),
stream_tracked!(
service_result,
1,
cid_state,
peer = peer_id,
service = "..0",
function = ""
),
];
assert_eq!(&actual_trace, &expected_trace);
}
#[test]
fn global_stream_maps_are_compactified() {
let peer_id = "peer_id";
let service_result = "service_result";
let script = format!(
r#"
(seq
(ap (1 1) %stream_map)
(seq
(call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}"
(ap (1 1) %stream_map)
)
)
"#
);
let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap();
let result = executor.execute_all(peer_id).unwrap();
let actual_trace = trace_from_result(result.last().unwrap());
let mut cid_state = ExecutionCidState::new();
let expected_trace = vec![
executed_state::ap(0),
stream_tracked!(
service_result,
0,
cid_state,
peer = peer_id,
service = "..0",
function = ""
),
executed_state::ap(1),
];
assert_eq!(&actual_trace, &expected_trace);
}
#[test]
fn local_streams_are_compactified() {
let peer_id = "peer_id";
let service_result = "service_result";
let script = format!(
r#"
(new $stream
(seq
(ap 1 $stream)
(call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}"
)
)
"#
);
let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap();
let result = executor.execute_all(peer_id).unwrap();
let actual_trace = trace_from_result(result.last().unwrap());
let mut cid_state = ExecutionCidState::new();
let expected_trace = vec![
executed_state::ap(0),
stream_tracked!(
service_result,
1,
cid_state,
peer = peer_id,
service = "..0",
function = ""
),
];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn local_stream_maps_are_compactified() {
let peer_id = "peer_id";
let service_result = "service_result";
let script = format!(
r#"
(new $stream
(seq
(ap (1 1) %stream_map)
(seq
(call "{peer_id}" ("" "") [] $stream) ; ok = "{service_result}"
(ap (1 1) %stream_map)
)
)
)
"#
);
let executor = AirScriptExecutor::from_annotated(TestRunParameters::from_init_peer_id(peer_id), &script).unwrap();
let result = executor.execute_all(peer_id).unwrap();
let actual_trace = trace_from_result(result.last().unwrap());
let mut cid_state = ExecutionCidState::new();
let expected_trace = vec![
executed_state::ap(0),
stream_tracked!(
service_result,
0,
cid_state,
peer = peer_id,
service = "..0",
function = ""
),
executed_state::ap(1),
];
assert_eq!(actual_trace, expected_trace);
}

View File

@ -15,6 +15,7 @@
*/
mod ap_with_fold;
mod compactification;
mod merging;
mod recursive_streams;
mod streams;

View File

@ -131,11 +131,12 @@ fn recursive_stream_many_iterations() {
executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)),
executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)),
executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)),
executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(15, 0)),
executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(13, 0)),
executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)),
executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(19, 0)),
executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(17, 0)),
executed_state::subtrace_lore(14, subtrace_desc(17, 2), subtrace_desc(19, 0)),
executed_state::subtrace_lore(16, subtrace_desc(19, 1), subtrace_desc(20, 0)),
executed_state::subtrace_lore(18, subtrace_desc(20, 1), subtrace_desc(21, 0)),
]);
let expected_fold_v2 = executed_state::fold(vec![
@ -143,15 +144,18 @@ fn recursive_stream_many_iterations() {
executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)),
executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)),
executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)),
executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(15, 0)),
executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(13, 0)),
executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)),
executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(18, 0)),
executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(17, 0)),
executed_state::subtrace_lore(14, subtrace_desc(17, 1), subtrace_desc(18, 0)),
executed_state::subtrace_lore(16, subtrace_desc(18, 2), subtrace_desc(20, 0)),
executed_state::subtrace_lore(19, subtrace_desc(20, 1), subtrace_desc(21, 0)),
]);
let test_passed = (actual_fold == &expected_fold_v1) || (actual_fold == &expected_fold_v2);
if !test_passed {
print_trace(&result, "");
}
assert!(test_passed);
let actual_last_state = actual_trace.last().unwrap();
@ -378,7 +382,7 @@ fn recursive_stream_inner_fold() {
"#
);
let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
let result = call_vm!(vm_1, <_>::default(), &script, "", "");
let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data);
let actual_trace = trace_from_result(&result);

View File

@ -15,7 +15,6 @@
*/
use air_interpreter_data::ExecutionTrace;
use air_parser::AirPos;
use air_test_utils::prelude::*;
use pretty_assertions::assert_eq;
@ -92,15 +91,6 @@ fn new_with_global_streams_seq() {
scalar!(json!([1, 2]), peer = local_vm_peer_id_1, args = [json!([1, 2])]),
];
assert_eq!(actual_trace, expected_trace);
let data = data_from_result(&vm_2_result);
let actual_restricted_streams = data.restricted_streams;
let expected_restricted_streams = maplit::hashmap! {
"$stream".to_string() => maplit::hashmap! {
AirPos::from(282) => vec![1.into(), 1.into()]
}
};
assert_eq!(actual_restricted_streams, expected_restricted_streams);
}
#[test]
@ -368,15 +358,6 @@ fn new_in_fold_with_ap() {
scalar!(json!(["none"]), peer = vm_peer_id, args = [json!(["none"])]),
];
assert_eq!(actual_trace, expected_trace);
let data = data_from_result(&result);
let actual_restricted_streams = data.restricted_streams;
let expected_restricted_streams = maplit::hashmap! {
"$s1".to_string() => maplit::hashmap! {
AirPos::from(146) => vec![1.into(), 1.into(), 1.into(), 1.into(), 1.into()]
}
};
assert_eq!(actual_restricted_streams, expected_restricted_streams);
}
#[test]
@ -421,25 +402,6 @@ fn new_with_streams_with_errors() {
),
];
assert_eq!(actual_trace, expected_trace);
let data = data_from_result(&result);
let actual_restricted_streams = data.restricted_streams;
let expected_restricted_streams = maplit::hashmap! {
"$restricted_stream_2".to_string() => maplit::hashmap! {
AirPos::from(216) => vec![1.into()]
},
"$restricted_stream_1".to_string() => maplit::hashmap! {
AirPos::from(141) => vec![0.into()]
}
};
assert_eq!(actual_restricted_streams, expected_restricted_streams);
let actual_global_streams = data.global_streams;
let expected_global_streams = maplit::hashmap! {
"$global_stream".to_string() => 1.into(),
};
assert_eq!(actual_global_streams, expected_global_streams);
}
#[test]

View File

@ -14,7 +14,6 @@
* limitations under the License.
*/
use air_parser::AirPos;
use air_test_utils::prelude::*;
#[test]
@ -92,13 +91,4 @@ fn issue_173() {
scalar!(json!([1, 2])),
];
assert_eq!(actual_trace, expected_trace);
let data = data_from_result(&vm_2_result);
let actual_restricted_streams = data.restricted_streams;
let expected_restricted_streams = maplit::hashmap! {
"$stream".to_string() => maplit::hashmap! {
AirPos::from(282) => vec![1.into(), 1.into()]
}
};
assert_eq!(actual_restricted_streams, expected_restricted_streams);
}

View File

@ -17,7 +17,6 @@
use air_test_utils::prelude::*;
#[test]
#[ignore] // will be resolved in github.com/fluencelabs/aquavm/pull/621
fn issue_642() {
let peer_id_1 = "peer_id_1";
let peer_id_2 = "peer_id_2";

View File

@ -17,7 +17,6 @@
use air_test_utils::prelude::*;
#[test]
#[ignore] // will be resolved in github.com/fluencelabs/aquavm/pull/621
fn issue_644() {
let peer_id_1 = "peer_id_1";
let peer_id_2 = "peer_id_2";

View File

@ -316,8 +316,6 @@ fn invalid_dst_generations() {
<_>::default(),
<_>::default(),
<_>::default(),
<_>::default(),
<_>::default(),
semver::Version::new(1, 1, 1),
);
let mut data_value = serde_json::to_value(&empty_data).unwrap();

View File

@ -47,10 +47,8 @@ impl CallResult {
Self::executed_service_result(ValueRef::Scalar(service_result_agg_cid))
}
pub fn executed_stream(
cid: Rc<CID<ServiceResultCidAggregate>>,
generation: GenerationIdx,
) -> CallResult {
pub fn executed_stream_stub(cid: Rc<CID<ServiceResultCidAggregate>>) -> CallResult {
let generation = GenerationIdx::stub();
let value = ValueRef::Stream { cid, generation };
CallResult::Executed(value)
}
@ -102,6 +100,12 @@ impl ApResult {
res_generations: vec![res_generation],
}
}
pub fn stub() -> Self {
Self {
res_generations: vec![GenerationIdx::stub()],
}
}
}
impl CanonResult {

View File

@ -46,6 +46,11 @@ impl GenerationIdx {
// TODO: check for overflow
Self::from(self.0 as usize - 1)
}
pub fn stub() -> Self {
const GENERATION_STUB: GenerationIdxType = 0xCAFEBABE;
Self(GENERATION_STUB)
}
}
impl PartialOrd<usize> for GenerationIdx {

View File

@ -14,8 +14,6 @@
* limitations under the License.
*/
use super::GlobalStreamGens;
use super::RestrictedStreamGens;
use crate::cid_store::CidStore;
use crate::CanonCidAggregate;
use crate::CanonResultCidAggregate;
@ -43,18 +41,6 @@ pub struct InterpreterData {
/// Trace of AIR execution, which contains executed call, par, fold, and ap states.
pub trace: ExecutionTrace,
/// Contains maximum generation for each global stream. This info will be used while merging
/// values in streams. This field is also needed for backward compatibility with
/// <= 0.2.1 versions.
#[serde(rename = "streams")] // for compatibility with versions <= 0.2.1
pub global_streams: GlobalStreamGens,
/// Contains maximum generation for each private stream. This info will be used while merging
/// values in streams.
#[serde(default)]
#[serde(rename = "r_streams")]
pub restricted_streams: RestrictedStreamGens,
/// Last exposed to a peer call request id. All next call request ids will be bigger than this.
#[serde(default)]
#[serde(rename = "lcid")]
@ -87,9 +73,7 @@ impl InterpreterData {
Self {
versions,
trace: ExecutionTrace::default(),
global_streams: GlobalStreamGens::new(),
last_call_request_id: 0,
restricted_streams: RestrictedStreamGens::new(),
cid_info: <_>::default(),
signatures: <_>::default(),
}
@ -98,8 +82,6 @@ impl InterpreterData {
#[allow(clippy::too_many_arguments)]
pub fn from_execution_result(
trace: ExecutionTrace,
streams: GlobalStreamGens,
restricted_streams: RestrictedStreamGens,
cid_info: CidInfo,
signatures: SignatureStore,
last_call_request_id: u32,
@ -110,9 +92,7 @@ impl InterpreterData {
Self {
versions,
trace,
global_streams: streams,
last_call_request_id,
restricted_streams,
cid_info,
signatures,
}

View File

@ -30,7 +30,6 @@ mod cid_store;
mod executed_state;
mod generation_idx;
mod interpreter_data;
mod stream_generations;
mod trace;
mod trace_pos;
@ -38,7 +37,6 @@ pub use cid_store::*;
pub use executed_state::*;
pub use generation_idx::*;
pub use interpreter_data::*;
pub use stream_generations::*;
pub use trace::*;
pub use trace_pos::*;

View File

@ -1,36 +0,0 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::GenerationIdx;
use air_parser::AirPos;
use std::collections::HashMap;
/// Mapping from a stream name to it's generation count.
/// Similar to pi-calculus non-restricted names/channels.
pub type GlobalStreamGens = HashMap<String, GenerationIdx>;
pub type GlobalStreamMapGens = GlobalStreamGens;
/// Mapping from a stream name to
/// position of a new instruction in a script that creates a scope for a stream
/// to vector where each position represents a corresponding iteration.
///
/// Vec<u32> is needed because a new instruction could be placed into a fold instruction,
/// so it could be met several times during script execution. This field anchors iteration
/// where it was met.
/// Similar to pi-calculus restricted names/channels.
pub type RestrictedStreamGens = HashMap<String, HashMap<AirPos, Vec<GenerationIdx>>>;
pub type RestrictedStreamMapGens = RestrictedStreamGens;

View File

@ -93,8 +93,6 @@ pub fn raw_data_from_trace(
) -> Vec<u8> {
let data = InterpreterData::from_execution_result(
trace.into(),
<_>::default(),
<_>::default(),
cid_state.into(),
<_>::default(),
0,
@ -109,8 +107,6 @@ pub fn raw_data_from_trace_with_canon(
) -> Vec<u8> {
let data = InterpreterData::from_execution_result(
trace.into(),
<_>::default(),
<_>::default(),
CidInfo {
value_store: cid_state.value_tracker.into(),
tetraplet_store: cid_state.tetraplet_tracker.into(),

View File

@ -40,7 +40,7 @@ pub enum TraceHandlerError {
#[derive(ThisError, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum GenerationCompatificationError {
pub enum GenerationCompactificationError {
#[error("trying to change generation of an invalid trace position {0}")]
TracePosPointsToNowhere(TracePos),
@ -50,13 +50,13 @@ pub enum GenerationCompatificationError {
TracePosPointsToInvalidState { position: TracePos, state: ExecutedState },
}
impl GenerationCompatificationError {
impl GenerationCompactificationError {
pub fn points_to_nowhere(position: TracePos) -> Self {
GenerationCompatificationError::TracePosPointsToNowhere(position)
GenerationCompactificationError::TracePosPointsToNowhere(position)
}
pub fn points_to_invalid_state(position: TracePos, state: ExecutedState) -> Self {
GenerationCompatificationError::TracePosPointsToInvalidState { position, state }
GenerationCompactificationError::TracePosPointsToInvalidState { position, state }
}
}

View File

@ -64,12 +64,12 @@ impl TraceHandler {
&mut self,
trace_pos: TracePos,
generation: GenerationIdx,
) -> Result<(), GenerationCompatificationError> {
) -> Result<(), GenerationCompactificationError> {
let state = self
.data_keeper
.result_trace
.get_mut(trace_pos)
.ok_or_else(|| GenerationCompatificationError::points_to_nowhere(trace_pos))?;
.ok_or_else(|| GenerationCompactificationError::points_to_nowhere(trace_pos))?;
match state {
ExecutedState::Ap(ap_result) => ap_result.res_generations = vec![generation],
@ -78,7 +78,7 @@ impl TraceHandler {
..
})) => *call_generation = generation,
state => {
return Err(GenerationCompatificationError::points_to_invalid_state(
return Err(GenerationCompactificationError::points_to_invalid_state(
trace_pos,
state.clone(),
))

View File

@ -33,7 +33,7 @@ pub mod merger;
mod state_automata;
pub use data_keeper::KeeperError;
pub use errors::GenerationCompatificationError;
pub use errors::GenerationCompactificationError;
pub use errors::IntConversionError;
pub use errors::TraceHandlerError;
pub use handler::TraceHandler;