feat(execution-engine)!: stream map to scalar conversion using canon instruction [fixes VM-294] (#610)

feat(execution-engine): Stream Map to Scalar conversion using canon instruction [fixes VM-294]
This commit is contained in:
raftedproc 2023-06-28 13:59:16 +03:00 committed by GitHub
parent 56a03364b1
commit fcb4c9dab4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2104 additions and 1333 deletions

View File

@ -16,6 +16,7 @@
use super::ExecutionResult; use super::ExecutionResult;
use super::ValueAggregate; use super::ValueAggregate;
use crate::execution_step::execution_context::stream_map_key::StreamMapKey;
use crate::ExecutionError; use crate::ExecutionError;
use crate::UncatchableError; use crate::UncatchableError;
@ -23,6 +24,9 @@ use air_interpreter_data::GenerationIdx;
use air_trace_handler::merger::ValueSource; use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler; use air_trace_handler::TraceHandler;
use std::borrow::Cow;
use std::collections::HashSet;
/// Streams are CRDT-like append only data structures. They are guaranteed to have the same order /// Streams are CRDT-like append only data structures. They are guaranteed to have the same order
/// of values on each peer. /// of values on each peer.
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
@ -211,6 +215,28 @@ impl Stream {
fn remove_empty_generations(&mut self) { fn remove_empty_generations(&mut self) {
self.values.retain(|values| !values.is_empty()); self.values.retain(|values| !values.is_empty());
} }
pub(crate) fn get_unique_map_keys_stream(&self) -> Cow<'_, Stream> {
let mut distinct_values = HashSet::new();
let mut new_values = vec![];
for values in self.values.iter() {
let distinct_values_vec = values
.iter()
.filter(|v| {
StreamMapKey::from_kvpair(v)
.map(|key| distinct_values.insert(key))
.unwrap_or(false)
})
.cloned()
.collect::<_>();
new_values.push(distinct_values_vec);
}
Cow::Owned(Stream {
values: new_values,
previous_gens_count: self.previous_gens_count,
})
}
} }
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
@ -312,11 +338,14 @@ mod test {
use super::Stream; use super::Stream;
use super::ValueAggregate; use super::ValueAggregate;
use super::ValueSource; use super::ValueSource;
use crate::execution_step::boxed_value::stream_map::from_key_value;
use crate::execution_step::execution_context::stream_map_key::StreamMapKey;
use crate::execution_step::ServiceResultAggregate; use crate::execution_step::ServiceResultAggregate;
use air_interpreter_cid::CID; use air_interpreter_cid::CID;
use serde_json::json; use serde_json::json;
use std::borrow::Cow;
use std::rc::Rc; use std::rc::Rc;
#[test] #[test]
@ -397,4 +426,70 @@ mod test {
assert_eq!(stream_value_1, &value_2); assert_eq!(stream_value_1, &value_2);
assert_eq!(stream_value_2, &value_1); assert_eq!(stream_value_2, &value_1);
} }
#[test]
fn test_get_unique_map_keys_stream() {
let key_prefix = "some_key".to_string();
let values = (0..3)
.map(|id| {
let key = key_prefix.clone() + &id.to_string();
let key = StreamMapKey::Str(Cow::Borrowed(key.as_str()));
let value = json!([{"top_level": [{"first": 42 + id },{"second": 43 - id}]}]);
let obj = from_key_value(key, &value);
ValueAggregate::new(
obj,
<_>::default(),
0.into(),
air_interpreter_data::Provenance::literal(),
)
})
.collect::<Vec<_>>();
let mut stream = Stream::from_generations_count(5.into(), 5.into());
stream
.add_value(values[0].clone(), Generation::nth(0), ValueSource::CurrentData)
.unwrap();
stream
.add_value(values[0].clone(), Generation::nth(1), ValueSource::CurrentData)
.unwrap();
stream
.add_value(values[2].clone(), Generation::nth(1), ValueSource::CurrentData)
.unwrap();
stream
.add_value(values[2].clone(), Generation::nth(3), ValueSource::CurrentData)
.unwrap();
stream
.add_value(values[2].clone(), Generation::nth(4), ValueSource::CurrentData)
.unwrap();
stream
.add_value(values[1].clone(), Generation::nth(4), ValueSource::CurrentData)
.unwrap();
let unique_keys_only = stream.get_unique_map_keys_stream();
let mut iter = unique_keys_only.iter(Generation::Last).unwrap();
assert_eq!(&values[0], iter.next().unwrap());
assert_eq!(&values[2], iter.next().unwrap());
assert_eq!(&values[1], iter.next().unwrap());
assert_eq!(iter.next(), None);
let mut stream = Stream::from_generations_count(5.into(), 5.into());
stream
.add_value(values[0].clone(), Generation::nth(0), ValueSource::CurrentData)
.unwrap();
stream
.add_value(values[1].clone(), Generation::nth(2), ValueSource::CurrentData)
.unwrap();
stream
.add_value(values[2].clone(), Generation::nth(3), ValueSource::CurrentData)
.unwrap();
let unique_keys_only = stream.get_unique_map_keys_stream();
let mut iter = unique_keys_only.iter(Generation::Last).unwrap();
assert_eq!(&values[0], iter.next().unwrap());
assert_eq!(&values[1], iter.next().unwrap());
assert_eq!(&values[2], iter.next().unwrap());
assert_eq!(iter.next(), None);
}
} }

