mirror of
https://github.com/fluencelabs/aquavm
synced 2024-12-04 15:20:16 +00:00
Implement scalars in lambda for streams (#212)
This commit is contained in:
parent
58aef82b1e
commit
4f90f194c7
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -214,7 +214,7 @@ version = "0.1.0"
|
||||
|
||||
[[package]]
|
||||
name = "avm-server"
|
||||
version = "0.15.0"
|
||||
version = "0.14.0"
|
||||
dependencies = [
|
||||
"air-interpreter-interface",
|
||||
"avm-data-store",
|
||||
|
@ -128,11 +128,11 @@ fn create_scalar_lambda_iterable<'ctx>(
|
||||
scalar_name: &str,
|
||||
lambda: &LambdaAST<'_>,
|
||||
) -> ExecutionResult<FoldIterableScalar> {
|
||||
use crate::execution_step::lambda_applier::select;
|
||||
use crate::execution_step::lambda_applier::select_from_scalar;
|
||||
|
||||
match exec_ctx.scalars.get(scalar_name)? {
|
||||
ScalarRef::Value(variable) => {
|
||||
let jvalues = select(&variable.result, lambda.iter(), exec_ctx)?;
|
||||
let jvalues = select_from_scalar(&variable.result, lambda.iter(), exec_ctx)?;
|
||||
from_jvalue(jvalues, variable.tetraplet.clone(), lambda)
|
||||
}
|
||||
ScalarRef::IterableValue(fold_state) => {
|
||||
|
@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::select;
|
||||
use super::select_from_scalar;
|
||||
use super::ExecutionResult;
|
||||
use super::IterableItem;
|
||||
use super::JValuable;
|
||||
@ -37,7 +37,7 @@ impl<'ctx> JValuable for IterableItem<'ctx> {
|
||||
RcValue((jvalue, ..)) => jvalue.deref(),
|
||||
};
|
||||
|
||||
let selected_value = select(jvalue, lambda.iter(), exec_ctx)?;
|
||||
let selected_value = select_from_scalar(jvalue, lambda.iter(), exec_ctx)?;
|
||||
Ok(selected_value)
|
||||
}
|
||||
|
||||
@ -54,7 +54,7 @@ impl<'ctx> JValuable for IterableItem<'ctx> {
|
||||
RcValue((jvalue, tetraplet, _)) => (jvalue.deref(), tetraplet),
|
||||
};
|
||||
|
||||
let selected_value = select(jvalue, lambda.iter(), exec_ctx)?;
|
||||
let selected_value = select_from_scalar(jvalue, lambda.iter(), exec_ctx)?;
|
||||
Ok((selected_value, tetraplet.clone()))
|
||||
}
|
||||
|
||||
|
@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::select;
|
||||
use super::select_from_scalar;
|
||||
use super::ExecutionResult;
|
||||
use super::JValuable;
|
||||
use super::LambdaAST;
|
||||
@ -31,7 +31,7 @@ use std::ops::Deref;
|
||||
|
||||
impl JValuable for ValueAggregate {
|
||||
fn apply_lambda<'i>(&self, lambda: &LambdaAST<'_>, exec_ctx: &ExecutionCtx<'i>) -> ExecutionResult<&JValue> {
|
||||
let selected_value = select(&self.result, lambda.iter(), exec_ctx)?;
|
||||
let selected_value = select_from_scalar(&self.result, lambda.iter(), exec_ctx)?;
|
||||
Ok(selected_value)
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ impl JValuable for ValueAggregate {
|
||||
lambda: &LambdaAST<'_>,
|
||||
exec_ctx: &ExecutionCtx<'i>,
|
||||
) -> ExecutionResult<(&JValue, RSecurityTetraplet)> {
|
||||
let selected_value = select(&self.result, lambda.iter(), exec_ctx)?;
|
||||
let selected_value = select_from_scalar(&self.result, lambda.iter(), exec_ctx)?;
|
||||
let tetraplet = self.tetraplet.clone();
|
||||
tetraplet.borrow_mut().add_lambda(&format_ast(lambda));
|
||||
|
||||
|
@ -34,17 +34,19 @@ pub(crate) fn select_from_stream<'value, 'i>(
|
||||
lambda: &LambdaAST<'_>,
|
||||
exec_ctx: &ExecutionCtx<'i>,
|
||||
) -> ExecutionResult<StreamSelectResult<'value>> {
|
||||
use ValueAccessor::*;
|
||||
|
||||
let (prefix, body) = lambda.split_first();
|
||||
let idx = match prefix {
|
||||
ArrayAccess { idx } => *idx,
|
||||
FieldAccessByName { field_name } => {
|
||||
ValueAccessor::ArrayAccess { idx } => *idx,
|
||||
ValueAccessor::FieldAccessByName { field_name } => {
|
||||
return lambda_to_execution_error!(Err(LambdaError::FieldAccessorAppliedToStream {
|
||||
field_name: field_name.to_string(),
|
||||
}));
|
||||
}
|
||||
_ => unreachable!("should not execute if parsing succeeded. QED."),
|
||||
ValueAccessor::FieldAccessByScalar { scalar_name } => {
|
||||
let scalar = exec_ctx.scalars.get(scalar_name)?;
|
||||
lambda_to_execution_error!(try_scalar_ref_as_idx(scalar))?
|
||||
}
|
||||
ValueAccessor::Error => unreachable!("should not execute if parsing succeeded. QED."),
|
||||
};
|
||||
|
||||
let stream_size = stream.len();
|
||||
@ -53,12 +55,12 @@ pub(crate) fn select_from_stream<'value, 'i>(
|
||||
.nth(idx as usize)
|
||||
.ok_or(LambdaError::StreamNotHaveEnoughValues { stream_size, idx }))?;
|
||||
|
||||
let result = select(value, body.iter(), exec_ctx)?;
|
||||
let result = select_from_scalar(value, body.iter(), exec_ctx)?;
|
||||
let select_result = StreamSelectResult::new(result, idx);
|
||||
Ok(select_result)
|
||||
}
|
||||
|
||||
pub(crate) fn select<'value, 'accessor, 'i>(
|
||||
pub(crate) fn select_from_scalar<'value, 'accessor, 'i>(
|
||||
mut value: &'value JValue,
|
||||
lambda: impl Iterator<Item = &'accessor ValueAccessor<'accessor>>,
|
||||
exec_ctx: &ExecutionCtx<'i>,
|
||||
|
@ -48,4 +48,7 @@ pub enum LambdaError {
|
||||
|
||||
#[error("scalar accessor `{scalar_accessor}` should has number or string type")]
|
||||
ScalarAccessorHasInvalidType { scalar_accessor: JValue },
|
||||
|
||||
#[error("stream accessor `{scalar_accessor}` should has number (u32) type")]
|
||||
StreamAccessorHasInvalidType { scalar_accessor: JValue },
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ pub use errors::LambdaError;
|
||||
|
||||
pub(crate) type LambdaResult<T> = std::result::Result<T, LambdaError>;
|
||||
|
||||
pub(crate) use applier::select;
|
||||
pub(crate) use applier::select_from_scalar;
|
||||
pub(crate) use applier::select_from_stream;
|
||||
|
||||
#[macro_export]
|
||||
|
@ -68,6 +68,17 @@ pub(super) fn select_by_scalar<'value, 'i>(
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn try_scalar_ref_as_idx(scalar: ScalarRef<'_>) -> LambdaResult<u32> {
|
||||
match scalar {
|
||||
ScalarRef::Value(accessor) => try_jvalue_as_idx(&accessor.result),
|
||||
ScalarRef::IterableValue(accessor) => {
|
||||
// it's safe because iterable always point to valid value
|
||||
let accessor = accessor.iterable.peek().unwrap().into_resolved_result();
|
||||
try_jvalue_as_idx(&accessor.result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn select_by_jvalue<'value>(value: &'value JValue, accessor: &JValue) -> LambdaResult<&'value JValue> {
|
||||
match accessor {
|
||||
JValue::String(string_accessor) => try_jvalue_with_field_name(value, string_accessor),
|
||||
@ -81,6 +92,15 @@ fn select_by_jvalue<'value>(value: &'value JValue, accessor: &JValue) -> LambdaR
|
||||
}
|
||||
}
|
||||
|
||||
fn try_jvalue_as_idx(jvalue: &JValue) -> LambdaResult<u32> {
|
||||
match jvalue {
|
||||
JValue::Number(number) => try_number_to_u32(number),
|
||||
scalar_accessor => Err(LambdaError::StreamAccessorHasInvalidType {
|
||||
scalar_accessor: scalar_accessor.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_number_to_u32(accessor: &serde_json::Number) -> LambdaResult<u32> {
|
||||
use std::convert::TryFrom;
|
||||
|
||||
|
@ -26,7 +26,7 @@ use crate::SecurityTetraplet;
|
||||
|
||||
use air_parser::ast;
|
||||
|
||||
use crate::execution_step::lambda_applier::select;
|
||||
use crate::execution_step::lambda_applier::select_from_scalar;
|
||||
use serde_json::json;
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
@ -71,7 +71,7 @@ pub(crate) fn prepare_last_error<'i>(
|
||||
let LastError { error, tetraplet } = ctx.last_error();
|
||||
|
||||
let jvalue = match error_accessor {
|
||||
Some(error_accessor) => select(error.as_ref(), error_accessor.iter(), ctx)?,
|
||||
Some(error_accessor) => select_from_scalar(error.as_ref(), error_accessor.iter(), ctx)?,
|
||||
None => error.as_ref(),
|
||||
};
|
||||
|
||||
|
@ -83,7 +83,7 @@ fn lambda_with_string_scalar() {
|
||||
fn lambda_with_number_scalar() {
|
||||
let set_variable_peer_id = "set_variable";
|
||||
let variables = maplit::hashmap! {
|
||||
"string_accessor".to_string() => json!(1u32),
|
||||
"number_accessor".to_string() => json!(1u32),
|
||||
"value".to_string() => json!([0, 1, 2])
|
||||
};
|
||||
let mut set_variable_vm = create_avm(
|
||||
@ -97,10 +97,10 @@ fn lambda_with_number_scalar() {
|
||||
let script = f!(r#"
|
||||
(seq
|
||||
(seq
|
||||
(call "{set_variable_peer_id}" ("" "string_accessor") [] string_accessor)
|
||||
(call "{set_variable_peer_id}" ("" "number_accessor") [] number_accessor)
|
||||
(call "{set_variable_peer_id}" ("" "value") [] value)
|
||||
)
|
||||
(call "{local_peer_id}" ("" "") [value.$.[string_accessor]])
|
||||
(call "{local_peer_id}" ("" "") [value.$.[number_accessor]])
|
||||
)
|
||||
"#);
|
||||
|
||||
@ -111,6 +111,91 @@ fn lambda_with_number_scalar() {
|
||||
assert_eq!(&trace[2], &executed_state::scalar_number(1u32));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lambda_with_number_stream() {
|
||||
let set_variable_peer_id = "set_variable";
|
||||
let variables = maplit::hashmap! {
|
||||
"number_accessor".to_string() => json!(1),
|
||||
"iterable".to_string() => json!([1,2,3]),
|
||||
};
|
||||
let mut set_variable_vm = create_avm(
|
||||
set_variables_call_service(variables, VariableOptionSource::FunctionName),
|
||||
set_variable_peer_id,
|
||||
);
|
||||
|
||||
let local_peer_id = "local_peer_id";
|
||||
let mut local_vm = create_avm(echo_call_service(), local_peer_id);
|
||||
|
||||
let script = f!(r#"
|
||||
(seq
|
||||
(seq
|
||||
(call "{set_variable_peer_id}" ("" "number_accessor") [] number_accessor)
|
||||
(seq
|
||||
(call "{set_variable_peer_id}" ("" "iterable") [] iterable)
|
||||
(fold iterable iterator
|
||||
(seq
|
||||
(call "{local_peer_id}" ("" "") [iterator] $stream)
|
||||
(next iterator)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
(call "{local_peer_id}" ("" "") [$stream.$.[number_accessor]])
|
||||
)
|
||||
"#);
|
||||
|
||||
let result = checked_call_vm!(set_variable_vm, "asd", &script, "", "");
|
||||
let result = checked_call_vm!(local_vm, "asd", script, "", result.data);
|
||||
let actual_trace = trace_from_result(&result);
|
||||
|
||||
assert_eq!(&actual_trace[5], &executed_state::scalar_number(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lambda_with_number_stream_and_followed_scalar() {
|
||||
let set_variable_peer_id = "set_variable";
|
||||
let checkable_value = 1337;
|
||||
let variables = maplit::hashmap! {
|
||||
"number_accessor".to_string() => json!(1),
|
||||
"iterable".to_string() => json!([1,2,3]),
|
||||
"value".to_string() => json!({"field_1": checkable_value, "field_2": 31337}),
|
||||
};
|
||||
let mut set_variable_vm = create_avm(
|
||||
set_variables_call_service(variables, VariableOptionSource::FunctionName),
|
||||
set_variable_peer_id,
|
||||
);
|
||||
|
||||
let local_peer_id = "local_peer_id";
|
||||
let mut local_vm = create_avm(echo_call_service(), local_peer_id);
|
||||
|
||||
let script = f!(r#"
|
||||
(seq
|
||||
(seq
|
||||
(seq
|
||||
(call "{set_variable_peer_id}" ("" "number_accessor") [] number_accessor)
|
||||
(call "{set_variable_peer_id}" ("" "value") [] value)
|
||||
)
|
||||
(seq
|
||||
(call "{set_variable_peer_id}" ("" "iterable") [] iterable)
|
||||
(fold iterable iterator
|
||||
(seq
|
||||
(call "{local_peer_id}" ("" "") [value] $stream) ;; place 3 complex values in a stream
|
||||
(next iterator)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
(call "{local_peer_id}" ("" "") [$stream.$.[number_accessor].field_1]) ;; get the 2nd value and then access its field
|
||||
)
|
||||
"#);
|
||||
|
||||
let result = checked_call_vm!(set_variable_vm, "asd", &script, "", "");
|
||||
let result = checked_call_vm!(local_vm, "asd", script, "", result.data);
|
||||
let actual_trace = trace_from_result(&result);
|
||||
|
||||
assert_eq!(&actual_trace[6], &executed_state::scalar_number(checkable_value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lambda_with_scalar_join() {
|
||||
let set_variable_peer_id = "set_variable";
|
||||
@ -142,3 +227,43 @@ fn lambda_with_scalar_join() {
|
||||
|
||||
assert_eq!(&trace[3], &executed_state::request_sent_by("set_variable"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lambda_with_stream_join() {
|
||||
let set_variable_peer_id = "set_variable";
|
||||
let variables = maplit::hashmap! {
|
||||
"number_accessor".to_string() => json!(1),
|
||||
"iterable".to_string() => json!([1,2,3]),
|
||||
};
|
||||
let mut set_variable_vm = create_avm(
|
||||
set_variables_call_service(variables, VariableOptionSource::FunctionName),
|
||||
set_variable_peer_id,
|
||||
);
|
||||
|
||||
let local_peer_id = "local_peer_id";
|
||||
let mut local_vm = create_avm(echo_call_service(), local_peer_id);
|
||||
|
||||
let script = f!(r#"
|
||||
(seq
|
||||
(par
|
||||
(call "non_exist_peer_id" ("" "number_accessor") [] number_accessor)
|
||||
(seq
|
||||
(call "{set_variable_peer_id}" ("" "iterable") [] iterable)
|
||||
(fold iterable iterator
|
||||
(seq
|
||||
(ap "value" $stream)
|
||||
(next iterator)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
(call "{local_peer_id}" ("" "") [$stream.$.[number_accessor]])
|
||||
)
|
||||
"#);
|
||||
|
||||
let result = checked_call_vm!(set_variable_vm, "asd", &script, "", "");
|
||||
let result = checked_call_vm!(local_vm, "asd", script, "", result.data);
|
||||
let actual_trace = trace_from_result(&result);
|
||||
|
||||
assert_eq!(&actual_trace[6], &executed_state::request_sent_by("set_variable"));
|
||||
}
|
||||
|
90
air/tests/test_module/issues/issue_211.rs
Normal file
90
air/tests/test_module/issues/issue_211.rs
Normal file
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2021 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use air_test_utils::prelude::*;
|
||||
|
||||
use fstrings::f;
|
||||
use fstrings::format_args_f;
|
||||
|
||||
#[test]
|
||||
// test for github.com/fluencelabs/aquavm/issues/211
|
||||
// On the versions < 0.20.1 it just crashes
|
||||
fn issue_211() {
|
||||
let peer_1_id = "peer_1_id";
|
||||
let variables_mapping = maplit::hashmap! {
|
||||
"idx".to_string() => json!(2),
|
||||
"nodes".to_string() => json!([1,2,3]),
|
||||
};
|
||||
|
||||
let mut peer_1 = create_avm(
|
||||
set_variables_call_service(variables_mapping, VariableOptionSource::FunctionName),
|
||||
peer_1_id,
|
||||
);
|
||||
|
||||
let script = f!(r#"
|
||||
(xor
|
||||
(seq
|
||||
(seq
|
||||
(seq
|
||||
(seq
|
||||
(null)
|
||||
(call %init_peer_id% ("getDataSrv" "idx") [] idx)
|
||||
)
|
||||
(call %init_peer_id% ("getDataSrv" "nodes") [] nodes)
|
||||
)
|
||||
(new $nodes2
|
||||
(seq
|
||||
(seq
|
||||
(par
|
||||
(fold nodes node
|
||||
(par
|
||||
(ap node $nodes2)
|
||||
(next node)
|
||||
)
|
||||
)
|
||||
(null)
|
||||
)
|
||||
(call %init_peer_id% ("op" "noop") [$nodes2.$.[idx]! nodes])
|
||||
)
|
||||
(call %init_peer_id% ("op" "identity") [$nodes2] nodes2-fix)
|
||||
)
|
||||
)
|
||||
)
|
||||
(null)
|
||||
)
|
||||
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
|
||||
)
|
||||
"#);
|
||||
|
||||
let result = checked_call_vm!(peer_1, peer_1_id, &script, "", "");
|
||||
|
||||
let expected_trace = vec![
|
||||
executed_state::scalar_number(2),
|
||||
executed_state::scalar(json!([1, 2, 3])),
|
||||
executed_state::par(6, 0),
|
||||
executed_state::par(1, 4),
|
||||
executed_state::ap(Some(0)),
|
||||
executed_state::par(1, 2),
|
||||
executed_state::ap(Some(0)),
|
||||
executed_state::par(1, 0),
|
||||
executed_state::ap(Some(0)),
|
||||
executed_state::scalar_string("default result from set_variables_call_service"),
|
||||
executed_state::scalar_string("default result from set_variables_call_service"),
|
||||
];
|
||||
|
||||
let actual_trace = trace_from_result(&result);
|
||||
assert_eq!(actual_trace, expected_trace);
|
||||
}
|
@ -20,3 +20,4 @@ mod issue_177;
|
||||
mod issue_178;
|
||||
mod issue_180;
|
||||
mod issue_206;
|
||||
mod issue_211;
|
||||
|
Loading…
Reference in New Issue
Block a user