mirror of
https://github.com/fluencelabs/aquavm
synced 2024-12-05 07:30:18 +00:00
feat(air): introduce explicit types for generation numbers (#530)
--------- Co-authored-by: vms <michail.vms@gmail.com> Co-authored-by: Anatolios Laskaris <github_me@nahsi.dev>
This commit is contained in:
parent
3027f0be26
commit
d62fa6fe60
@ -20,6 +20,7 @@ use crate::ExecutionError;
|
||||
use crate::JValue;
|
||||
use crate::UncatchableError;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
use air_trace_handler::merger::ValueSource;
|
||||
use air_trace_handler::TraceHandler;
|
||||
|
||||
@ -38,17 +39,16 @@ pub struct Stream {
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub(crate) fn from_generations_count(previous_count: usize, current_count: usize) -> Self {
|
||||
let last_generation_count = 1;
|
||||
pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self {
|
||||
let last_generation_count = GenerationIdx::from(1);
|
||||
// TODO: bubble up an overflow error instead of expect
|
||||
let overall_count = previous_count
|
||||
.checked_add(current_count)
|
||||
.and_then(|value| value.checked_add(last_generation_count))
|
||||
.expect("it shouldn't overflow");
|
||||
|
||||
Self {
|
||||
values: vec![vec![]; overall_count],
|
||||
previous_gens_count: previous_count,
|
||||
values: vec![vec![]; overall_count.into()],
|
||||
previous_gens_count: previous_count.into(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,11 +68,13 @@ impl Stream {
|
||||
value: ValueAggregate,
|
||||
generation: Generation,
|
||||
source: ValueSource,
|
||||
) -> ExecutionResult<u32> {
|
||||
) -> ExecutionResult<GenerationIdx> {
|
||||
let generation_number = match (generation, source) {
|
||||
(Generation::Last, _) => self.values.len() - 1,
|
||||
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen as usize,
|
||||
(Generation::Nth(current_gen), ValueSource::CurrentData) => self.previous_gens_count + current_gen as usize,
|
||||
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen.into(),
|
||||
(Generation::Nth(current_gen), ValueSource::CurrentData) => {
|
||||
self.previous_gens_count + usize::from(current_gen)
|
||||
}
|
||||
};
|
||||
|
||||
if generation_number >= self.values.len() {
|
||||
@ -84,9 +86,10 @@ impl Stream {
|
||||
}
|
||||
|
||||
self.values[generation_number].push(value);
|
||||
Ok(generation_number as u32)
|
||||
Ok(generation_number.into())
|
||||
}
|
||||
|
||||
// TODO: remove this function
|
||||
pub(crate) fn generations_count(&self) -> usize {
|
||||
// the last generation could be empty due to the logic of from_generations_count ctor
|
||||
if self.values.last().unwrap().is_empty() {
|
||||
@ -96,7 +99,7 @@ impl Stream {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn last_non_empty_generation(&self) -> usize {
|
||||
pub(crate) fn last_non_empty_generation(&self) -> GenerationIdx {
|
||||
self.values
|
||||
.iter()
|
||||
.rposition(|generation| !generation.is_empty())
|
||||
@ -104,6 +107,7 @@ impl Stream {
|
||||
// there is a new state was added with add_new_generation_if_non_empty
|
||||
.map(|non_empty_gens| non_empty_gens + 1)
|
||||
.unwrap_or_else(|| self.generations_count())
|
||||
.into()
|
||||
}
|
||||
|
||||
/// Add a new empty generation if the latest isn't empty.
|
||||
@ -133,10 +137,13 @@ impl Stream {
|
||||
should_remove_generation
|
||||
}
|
||||
|
||||
pub(crate) fn elements_count(&self, generation: Generation) -> Option<usize> {
|
||||
pub(crate) fn generation_elements_count(&self, generation: Generation) -> Option<usize> {
|
||||
match generation {
|
||||
Generation::Nth(generation) if generation as usize > self.generations_count() => None,
|
||||
Generation::Nth(generation) => Some(self.values.iter().take(generation as usize).map(|v| v.len()).sum()),
|
||||
Generation::Nth(generation) if generation > self.generations_count() => None,
|
||||
Generation::Nth(generation) => {
|
||||
let elements_count = generation.into();
|
||||
Some(self.values.iter().take(elements_count).map(|v| v.len()).sum())
|
||||
}
|
||||
Generation::Last => Some(self.values.iter().map(|v| v.len()).sum()),
|
||||
}
|
||||
}
|
||||
@ -160,14 +167,14 @@ impl Stream {
|
||||
|
||||
pub(crate) fn iter(&self, generation: Generation) -> Option<StreamIter<'_>> {
|
||||
let iter: Box<dyn Iterator<Item = &ValueAggregate>> = match generation {
|
||||
Generation::Nth(generation) if generation as usize >= self.generations_count() => return None,
|
||||
Generation::Nth(generation) if generation >= self.generations_count() => return None,
|
||||
Generation::Nth(generation) => {
|
||||
Box::new(self.values.iter().take(generation as usize + 1).flat_map(|v| v.iter()))
|
||||
Box::new(self.values.iter().take(generation.next().into()).flat_map(|v| v.iter()))
|
||||
}
|
||||
Generation::Last => Box::new(self.values.iter().flat_map(|v| v.iter())),
|
||||
};
|
||||
// unwrap is safe here, because generation's been already checked
|
||||
let len = self.elements_count(generation).unwrap();
|
||||
let len = self.generation_elements_count(generation).unwrap();
|
||||
|
||||
let iter = StreamIter { iter, len };
|
||||
|
||||
@ -179,11 +186,11 @@ impl Stream {
|
||||
return None;
|
||||
}
|
||||
|
||||
let generations_count = self.generations_count() as u32 - 1;
|
||||
let generations_count = self.generations_count() - 1;
|
||||
let (start, end) = match (start, end) {
|
||||
(Generation::Nth(start), Generation::Nth(end)) => (start, end),
|
||||
(Generation::Nth(start), Generation::Last) => (start, generations_count),
|
||||
(Generation::Last, Generation::Nth(end)) => (generations_count, end),
|
||||
(Generation::Nth(start), Generation::Nth(end)) => (usize::from(start), usize::from(end)),
|
||||
(Generation::Nth(start), Generation::Last) => (start.into(), generations_count),
|
||||
(Generation::Last, Generation::Nth(end)) => (generations_count, end.into()),
|
||||
(Generation::Last, Generation::Last) => (generations_count, generations_count),
|
||||
};
|
||||
|
||||
@ -191,27 +198,27 @@ impl Stream {
|
||||
return None;
|
||||
}
|
||||
|
||||
let len = (end - start) as usize + 1;
|
||||
let len = end - start + 1;
|
||||
let iter: Box<dyn Iterator<Item = &[ValueAggregate]>> =
|
||||
Box::new(self.values.iter().skip(start as usize).take(len).map(|v| v.as_slice()));
|
||||
Box::new(self.values.iter().skip(start).take(len).map(|v| v.as_slice()));
|
||||
let iter = StreamSliceIter { iter, len };
|
||||
|
||||
Some(iter)
|
||||
}
|
||||
|
||||
/// Removes empty generations updating data and returns final generation count.
|
||||
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<usize> {
|
||||
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<GenerationIdx> {
|
||||
self.remove_empty_generations();
|
||||
|
||||
for (generation, values) in self.values.iter().enumerate() {
|
||||
for value in values.iter() {
|
||||
trace_ctx
|
||||
.update_generation(value.trace_pos, generation as u32)
|
||||
.update_generation(value.trace_pos, generation.into())
|
||||
.map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(self.values.len())
|
||||
let last_generation_idx = self.values.len();
|
||||
Ok(last_generation_idx.into())
|
||||
}
|
||||
|
||||
/// Removes empty generations from current values.
|
||||
@ -223,7 +230,22 @@ impl Stream {
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum Generation {
|
||||
Last,
|
||||
Nth(u32),
|
||||
Nth(GenerationIdx),
|
||||
}
|
||||
|
||||
impl Generation {
|
||||
pub fn last() -> Self {
|
||||
Self::Last
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn nth(generation_id: u32) -> Self {
|
||||
use std::convert::TryFrom;
|
||||
|
||||
let generation_id = usize::try_from(generation_id).unwrap();
|
||||
let generation_idx = GenerationIdx::from(generation_id);
|
||||
Self::Nth(generation_idx)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct StreamIter<'result> {
|
||||
@ -313,22 +335,22 @@ mod test {
|
||||
fn test_slice_iter() {
|
||||
let value_1 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
|
||||
let value_2 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
|
||||
let mut stream = Stream::from_generations_count(2, 0);
|
||||
let mut stream = Stream::from_generations_count(2.into(), 0.into());
|
||||
|
||||
stream
|
||||
.add_value(value_1, Generation::Nth(0), ValueSource::PreviousData)
|
||||
.add_value(value_1, Generation::nth(0), ValueSource::PreviousData)
|
||||
.unwrap();
|
||||
stream
|
||||
.add_value(value_2, Generation::Nth(1), ValueSource::PreviousData)
|
||||
.add_value(value_2, Generation::nth(1), ValueSource::PreviousData)
|
||||
.unwrap();
|
||||
|
||||
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1)).unwrap();
|
||||
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(1)).unwrap();
|
||||
assert_eq!(slice.len, 2);
|
||||
|
||||
let slice = stream.slice_iter(Generation::Nth(0), Generation::Last).unwrap();
|
||||
let slice = stream.slice_iter(Generation::nth(0), Generation::Last).unwrap();
|
||||
assert_eq!(slice.len, 2);
|
||||
|
||||
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(0)).unwrap();
|
||||
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(0)).unwrap();
|
||||
assert_eq!(slice.len, 1);
|
||||
|
||||
let slice = stream.slice_iter(Generation::Last, Generation::Last).unwrap();
|
||||
@ -337,15 +359,15 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_slice_on_empty_stream() {
|
||||
let stream = Stream::from_generations_count(2, 0);
|
||||
let stream = Stream::from_generations_count(2.into(), 0.into());
|
||||
|
||||
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1));
|
||||
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(1));
|
||||
assert!(slice.is_none());
|
||||
|
||||
let slice = stream.slice_iter(Generation::Nth(0), Generation::Last);
|
||||
let slice = stream.slice_iter(Generation::nth(0), Generation::Last);
|
||||
assert!(slice.is_none());
|
||||
|
||||
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(0));
|
||||
let slice = stream.slice_iter(Generation::nth(0), Generation::nth(0));
|
||||
assert!(slice.is_none());
|
||||
|
||||
let slice = stream.slice_iter(Generation::Last, Generation::Last);
|
||||
@ -356,13 +378,13 @@ mod test {
|
||||
fn generation_from_current_data() {
|
||||
let value_1 = ValueAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into());
|
||||
let value_2 = ValueAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into());
|
||||
let mut stream = Stream::from_generations_count(5, 5);
|
||||
let mut stream = Stream::from_generations_count(5.into(), 5.into());
|
||||
|
||||
stream
|
||||
.add_value(value_1.clone(), Generation::Nth(2), ValueSource::CurrentData)
|
||||
.add_value(value_1.clone(), Generation::nth(2), ValueSource::CurrentData)
|
||||
.unwrap();
|
||||
stream
|
||||
.add_value(value_2.clone(), Generation::Nth(4), ValueSource::PreviousData)
|
||||
.add_value(value_2.clone(), Generation::nth(4), ValueSource::PreviousData)
|
||||
.unwrap();
|
||||
|
||||
let generations_count = stream.generations_count();
|
||||
|
@ -21,9 +21,11 @@ mod utils;
|
||||
use crate::execution_step::ExecutionResult;
|
||||
use crate::execution_step::Stream;
|
||||
use crate::ExecutionError;
|
||||
|
||||
use stream_descriptor::*;
|
||||
pub(crate) use stream_value_descriptor::StreamValueDescriptor;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
use air_interpreter_data::GlobalStreamGens;
|
||||
use air_interpreter_data::RestrictedStreamGens;
|
||||
use air_parser::ast::Span;
|
||||
@ -82,7 +84,10 @@ impl Streams {
|
||||
.and_then(|descriptors| find_closest_mut(descriptors.iter_mut(), position))
|
||||
}
|
||||
|
||||
pub(crate) fn add_stream_value(&mut self, value_descriptor: StreamValueDescriptor<'_>) -> ExecutionResult<u32> {
|
||||
pub(crate) fn add_stream_value(
|
||||
&mut self,
|
||||
value_descriptor: StreamValueDescriptor<'_>,
|
||||
) -> ExecutionResult<GenerationIdx> {
|
||||
let StreamValueDescriptor {
|
||||
value,
|
||||
name,
|
||||
@ -105,17 +110,16 @@ impl Streams {
|
||||
let descriptor = StreamDescriptor::global(stream);
|
||||
self.streams.insert(name.to_string(), vec![descriptor]);
|
||||
let generation = 0;
|
||||
Ok(generation)
|
||||
Ok(generation.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: u32) {
|
||||
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: usize) {
|
||||
let name = name.into();
|
||||
let (prev_gens_count, current_gens_count) =
|
||||
self.stream_generation_from_data(&name, span.left, iteration as usize);
|
||||
let (prev_gens_count, current_gens_count) = self.stream_generation_from_data(&name, span.left, iteration);
|
||||
|
||||
let new_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
|
||||
let new_stream = Stream::from_generations_count(prev_gens_count, current_gens_count);
|
||||
let new_descriptor = StreamDescriptor::restricted(new_stream, span);
|
||||
match self.streams.entry(name) {
|
||||
Occupied(mut entry) => {
|
||||
@ -143,7 +147,7 @@ impl Streams {
|
||||
}
|
||||
let gens_count = last_descriptor.stream.compactify(trace_ctx)?;
|
||||
|
||||
self.collect_stream_generation(name, position, gens_count as u32);
|
||||
self.collect_stream_generation(name, position, gens_count);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -164,14 +168,19 @@ impl Streams {
|
||||
// of the execution
|
||||
let stream = descriptors.pop().unwrap().stream;
|
||||
let gens_count = stream.compactify(trace_ctx)?;
|
||||
Ok((name, gens_count as u32))
|
||||
Ok((name, gens_count))
|
||||
})
|
||||
.collect::<Result<GlobalStreamGens, _>>()?;
|
||||
|
||||
Ok((global_streams, self.new_restricted_stream_gens))
|
||||
}
|
||||
|
||||
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> (u32, u32) {
|
||||
fn stream_generation_from_data(
|
||||
&self,
|
||||
name: &str,
|
||||
position: AirPos,
|
||||
iteration: usize,
|
||||
) -> (GenerationIdx, GenerationIdx) {
|
||||
let previous_generation =
|
||||
Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration)
|
||||
.unwrap_or_default();
|
||||
@ -187,14 +196,14 @@ impl Streams {
|
||||
name: &str,
|
||||
position: AirPos,
|
||||
iteration: usize,
|
||||
) -> Option<u32> {
|
||||
) -> Option<GenerationIdx> {
|
||||
restricted_stream_gens
|
||||
.get(name)
|
||||
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
|
||||
.copied()
|
||||
}
|
||||
|
||||
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: u32) {
|
||||
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) {
|
||||
match self.new_restricted_stream_gens.entry(name) {
|
||||
Occupied(mut streams) => match streams.get_mut().entry(position) {
|
||||
Occupied(mut iterations) => iterations.get_mut().push(generation),
|
||||
|
@ -29,7 +29,7 @@ pub(super) fn merge_global_streams(
|
||||
.iter()
|
||||
.map(|(stream_name, &prev_gens_count)| {
|
||||
let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default();
|
||||
let global_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
|
||||
let global_stream = Stream::from_generations_count(prev_gens_count, current_gens_count);
|
||||
let descriptor = StreamDescriptor::global(global_stream);
|
||||
(stream_name.to_string(), vec![descriptor])
|
||||
})
|
||||
@ -40,7 +40,7 @@ pub(super) fn merge_global_streams(
|
||||
continue;
|
||||
}
|
||||
|
||||
let global_stream = Stream::from_generations_count(0, current_gens_count as usize);
|
||||
let global_stream = Stream::from_generations_count(0.into(), current_gens_count);
|
||||
let descriptor = StreamDescriptor::global(global_stream);
|
||||
global_streams.insert(stream_name, vec![descriptor]);
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ use crate::SecurityTetraplet;
|
||||
use apply_to_arguments::*;
|
||||
use utils::*;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
use air_parser::ast;
|
||||
use air_parser::ast::Ap;
|
||||
use air_trace_handler::merger::MergerApResult;
|
||||
@ -75,7 +76,7 @@ fn populate_context<'ctx>(
|
||||
merger_ap_result: &MergerApResult,
|
||||
result: ValueAggregate,
|
||||
exec_ctx: &mut ExecutionCtx<'ctx>,
|
||||
) -> ExecutionResult<Option<u32>> {
|
||||
) -> ExecutionResult<Option<GenerationIdx>> {
|
||||
match ap_result {
|
||||
ast::ApResult::Scalar(scalar) => exec_ctx.scalars.set_scalar_value(scalar.name, result).map(|_| None),
|
||||
ast::ApResult::Stream(stream) => {
|
||||
@ -85,7 +86,7 @@ fn populate_context<'ctx>(
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_update_trace(maybe_generation: Option<u32>, trace_ctx: &mut TraceHandler) {
|
||||
fn maybe_update_trace(maybe_generation: Option<GenerationIdx>, trace_ctx: &mut TraceHandler) {
|
||||
use air_interpreter_data::ApResult;
|
||||
|
||||
if let Some(generation) = maybe_generation {
|
||||
|
@ -14,26 +14,28 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
|
||||
use super::construct_stream_iterable_values;
|
||||
use crate::execution_step::boxed_value::Generation;
|
||||
use crate::execution_step::boxed_value::Stream;
|
||||
use crate::execution_step::instructions::fold::IterableValue;
|
||||
|
||||
pub(super) struct StreamCursor {
|
||||
last_seen_generation: u32,
|
||||
last_seen_generation: GenerationIdx,
|
||||
}
|
||||
|
||||
impl StreamCursor {
|
||||
pub(super) fn new() -> Self {
|
||||
Self {
|
||||
last_seen_generation: 0,
|
||||
last_seen_generation: GenerationIdx::from(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn construct_iterables(&mut self, stream: &Stream) -> Vec<IterableValue> {
|
||||
let iterables =
|
||||
construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last);
|
||||
self.last_seen_generation = stream.last_non_empty_generation() as u32;
|
||||
self.last_seen_generation = stream.last_non_empty_generation();
|
||||
|
||||
iterables
|
||||
}
|
||||
|
@ -22,6 +22,8 @@ use crate::log_instruction;
|
||||
use air_parser::ast::New;
|
||||
use air_parser::ast::NewArgument;
|
||||
|
||||
use std::convert::TryFrom;
|
||||
|
||||
impl<'i> super::ExecutableInstruction<'i> for New<'i> {
|
||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
log_instruction!(new, exec_ctx, trace_ctx);
|
||||
@ -51,6 +53,7 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
|
||||
match &new.argument {
|
||||
NewArgument::Stream(stream) => {
|
||||
let iteration = exec_ctx.tracker.new_tracker.get_iteration(position);
|
||||
let iteration = usize::try_from(iteration).unwrap();
|
||||
exec_ctx.streams.meet_scope_start(stream.name, new.span, iteration);
|
||||
}
|
||||
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_start_scalar(scalar.name.to_string()),
|
||||
|
@ -223,7 +223,7 @@ fn par_early_exit() {
|
||||
vec![],
|
||||
&mut cid_state,
|
||||
),
|
||||
generation: 1,
|
||||
generation: 1.into(),
|
||||
};
|
||||
let current_value = ValueRef::Stream {
|
||||
cid: value_aggregate_cid(
|
||||
@ -232,7 +232,7 @@ fn par_early_exit() {
|
||||
vec![],
|
||||
&mut cid_state,
|
||||
),
|
||||
generation: 0,
|
||||
generation: 0.into(),
|
||||
};
|
||||
let expected_error = UncatchableError::TraceError {
|
||||
trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual {
|
||||
|
@ -95,7 +95,7 @@ fn new_with_global_streams_seq() {
|
||||
let actual_restricted_streams = data.restricted_streams;
|
||||
let expected_restricted_streams = maplit::hashmap! {
|
||||
"$stream".to_string() => maplit::hashmap! {
|
||||
AirPos::from(282) => vec![1,1]
|
||||
AirPos::from(282) => vec![1.into(), 1.into()]
|
||||
}
|
||||
};
|
||||
assert_eq!(actual_restricted_streams, expected_restricted_streams);
|
||||
@ -365,7 +365,7 @@ fn new_in_fold_with_ap() {
|
||||
let actual_restricted_streams = data.restricted_streams;
|
||||
let expected_restricted_streams = maplit::hashmap! {
|
||||
"$s1".to_string() => maplit::hashmap! {
|
||||
AirPos::from(146) => vec![1,1,1,1,1]
|
||||
AirPos::from(146) => vec![1.into(), 1.into(), 1.into(), 1.into(), 1.into()]
|
||||
}
|
||||
};
|
||||
assert_eq!(actual_restricted_streams, expected_restricted_streams);
|
||||
@ -417,17 +417,17 @@ fn new_with_streams_with_errors() {
|
||||
let actual_restricted_streams = data.restricted_streams;
|
||||
let expected_restricted_streams = maplit::hashmap! {
|
||||
"$restricted_stream_2".to_string() => maplit::hashmap! {
|
||||
AirPos::from(216) => vec![1]
|
||||
AirPos::from(216) => vec![1.into()]
|
||||
},
|
||||
"$restricted_stream_1".to_string() => maplit::hashmap! {
|
||||
AirPos::from(141) => vec![0]
|
||||
AirPos::from(141) => vec![0.into()]
|
||||
}
|
||||
};
|
||||
assert_eq!(actual_restricted_streams, expected_restricted_streams);
|
||||
|
||||
let actual_global_streams = data.global_streams;
|
||||
let expected_global_streams = maplit::hashmap! {
|
||||
"$global_stream".to_string() => 1,
|
||||
"$global_stream".to_string() => 1.into(),
|
||||
};
|
||||
assert_eq!(actual_global_streams, expected_global_streams);
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ fn issue_173() {
|
||||
let actual_restricted_streams = data.restricted_streams;
|
||||
let expected_restricted_streams = maplit::hashmap! {
|
||||
"$stream".to_string() => maplit::hashmap! {
|
||||
AirPos::from(282) => vec![1,1]
|
||||
AirPos::from(282) => vec![1.into(), 1.into()]
|
||||
}
|
||||
};
|
||||
assert_eq!(actual_restricted_streams, expected_restricted_streams);
|
||||
|
@ -49,7 +49,7 @@ fn issue_295() {
|
||||
);
|
||||
let expected_error = UncatchableError::TraceError {
|
||||
trace_error: TraceHandlerError::MergeError(MergeError::IncompatibleExecutedStates(
|
||||
ExecutedState::Ap(ApResult::new(1)),
|
||||
ExecutedState::Ap(ApResult::new(1.into())),
|
||||
ExecutedState::Call(CallResult::Executed(ValueRef::Scalar(cid))),
|
||||
)),
|
||||
instruction: "ap scalar $stream".to_string(),
|
||||
|
@ -266,7 +266,7 @@ fn incompatible_executed_states() {
|
||||
|
||||
let expected_error = UncatchableError::TraceError {
|
||||
trace_error: MergeError(air_trace_handler::merger::MergeError::IncompatibleExecutedStates(
|
||||
ExecutedState::Ap(ApResult::new(1)),
|
||||
ExecutedState::Ap(ApResult::new(1.into())),
|
||||
scalar!("", peer = vm_peer_id),
|
||||
)),
|
||||
instruction: "ap scalar $stream".to_string(),
|
||||
@ -336,7 +336,7 @@ fn invalid_dst_generations() {
|
||||
let expected_error = UncatchableError::TraceError {
|
||||
trace_error: MergeError(air_trace_handler::MergeError::IncorrectApResult(
|
||||
ApResultError::InvalidDstGenerations(ApResult {
|
||||
res_generations: vec![42, 42],
|
||||
res_generations: vec![42.into(), 42.into()],
|
||||
}),
|
||||
)),
|
||||
instruction: String::from(r#"ap "a" $s"#),
|
||||
|
@ -17,6 +17,7 @@
|
||||
mod impls;
|
||||
mod se_de;
|
||||
|
||||
use crate::GenerationIdx;
|
||||
use crate::JValue;
|
||||
use crate::TracePos;
|
||||
|
||||
@ -86,7 +87,7 @@ pub enum ValueRef {
|
||||
/// The call value is stored to a stream variable.
|
||||
Stream {
|
||||
cid: Rc<CID<ServiceResultAggregate>>,
|
||||
generation: u32,
|
||||
generation: GenerationIdx,
|
||||
},
|
||||
/// The call value is not stored.
|
||||
Unused(Rc<CID<JValue>>),
|
||||
@ -128,7 +129,7 @@ pub struct ServiceResultAggregate {
|
||||
/// (call 3)
|
||||
/// (call 4)
|
||||
/// )
|
||||
///
|
||||
///x
|
||||
/// Having started with stream with two elements {v1, v2} the resulted trace would looks like
|
||||
/// [(1) (2)] [(1) (2)] [(3) (4)] [(3) (4)] <--- the sequence of call states
|
||||
/// v1 v2 v2 v1 <---- corresponding values from $stream that
|
||||
@ -179,7 +180,7 @@ pub struct FoldResult {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct ApResult {
|
||||
#[serde(rename = "gens")]
|
||||
pub res_generations: Vec<u32>,
|
||||
pub res_generations: Vec<GenerationIdx>,
|
||||
}
|
||||
|
||||
/// Contains ids of element that were on a stream at the moment of an appropriate canon call.
|
||||
|
@ -48,13 +48,11 @@ impl CallResult {
|
||||
}
|
||||
|
||||
pub fn executed_stream(
|
||||
service_result_agg_cid: Rc<CID<ServiceResultAggregate>>,
|
||||
generation: u32,
|
||||
cid: Rc<CID<ServiceResultAggregate>>,
|
||||
generation: GenerationIdx,
|
||||
) -> CallResult {
|
||||
Self::executed_service_result(ValueRef::Stream {
|
||||
cid: service_result_agg_cid,
|
||||
generation,
|
||||
})
|
||||
let value = ValueRef::Stream { cid, generation };
|
||||
CallResult::Executed(value)
|
||||
}
|
||||
|
||||
pub fn executed_unused(value_cid: Rc<CID<JValue>>) -> CallResult {
|
||||
@ -87,7 +85,7 @@ impl ExecutedState {
|
||||
}
|
||||
|
||||
impl ApResult {
|
||||
pub fn new(res_generation: u32) -> Self {
|
||||
pub fn new(res_generation: GenerationIdx) -> Self {
|
||||
Self {
|
||||
res_generations: vec![res_generation],
|
||||
}
|
||||
|
88
crates/air-lib/interpreter-data/src/generation_idx.rs
Normal file
88
crates/air-lib/interpreter-data/src/generation_idx.rs
Normal file
@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
|
||||
type GenerationIdxType = u32;
|
||||
|
||||
#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
#[repr(transparent)]
|
||||
pub struct GenerationIdx(GenerationIdxType);
|
||||
|
||||
impl GenerationIdx {
|
||||
pub fn checked_add(self, other: Self) -> Option<Self> {
|
||||
self.0.checked_add(other.0).map(Self)
|
||||
}
|
||||
|
||||
pub fn checked_sub(self, other: Self) -> Option<Self> {
|
||||
self.0.checked_sub(other.0).map(Self)
|
||||
}
|
||||
|
||||
pub fn next(self) -> Self {
|
||||
// TODO: check for overflow
|
||||
Self::from(self.0 as usize + 1)
|
||||
}
|
||||
|
||||
pub fn prev(self) -> Self {
|
||||
// TODO: check for overflow
|
||||
Self::from(self.0 as usize - 1)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd<usize> for GenerationIdx {
|
||||
fn partial_cmp(&self, other: &usize) -> Option<Ordering> {
|
||||
let self_as_usize: usize = (*self).into();
|
||||
self_as_usize.partial_cmp(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq<usize> for GenerationIdx {
|
||||
fn eq(&self, other: &usize) -> bool {
|
||||
let self_as_usize: usize = (*self).into();
|
||||
self_as_usize == *other
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: replace these two traits with try-* versions
|
||||
impl From<usize> for GenerationIdx {
|
||||
fn from(value: usize) -> Self {
|
||||
GenerationIdx(value as u32)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GenerationIdx> for usize {
|
||||
fn from(value: GenerationIdx) -> Self {
|
||||
value.0 as usize
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for GenerationIdx {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
Debug::fmt(&self.0, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for GenerationIdx {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
Display::fmt(&self.0, f)
|
||||
}
|
||||
}
|
@ -28,6 +28,7 @@
|
||||
|
||||
mod cid_store;
|
||||
mod executed_state;
|
||||
mod generation_idx;
|
||||
mod interpreter_data;
|
||||
mod stream_generations;
|
||||
mod trace;
|
||||
@ -35,6 +36,7 @@ mod trace_pos;
|
||||
|
||||
pub use cid_store::*;
|
||||
pub use executed_state::*;
|
||||
pub use generation_idx::*;
|
||||
pub use interpreter_data::*;
|
||||
pub use stream_generations::*;
|
||||
pub use trace::*;
|
||||
|
@ -14,13 +14,14 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use crate::GenerationIdx;
|
||||
use air_parser::AirPos;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Mapping from a stream name to it's generation count.
|
||||
/// Similar to pi-calculus non-restricted names/channels.
|
||||
pub type GlobalStreamGens = HashMap<String, u32>;
|
||||
pub type GlobalStreamGens = HashMap<String, GenerationIdx>;
|
||||
|
||||
/// Mapping from a stream name to
|
||||
/// position of a new instruction in a script that creates a scope for a stream
|
||||
@ -30,4 +31,4 @@ pub type GlobalStreamGens = HashMap<String, u32>;
|
||||
/// so it could be met several times during script execution. This field anchors iteration
|
||||
/// where it was met.
|
||||
/// Similar to pi-calculus restricted names/channels.
|
||||
pub type RestrictedStreamGens = HashMap<String, HashMap<AirPos, Vec<u32>>>;
|
||||
pub type RestrictedStreamGens = HashMap<String, HashMap<AirPos, Vec<GenerationIdx>>>;
|
||||
|
@ -31,8 +31,8 @@ use crate::SubTraceDesc;
|
||||
use air::ExecutionCidState;
|
||||
use air_interpreter_cid::value_to_json_cid;
|
||||
use air_interpreter_cid::CID;
|
||||
use air_interpreter_data::CanonCidAggregate;
|
||||
use air_interpreter_data::ServiceResultAggregate;
|
||||
use air_interpreter_data::{CanonCidAggregate, GenerationIdx};
|
||||
use avm_server::SecurityTetraplet;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@ -131,8 +131,8 @@ pub fn subtrace_desc(begin_pos: impl Into<TracePos>, subtrace_len: u32) -> SubTr
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ap(generation: u32) -> ExecutedState {
|
||||
let ap_result = ApResult::new(generation);
|
||||
pub fn ap(generation: impl Into<GenerationIdx>) -> ExecutedState {
|
||||
let ap_result = ApResult::new(generation.into());
|
||||
ExecutedState::Ap(ap_result)
|
||||
}
|
||||
|
||||
@ -357,7 +357,8 @@ impl ExecutedCallBuilder {
|
||||
value_aggregate_cid(self.result, self.tetraplet, self.args, cid_state);
|
||||
let value = ValueRef::Stream {
|
||||
cid: service_result_agg_cid,
|
||||
generation,
|
||||
// TODO: refactor it
|
||||
generation: (generation as usize).into(),
|
||||
};
|
||||
ExecutedState::Call(CallResult::Executed(value))
|
||||
}
|
||||
|
@ -14,6 +14,8 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
|
||||
use super::ExecutionTrace;
|
||||
use super::KeeperError;
|
||||
use super::KeeperResult;
|
||||
@ -33,7 +35,7 @@ impl MergeCtx {
|
||||
Self { slider }
|
||||
}
|
||||
|
||||
pub(crate) fn try_get_generation(&self, position: TracePos) -> KeeperResult<u32> {
|
||||
pub(crate) fn try_get_generation(&self, position: TracePos) -> KeeperResult<GenerationIdx> {
|
||||
use air_interpreter_data::*;
|
||||
|
||||
let state = self
|
||||
@ -46,7 +48,7 @@ impl MergeCtx {
|
||||
|
||||
match state {
|
||||
ExecutedState::Call(CallResult::Executed(ValueRef::Stream { generation, .. })) => Ok(*generation),
|
||||
// such Aps are always preceded by Fold where corresponding stream could be used,
|
||||
// such Aps are always preceded by Fold where corresponding stream could be used
|
||||
// so it's been already checked that res_generation is well-formed
|
||||
// and accessing 0th element is safe here
|
||||
ExecutedState::Ap(ap_result) => Ok(ap_result.res_generations[0]),
|
||||
|
@ -63,7 +63,7 @@ impl TraceHandler {
|
||||
pub fn update_generation(
|
||||
&mut self,
|
||||
trace_pos: TracePos,
|
||||
generation: u32,
|
||||
generation: GenerationIdx,
|
||||
) -> Result<(), GenerationCompatificationError> {
|
||||
let state = self
|
||||
.data_keeper
|
||||
|
@ -30,7 +30,7 @@ pub enum MergerApResult {
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MetApResult {
|
||||
pub generation: u32,
|
||||
pub generation: GenerationIdx,
|
||||
pub value_source: ValueSource,
|
||||
}
|
||||
|
||||
@ -83,7 +83,7 @@ fn prepare_merge_result(
|
||||
}
|
||||
|
||||
impl MetApResult {
|
||||
pub(crate) fn new(generation: u32, value_source: ValueSource) -> Self {
|
||||
pub(crate) fn new(generation: GenerationIdx, value_source: ValueSource) -> Self {
|
||||
Self {
|
||||
generation,
|
||||
value_source,
|
||||
|
@ -82,7 +82,7 @@ fn compute_lens_convolution(fold: &FoldResult, merge_ctx: &MergeCtx) -> MergeRes
|
||||
let subtraces_count = fold.lore.len();
|
||||
let mut lens = Vec::with_capacity(subtraces_count);
|
||||
let mut fold_states_count: usize = 0;
|
||||
let mut last_seen_generation = 0;
|
||||
let mut last_seen_generation = GenerationIdx::from(0);
|
||||
let mut last_seen_generation_pos = 0;
|
||||
let mut cum_after_len = 0;
|
||||
|
||||
@ -240,7 +240,7 @@ mod tests {
|
||||
|
||||
let fold_result = FoldResult { lore };
|
||||
|
||||
let slider = TraceSlider::new(vec![ExecutedState::Ap(ApResult::new(0))]);
|
||||
let slider = TraceSlider::new(vec![ExecutedState::Ap(ApResult::new(0.into()))]);
|
||||
let ctx = MergeCtx { slider };
|
||||
|
||||
let (all_states, convoluted_lens) =
|
||||
@ -285,9 +285,9 @@ mod tests {
|
||||
let fold_result = FoldResult { lore };
|
||||
|
||||
let slider = TraceSlider::new(vec![
|
||||
ExecutedState::Ap(ApResult::new(0)),
|
||||
ExecutedState::Ap(ApResult::new(1)),
|
||||
ExecutedState::Ap(ApResult::new(2)),
|
||||
ExecutedState::Ap(ApResult::new(0.into())),
|
||||
ExecutedState::Ap(ApResult::new(1.into())),
|
||||
ExecutedState::Ap(ApResult::new(2.into())),
|
||||
]);
|
||||
let ctx = MergeCtx { slider };
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user