View File

@ -25,9 +25,10 @@ use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler; use air_trace_handler::TraceHandler;
use serde_json::json; use serde_json::json;
use std::borrow::Cow;
use std::rc::Rc; use std::rc::Rc;
fn from_key_value(key: StreamMapKey<'_>, value: &JValue) -> Rc<JValue> { pub(super) fn from_key_value(key: StreamMapKey<'_>, value: &JValue) -> Rc<JValue> {
Rc::new(json!({ "key": key, "value": value })) Rc::new(json!({ "key": key, "value": value }))
} }
@ -80,6 +81,10 @@ impl StreamMap {
pub(crate) fn get_mut_stream_ref(&mut self) -> &mut Stream { pub(crate) fn get_mut_stream_ref(&mut self) -> &mut Stream {
&mut self.stream &mut self.stream
} }
pub(crate) fn get_unique_map_keys_stream(&mut self) -> Cow<'_, Stream> {
self.stream.get_unique_map_keys_stream()
}
} }
use std::fmt; use std::fmt;

View File

@ -15,6 +15,7 @@
*/ */
use crate::execution_step::execution_context::stream_maps_variables::errors::unsupported_map_key_type; use crate::execution_step::execution_context::stream_maps_variables::errors::unsupported_map_key_type;
use crate::execution_step::ValueAggregate;
use crate::CatchableError; use crate::CatchableError;
use crate::ExecutionError; use crate::ExecutionError;
use crate::JValue; use crate::JValue;
@ -22,7 +23,7 @@ use crate::JValue;
use serde::Serialize; use serde::Serialize;
use std::borrow::Cow; use std::borrow::Cow;
#[derive(Clone)] #[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) enum StreamMapKey<'i> { pub(crate) enum StreamMapKey<'i> {
Str(Cow<'i, str>), Str(Cow<'i, str>),
U64(u64), U64(u64),
@ -38,6 +39,17 @@ impl<'i> StreamMapKey<'i> {
_ => Err(CatchableError::StreamMapError(unsupported_map_key_type(map_name)).into()), _ => Err(CatchableError::StreamMapError(unsupported_map_key_type(map_name)).into()),
} }
} }
pub(crate) fn from_kvpair(value: &ValueAggregate) -> Option<Self> {
let object = value.get_result().as_object()?;
let key = object.get("key")?.to_owned();
match key {
JValue::String(s) => Some(StreamMapKey::Str(Cow::Owned(s))),
JValue::Number(n) if n.is_i64() => Some(StreamMapKey::I64(n.as_i64().unwrap())),
JValue::Number(n) if n.is_u64() => Some(StreamMapKey::U64(n.as_u64().unwrap())),
_ => None,
}
}
} }
impl From<i64> for StreamMapKey<'_> { impl From<i64> for StreamMapKey<'_> {

View File

@ -19,6 +19,7 @@ use super::ExecutionResult;
use super::TraceHandler; use super::TraceHandler;
use crate::execution_step::boxed_value::CanonStream; use crate::execution_step::boxed_value::CanonStream;
use crate::execution_step::boxed_value::CanonStreamWithProvenance; use crate::execution_step::boxed_value::CanonStreamWithProvenance;
use crate::execution_step::instructions::resolve_peer_id_to_string;
use crate::execution_step::Stream; use crate::execution_step::Stream;
use crate::log_instruction; use crate::log_instruction;
use crate::trace_to_exec_err; use crate::trace_to_exec_err;
@ -29,28 +30,53 @@ use air_interpreter_data::CanonCidAggregate;
use air_interpreter_data::CanonResult; use air_interpreter_data::CanonResult;
use air_interpreter_data::CanonResultCidAggregate; use air_interpreter_data::CanonResultCidAggregate;
use air_parser::ast; use air_parser::ast;
use air_parser::ast::ResolvableToPeerIdVariable;
use air_parser::AirPos;
use air_trace_handler::merger::MergerCanonResult; use air_trace_handler::merger::MergerCanonResult;
use std::borrow::Cow; use std::borrow::Cow;
use std::rc::Rc; use std::rc::Rc;
pub(super) type CanonEpilogClosure<'ctx> =
dyn for<'i> Fn(StreamWithSerializedView, &mut ExecutionCtx<'i>, &mut TraceHandler) -> ExecutionResult<()> + 'ctx;
impl<'i> super::ExecutableInstruction<'i> for ast::Canon<'i> { impl<'i> super::ExecutableInstruction<'i> for ast::Canon<'i> {
#[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))] #[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))]
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(call, exec_ctx, trace_ctx); log_instruction!(call, exec_ctx, trace_ctx);
let canon_result = trace_to_exec_err!(trace_ctx.meet_canon_start(), self)?; let canon_result = trace_to_exec_err!(trace_ctx.meet_canon_start(), self)?;
let epilog: &CanonEpilogClosure<'_> = &|stream_with_positions: StreamWithSerializedView,
exec_ctx: &mut ExecutionCtx<'_>,
trace_ctx: &mut TraceHandler|
-> ExecutionResult<()> {
let StreamWithSerializedView {
canon_stream,
canon_result_cid,
} = stream_with_positions;
let value = CanonStreamWithProvenance::new(canon_stream, canon_result_cid.clone());
exec_ctx.scalars.set_canon_value(self.canon_stream.name, value)?;
trace_ctx.meet_canon_end(CanonResult::new(canon_result_cid));
Ok(())
};
match canon_result { match canon_result {
MergerCanonResult::CanonResult(canon_result_cid) => { MergerCanonResult::CanonResult(canon_result_cid) => {
handle_seen_canon(self, canon_result_cid, exec_ctx, trace_ctx) handle_seen_canon(epilog, canon_result_cid, exec_ctx, trace_ctx)
}
MergerCanonResult::Empty => {
let get_stream_or_default: Box<GetStreamClosure<'_>> =
get_stream_or_default_function(self.stream.name, self.stream.position);
handle_unseen_canon(epilog, &get_stream_or_default, &self.peer_id, exec_ctx, trace_ctx)
} }
MergerCanonResult::Empty => handle_unseen_canon(self, exec_ctx, trace_ctx),
} }
} }
} }
fn handle_seen_canon( pub(super) fn handle_seen_canon(
ast_canon: &ast::Canon<'_>, epilog: &CanonEpilogClosure<'_>,
canon_result_cid: Rc<CID<CanonResultCidAggregate>>, canon_result_cid: Rc<CID<CanonResultCidAggregate>>,
exec_ctx: &mut ExecutionCtx<'_>, exec_ctx: &mut ExecutionCtx<'_>,
trace_ctx: &mut TraceHandler, trace_ctx: &mut TraceHandler,
@ -74,15 +100,17 @@ fn handle_seen_canon(
canon_result_cid, canon_result_cid,
}; };
epilog(ast_canon.canon_stream.name, canon_stream_with_se, exec_ctx, trace_ctx) epilog(canon_stream_with_se, exec_ctx, trace_ctx)
} }
fn handle_unseen_canon( pub(super) fn handle_unseen_canon(
ast_canon: &ast::Canon<'_>, epilog: &CanonEpilogClosure<'_>,
get_stream_or_default: &GetStreamClosure<'_>,
peer_id: &ResolvableToPeerIdVariable<'_>,
exec_ctx: &mut ExecutionCtx<'_>, exec_ctx: &mut ExecutionCtx<'_>,
trace_ctx: &mut TraceHandler, trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> { ) -> ExecutionResult<()> {
let peer_id = crate::execution_step::instructions::resolve_peer_id_to_string(&ast_canon.peer_id, exec_ctx)?; let peer_id = resolve_peer_id_to_string(peer_id, exec_ctx)?;
if exec_ctx.run_parameters.current_peer_id.as_str() != peer_id { if exec_ctx.run_parameters.current_peer_id.as_str() != peer_id {
exec_ctx.make_subgraph_incomplete(); exec_ctx.make_subgraph_incomplete();
@ -96,41 +124,21 @@ fn handle_unseen_canon(
return Ok(()); return Ok(());
} }
let stream_with_positions = create_canon_stream_from_name(ast_canon, peer_id, exec_ctx)?; let stream_with_positions = create_canon_stream_from_name(get_stream_or_default, peer_id, exec_ctx)?;
epilog(ast_canon.canon_stream.name, stream_with_positions, exec_ctx, trace_ctx) epilog(stream_with_positions, exec_ctx, trace_ctx)
} }
fn epilog( pub(super) struct StreamWithSerializedView {
canon_stream_name: &str, pub(super) canon_stream: CanonStream,
stream_with_positions: StreamWithSerializedView, pub(super) canon_result_cid: Rc<CID<CanonResultCidAggregate>>,
exec_ctx: &mut ExecutionCtx<'_>,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
let StreamWithSerializedView {
canon_stream,
canon_result_cid,
} = stream_with_positions;
exec_ctx.scalars.set_canon_value(
canon_stream_name,
CanonStreamWithProvenance::new(canon_stream, canon_result_cid.clone()),
)?;
trace_ctx.meet_canon_end(CanonResult::new(canon_result_cid));
Ok(())
}
struct StreamWithSerializedView {
canon_stream: CanonStream,
canon_result_cid: Rc<CID<CanonResultCidAggregate>>,
} }
fn create_canon_stream_from_name( fn create_canon_stream_from_name(
ast_canon: &ast::Canon<'_>, get_stream_or_default: &GetStreamClosure<'_>,
peer_id: String, peer_id: String,
exec_ctx: &mut ExecutionCtx<'_>, exec_ctx: &mut ExecutionCtx<'_>,
) -> ExecutionResult<StreamWithSerializedView> { ) -> ExecutionResult<StreamWithSerializedView> {
let stream = get_stream_or_default(ast_canon, exec_ctx); let stream = get_stream_or_default(exec_ctx);
let canon_stream = CanonStream::from_stream(stream.as_ref(), peer_id); let canon_stream = CanonStream::from_stream(stream.as_ref(), peer_id);
@ -175,10 +183,21 @@ fn create_canon_stream_from_name(
Ok(result) Ok(result)
} }
/// This function gets a stream from context or return a default empty stream, pub(super) type GetStreamClosure<'obj> = dyn for<'ctx> Fn(&'ctx mut ExecutionCtx<'_>) -> Cow<'ctx, Stream> + 'obj;
/// it's crucial for deterministic behaviour, for more info see
/// github.com/fluencelabs/aquavm/issues/346 /// The resulting closure gets underlying stream in a context
fn get_stream_or_default<'ctx>(ast_canon: &ast::Canon<'_>, exec_ctx: &'ctx ExecutionCtx<'_>) -> Cow<'ctx, Stream> { /// or returns a default empty stream,
let maybe_stream = exec_ctx.streams.get(ast_canon.stream.name, ast_canon.stream.position); /// it is crucial for deterministic behaviour, for more info see
maybe_stream.map(Cow::Borrowed).unwrap_or_default() /// github.com/fluencelabs/aquavm/issues/346.
fn get_stream_or_default_function<'obj, 'n: 'obj>(
stream_name: &'n str,
position: AirPos,
) -> Box<GetStreamClosure<'obj>> {
Box::new(move |exec_ctx: &mut ExecutionCtx<'_>| -> Cow<'_, Stream> {
exec_ctx
.streams
.get(stream_name, position)
.map(Cow::Borrowed)
.unwrap_or_default()
})
} }

View File

@ -0,0 +1,101 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::canon::handle_seen_canon;
use super::canon::handle_unseen_canon;
use super::canon::GetStreamClosure;
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::boxed_value::JValuable;
use crate::execution_step::instructions::canon::CanonEpilogClosure;
use crate::execution_step::instructions::canon::StreamWithSerializedView;
use crate::execution_step::CanonResultAggregate;
use crate::execution_step::Stream;
use crate::execution_step::ValueAggregate;
use crate::log_instruction;
use crate::trace_to_exec_err;
use crate::UncatchableError;
use air_interpreter_data::CanonResult;
use air_parser::ast;
use air_parser::AirPos;
use air_trace_handler::merger::MergerCanonResult;
use std::borrow::Cow;
use std::rc::Rc;
impl<'i> super::ExecutableInstruction<'i> for ast::CanonStreamMapScalar<'i> {
#[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))]
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(call, exec_ctx, trace_ctx);
let canon_result = trace_to_exec_err!(trace_ctx.meet_canon_start(), self)?;
let epilog: &CanonEpilogClosure<'_> = &|stream_with_positions: StreamWithSerializedView,
exec_ctx: &mut ExecutionCtx<'_>,
trace_ctx: &mut TraceHandler|
-> ExecutionResult<()> {
let StreamWithSerializedView {
canon_stream,
canon_result_cid,
} = stream_with_positions;
let value = JValuable::as_jvalue(&&canon_stream).into_owned();
let tetraplet = canon_stream.tetraplet().clone();
let position = trace_ctx.trace_pos().map_err(UncatchableError::from)?;
let value = CanonResultAggregate::new(
Rc::new(value),
tetraplet.peer_pk.as_str().into(),
&tetraplet.json_path,
position,
);
let result = ValueAggregate::from_canon_result(value, canon_result_cid.clone());
exec_ctx.scalars.set_scalar_value(self.scalar.name, result)?;
trace_ctx.meet_canon_end(CanonResult::new(canon_result_cid));
Ok(())
};
match canon_result {
MergerCanonResult::CanonResult(canon_result_cid) => {
handle_seen_canon(epilog, canon_result_cid, exec_ctx, trace_ctx)
}
MergerCanonResult::Empty => {
let get_stream_or_default: Box<GetStreamClosure<'_>> =
get_stream_or_default_function(self.stream_map.name, self.stream_map.position);
handle_unseen_canon(epilog, &get_stream_or_default, &self.peer_id, exec_ctx, trace_ctx)
}
}
}
}
/// The resulting closure gets underlying stream from a StreamMap in a context
/// or returns a default empty stream,
/// it is crucial for deterministic behaviour, for more info see
/// github.com/fluencelabs/aquavm/issues/346.
fn get_stream_or_default_function<'obj, 'n: 'obj>(
stream_map_name: &'n str,
position: AirPos,
) -> Box<GetStreamClosure<'obj>> {
Box::new(move |exec_ctx: &mut ExecutionCtx<'_>| -> Cow<'_, Stream> {
exec_ctx
.stream_maps
.get_mut(stream_map_name, position)
.map(|stream_map| stream_map.get_unique_map_keys_stream())
.or_else(<_>::default)
.unwrap()
})
}

View File

@ -18,6 +18,7 @@ mod ap;
mod ap_map; mod ap_map;
mod call; mod call;
mod canon; mod canon;
mod canon_stream_map_scalar;
mod compare_matchable; mod compare_matchable;
mod fail; mod fail;
mod fold; mod fold;
@ -80,6 +81,7 @@ impl<'i> ExecutableInstruction<'i> for Instruction<'i> {
Instruction::Ap(ap) => execute!(self, ap, exec_ctx, trace_ctx), Instruction::Ap(ap) => execute!(self, ap, exec_ctx, trace_ctx),
Instruction::ApMap(ap_map) => execute!(self, ap_map, exec_ctx, trace_ctx), Instruction::ApMap(ap_map) => execute!(self, ap_map, exec_ctx, trace_ctx),
Instruction::Canon(canon) => execute!(self, canon, exec_ctx, trace_ctx), Instruction::Canon(canon) => execute!(self, canon, exec_ctx, trace_ctx),
Instruction::CanonStreamMapScalar(canon) => execute!(self, canon, exec_ctx, trace_ctx),
Instruction::Fail(fail) => execute!(self, fail, exec_ctx, trace_ctx), Instruction::Fail(fail) => execute!(self, fail, exec_ctx, trace_ctx),
Instruction::FoldScalar(fold) => execute!(self, fold, exec_ctx, trace_ctx), Instruction::FoldScalar(fold) => execute!(self, fold, exec_ctx, trace_ctx),
Instruction::FoldStream(fold) => execute!(self, fold, exec_ctx, trace_ctx), Instruction::FoldStream(fold) => execute!(self, fold, exec_ctx, trace_ctx),

View File

@ -14,8 +14,8 @@
* limitations under the License. * limitations under the License.
*/ */
use air::ExecutionCidState;
use air_test_utils::prelude::*; use air_test_utils::prelude::*;
use fstrings::f; use fstrings::f;
use fstrings::format_args_f; use fstrings::format_args_f;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
@ -364,3 +364,225 @@ fn canon_over_later_defined_stream() {
]; ];
assert_eq!(actual_trace, expected_trace); assert_eq!(actual_trace, expected_trace);
} }
#[test]
fn canon_map_scalar() {
let vm_peer_id_1 = "vm_peer_id_1";
let mut peer_vm_1 = create_avm(echo_call_service(), vm_peer_id_1);
let script = f!(r#"
(seq
(seq
(seq
(seq
(ap ("k" "v1") %map)
(ap ("k" "v2") %map)
)
(seq
(ap (42 "v3") %map)
(ap (42 "v4") %map)
)
)
(seq
(ap (-42 "v5") %map)
(ap (-42 "v6") %map)
)
)
(seq
(canon "{vm_peer_id_1}" %map scalar)
(call "{vm_peer_id_1}" ("m1" "f1") [scalar] output)
)
)
"#);
let result = checked_call_vm!(peer_vm_1, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let mut cid_state: ExecutionCidState = ExecutionCidState::new();
let value1 = json!({"key": "k", "value": "v1"});
let value2 = json!({"key": 42, "value": "v3"});
let value3 = json!({"key": -42, "value": "v5"});
let value4 = json!([value1, value2, value3]);
let expected_trace = ExecutionTrace::from(vec![
executed_state::ap(0),
executed_state::ap(0),
executed_state::ap(0),
executed_state::ap(0),
executed_state::ap(0),
executed_state::ap(0),
canon_tracked(
json!({"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "vm_peer_id_1", "service_id": ""},
"values": [
{
"result": {
"key": "k",
"value": "v1"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
},
{
"result": {
"key": 42,
"value": "v3"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
}, {
"result": {
"key": -42,
"value": "v5"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
}]}),
&mut cid_state,
),
scalar_tracked!(
value4.clone(),
cid_state,
peer = vm_peer_id_1,
service = "m1",
function = "f1",
args = vec![value4]
),
]);
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn canon_map_scalar_with_par() {
let vm_peer_id_1 = "vm_peer_id_1";
let vm_peer_id_2 = "vm_peer_id_2";
let mut peer_vm_1 = create_avm(echo_call_service(), vm_peer_id_1);
let mut peer_vm_2 = create_avm(echo_call_service(), vm_peer_id_2);
let script = f!(r#"
(par
(seq
(seq
(ap ("k" "v1") %map)
(ap (-42 "v2") %map)
)
(seq
(canon "{vm_peer_id_1}" %map scalar)
(call "{vm_peer_id_1}" ("m1" "f1") [scalar] output)
)
)
(seq
(seq
(ap (42 "v3") %map)
(ap ("42" "v4") %map)
)
(seq
(canon "{vm_peer_id_2}" %map scalar1)
(call "{vm_peer_id_2}" ("m2" "f2") [scalar1] output1)
)
)
)
"#);
let result = checked_call_vm!(peer_vm_1, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let mut cid_state: ExecutionCidState = ExecutionCidState::new();
let value1 = json!([{"key": "k", "value": "v1"}, {"key": -42, "value": "v2"}]);
let mut states_vec = vec![
executed_state::par(4, 2),
executed_state::ap(0),
executed_state::ap(0),
canon_tracked(
json!({"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "vm_peer_id_1", "service_id": ""},
"values": [
{
"result": {
"key": "k",
"value": "v1"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
},
{
"result": {
"key": -42,
"value": "v2"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
}]}),
&mut cid_state,
),
scalar_tracked!(
value1.clone(),
cid_state,
peer = vm_peer_id_1,
service = "m1",
function = "f1",
args = vec![value1.clone()]
),
executed_state::ap(0),
executed_state::ap(0),
];
let expected_trace = ExecutionTrace::from(states_vec.clone());
assert_eq!(actual_trace, expected_trace);
let result = checked_call_vm!(peer_vm_2, <_>::default(), &script, "", result.data);
let actual_trace = trace_from_result(&result);
let value2 = json!([{"key": "k", "value": "v1"}, {"key": -42, "value": "v2"},{"key": 42, "value": "v3"}, {"key": "42", "value": "v4"}]);
states_vec[0] = executed_state::par(4, 4);
states_vec.extend(vec![
canon_tracked(
json!({"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "vm_peer_id_2", "service_id": ""},
"values": [
{
"result": {
"key": "k",
"value": "v1"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
},
{
"result": {
"key": -42,
"value": "v2"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
},
{
"result": {
"key": 42,
"value": "v3"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
},
{
"result": {
"key": "42",
"value": "v4"
},
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "", "service_id": ""},
"provenance": Provenance::Literal,
}]}),
&mut cid_state,
),
scalar_tracked!(
value2.clone(),
cid_state,
peer = vm_peer_id_2,
service = "m2",
function = "f2",
args = vec![value2.clone()]
),
]);
let expected_trace = ExecutionTrace::from(states_vec.clone());
assert_eq!(actual_trace, expected_trace);
}

View File

@ -31,6 +31,7 @@ pub enum Instruction<'i> {
Ap(Ap<'i>), Ap(Ap<'i>),
ApMap(ApMap<'i>), ApMap(ApMap<'i>),
Canon(Canon<'i>), Canon(Canon<'i>),
CanonStreamMapScalar(CanonStreamMapScalar<'i>),
Seq(Seq<'i>), Seq(Seq<'i>),
Par(Par<'i>), Par(Par<'i>),
Xor(Xor<'i>), Xor(Xor<'i>),
@ -78,6 +79,14 @@ pub struct Canon<'i> {
pub canon_stream: CanonStream<'i>, pub canon_stream: CanonStream<'i>,
} }
/// (canon peer_id #stream_map scalar)
#[derive(Serialize, Debug, PartialEq, Eq)]
pub struct CanonStreamMapScalar<'i> {
pub peer_id: ResolvableToPeerIdVariable<'i>,
pub stream_map: StreamMap<'i>,
pub scalar: Scalar<'i>,
}
/// (seq instruction instruction) /// (seq instruction instruction)
#[derive(Serialize, Debug, PartialEq)] #[derive(Serialize, Debug, PartialEq)]
pub struct Seq<'i>(pub Box<Instruction<'i>>, pub Box<Instruction<'i>>); pub struct Seq<'i>(pub Box<Instruction<'i>>, pub Box<Instruction<'i>>);

View File

@ -56,6 +56,20 @@ impl<'i> Canon<'i> {
} }
} }
impl<'i> CanonStreamMapScalar<'i> {
pub fn new(
peer_id: ResolvableToPeerIdVariable<'i>,
stream_map: StreamMap<'i>,
scalar: Scalar<'i>,
) -> Self {
Self {
peer_id,
stream_map,
scalar,
}
}
}
impl<'i> Seq<'i> { impl<'i> Seq<'i> {
pub fn new( pub fn new(
left_instruction: Box<Instruction<'i>>, left_instruction: Box<Instruction<'i>>,

View File

@ -25,6 +25,7 @@ impl fmt::Display for Instruction<'_> {
match self { match self {
Call(call) => write!(f, "{call}"), Call(call) => write!(f, "{call}"),
Canon(canon) => write!(f, "{canon}"), Canon(canon) => write!(f, "{canon}"),
CanonStreamMapScalar(canon_stream_map_scalar) => write!(f, "{canon_stream_map_scalar}"),
Ap(ap) => write!(f, "{ap}"), Ap(ap) => write!(f, "{ap}"),
ApMap(ap_map) => write!(f, "{ap_map}"), ApMap(ap_map) => write!(f, "{ap_map}"),
Seq(seq) => write!(f, "{seq}"), Seq(seq) => write!(f, "{seq}"),
@ -64,6 +65,16 @@ impl fmt::Display for Canon<'_> {
} }
} }
impl fmt::Display for CanonStreamMapScalar<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"canon {} {} {}",
self.peer_id, self.stream_map, self.scalar
)
}
}
impl fmt::Display for Ap<'_> { impl fmt::Display for Ap<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ap {} {}", self.argument, self.result) write!(f, "ap {} {}", self.argument, self.result)

View File

@ -34,6 +34,15 @@ Instr: Box<Instruction<'input>> = {
Box::new(Instruction::Canon(canon)) Box::new(Instruction::Canon(canon))
}, },
<left: @L> "(" canon <peer_id:ResolvableToPeerIdVariable> <stream_map:StreamMapArgument> <scalar_pair:Scalar> ")" <right: @R> => {
let scalar = Scalar::new(scalar_pair.0, scalar_pair.1);
let canon = CanonStreamMapScalar::new(peer_id, stream_map, scalar);
let span = Span::new(left, right);
validator.met_canon_map_scalar(&canon, span);
Box::new(Instruction::CanonStreamMapScalar(canon))
},
<left: @L> "(" ap <arg:ApArgument> <result:ApResult> ")" <right: @R> => { <left: @L> "(" ap <arg:ApArgument> <result:ApResult> ")" <right: @R> => {
let apply = Ap::new(arg, result); let apply = Ap::new(arg, result);
@ -253,6 +262,10 @@ StreamArgument: Stream<'input> = {
<stream:Stream> => Stream::new(stream.0, stream.1), <stream:Stream> => Stream::new(stream.0, stream.1),
} }
StreamMapArgument: StreamMap<'input> = {
<stream_map:StreamMap> => StreamMap::new(stream_map.0, stream_map.1),
}
CanonStreamArgument: CanonStream<'input> = { CanonStreamArgument: CanonStream<'input> = {
<canon_stream:CanonStream> => CanonStream::new(canon_stream.0, canon_stream.1), <canon_stream:CanonStream> => CanonStream::new(canon_stream.0, canon_stream.1),
} }

File diff suppressed because it is too large Load Diff

View File

@ -58,3 +58,22 @@ fn canon_with_variable_peer_id() {
assert_eq!(actual, expected); assert_eq!(actual, expected);
} }
#[test]
fn canon_with_stream_map() {
let peer_id = "peer_id";
let stream_map = "%stream_map";
let scalar = "scalar";
let source_code = f!(r#"
(canon {peer_id} {stream_map} {scalar})
"#);
let actual = parse(&source_code);
let expected = canon_stream_map_scalar(
ResolvableToPeerIdVariable::Scalar(Scalar::new(peer_id, 16.into())),
StreamMap::new(stream_map, 24.into()),
Scalar::new(scalar, 36.into()),
);
assert_eq!(actual, expected, "{:#?} {:#?}", actual, expected);
}

View File

@ -223,6 +223,18 @@ pub(super) fn canon<'i>(
}) })
} }
pub(super) fn canon_stream_map_scalar<'i>(
peer_pk: ResolvableToPeerIdVariable<'i>,
stream_map: StreamMap<'i>,
scalar: Scalar<'i>,
) -> Instruction<'i> {
Instruction::CanonStreamMapScalar(CanonStreamMapScalar {
peer_id: peer_pk,
stream_map,
scalar,
})
}
pub(super) fn binary_instruction<'i, 'b>( pub(super) fn binary_instruction<'i, 'b>(
name: &'i str, name: &'i str,
) -> impl Fn(Instruction<'b>, Instruction<'b>) -> Instruction<'b> { ) -> impl Fn(Instruction<'b>, Instruction<'b>) -> Instruction<'b> {

View File

@ -83,12 +83,20 @@ impl<'i> VariableValidator<'i> {
}; };
} }
// canon doesn't check stream to be defined, because empty streams are considered to be emtpy // canon doesn't check stream to be defined, because empty streams are considered to be empty
// and it's useful for code generation // and it is useful for code generation
pub(super) fn met_canon(&mut self, canon: &Canon<'i>, span: Span) { pub(super) fn met_canon(&mut self, canon: &Canon<'i>, span: Span) {
self.met_variable_name_definition(canon.canon_stream.name, span); self.met_variable_name_definition(canon.canon_stream.name, span);
} }
pub(super) fn met_canon_map_scalar(
&mut self,
canon_stream_map_scalar: &CanonStreamMapScalar<'i>,
span: Span,
) {
self.met_variable_name_definition(canon_stream_map_scalar.scalar.name, span);
}
pub(super) fn met_match(&mut self, match_: &Match<'i>, span: Span) { pub(super) fn met_match(&mut self, match_: &Match<'i>, span: Span) {
self.met_matchable(&match_.left_value, span); self.met_matchable(&match_.left_value, span);
self.met_matchable(&match_.right_value, span); self.met_matchable(&match_.right_value, span);

View File

@ -127,6 +127,7 @@ impl<W: io::Write> Beautifier<W> {
ast::Instruction::Ap(ap) => self.beautify_simple(ap, indent), ast::Instruction::Ap(ap) => self.beautify_simple(ap, indent),
ast::Instruction::ApMap(ap_map) => self.beautify_simple(ap_map, indent), ast::Instruction::ApMap(ap_map) => self.beautify_simple(ap_map, indent),
ast::Instruction::Canon(canon) => self.beautify_simple(canon, indent), ast::Instruction::Canon(canon) => self.beautify_simple(canon, indent),
ast::Instruction::CanonStreamMapScalar(canon) => self.beautify_simple(canon, indent),
ast::Instruction::Seq(seq) => self.beautify_seq(seq, indent), ast::Instruction::Seq(seq) => self.beautify_seq(seq, indent),
ast::Instruction::Par(par) => self.beautify_par(par, indent), ast::Instruction::Par(par) => self.beautify_par(par, indent),
ast::Instruction::Xor(xor) => self.beautify_xor(xor, indent), ast::Instruction::Xor(xor) => self.beautify_xor(xor, indent),