mirror of
https://github.com/fluencelabs/aquavm
synced 2024-12-04 23:20:18 +00:00
Allow joining by index and key (#110)
This commit is contained in:
parent
6228fcc024
commit
519535ccb4
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -989,9 +989,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "jsonpath_lib-fl"
|
name = "jsonpath_lib-fl"
|
||||||
version = "0.2.5"
|
version = "0.3.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e81233a3c2e1f4579f1fdb856eeec115dcb23817374268212ebad691bd53e664"
|
checksum = "33dcf980221b25366e8f0df601cf0df6ffcc97242cbbe4d139a79a7f0de5107f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"array_tool",
|
"array_tool",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
|
@ -19,7 +19,7 @@ fluence = { version = "0.6.8", features = ["logger"] }
|
|||||||
serde = { version = "=1.0.118", features = [ "derive", "rc" ] }
|
serde = { version = "=1.0.118", features = [ "derive", "rc" ] }
|
||||||
serde_json = "=1.0.61"
|
serde_json = "=1.0.61"
|
||||||
|
|
||||||
jsonpath_lib-fl = "=0.2.5"
|
jsonpath_lib-fl = "=0.3.7"
|
||||||
|
|
||||||
boolinator = "2.4.0"
|
boolinator = "2.4.0"
|
||||||
log = "0.4.11"
|
log = "0.4.11"
|
||||||
|
@ -93,7 +93,7 @@ pub(crate) enum ExecutionError {
|
|||||||
MismatchWithoutXorError,
|
MismatchWithoutXorError,
|
||||||
|
|
||||||
/// This error type is produced by a mismatch to notify xor that compared values aren't equal.
|
/// This error type is produced by a mismatch to notify xor that compared values aren't equal.
|
||||||
#[error("jvalue '{0}' can't be flattened, to be flattened a jvalue should have an array type and consist only one value")]
|
#[error("jvalue '{0}' can't be flattened, to be flattened a jvalue should have an array type and consist of zero or one values")]
|
||||||
FlatteningError(JValue),
|
FlatteningError(JValue),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ fn prepare_json_path<'i>(
|
|||||||
let (jvalue, tetraplets) = resolved.apply_json_path_with_tetraplets(json_path)?;
|
let (jvalue, tetraplets) = resolved.apply_json_path_with_tetraplets(json_path)?;
|
||||||
|
|
||||||
let jvalue = if should_flatten {
|
let jvalue = if should_flatten {
|
||||||
if jvalue.len() != 1 {
|
if jvalue.len() > 1 {
|
||||||
let jvalue = jvalue.into_iter().cloned().collect::<Vec<_>>();
|
let jvalue = jvalue.into_iter().cloned().collect::<Vec<_>>();
|
||||||
return crate::exec_err!(ExecutionError::FlatteningError(JValue::Array(jvalue)));
|
return crate::exec_err!(ExecutionError::FlatteningError(JValue::Array(jvalue)));
|
||||||
}
|
}
|
||||||
|
@ -161,6 +161,37 @@ fn flattening_streams() {
|
|||||||
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![0, 1])));
|
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![0, 1])));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn flattening_empty_values() {
|
||||||
|
let stream_value = json!(
|
||||||
|
{"args": []}
|
||||||
|
);
|
||||||
|
|
||||||
|
let stream_value = serde_json::to_string(&stream_value).expect("the default serializer shouldn't fail");
|
||||||
|
let set_variable_peer_id = "set_variable";
|
||||||
|
let mut set_variable_vm = create_avm(set_variable_call_service(stream_value), set_variable_peer_id);
|
||||||
|
|
||||||
|
let closure_call_args = ClosureCallArgs::default();
|
||||||
|
let local_peer_id = "local_peer_id";
|
||||||
|
let mut local_vm = create_avm(create_check_service_closure(closure_call_args.clone()), local_peer_id);
|
||||||
|
|
||||||
|
let script = format!(
|
||||||
|
r#"
|
||||||
|
(seq
|
||||||
|
(call "{0}" ("" "") [] $stream)
|
||||||
|
(call "{1}" ("" "") [$stream.$.args!]) ; here $stream.$.args returns an empty array
|
||||||
|
)
|
||||||
|
"#,
|
||||||
|
set_variable_peer_id, local_peer_id
|
||||||
|
);
|
||||||
|
|
||||||
|
let res = call_vm!(set_variable_vm, "asd", script.clone(), "", "");
|
||||||
|
let res = call_vm!(local_vm, "asd", script.clone(), "", res.data);
|
||||||
|
|
||||||
|
assert_eq!(res.ret_code, 0);
|
||||||
|
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![])));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_handling_non_flattening_values() {
|
fn test_handling_non_flattening_values() {
|
||||||
let stream_value = json!(
|
let stream_value = json!(
|
||||||
@ -203,7 +234,7 @@ fn test_handling_non_flattening_values() {
|
|||||||
assert_eq!(
|
assert_eq!(
|
||||||
res.error_message,
|
res.error_message,
|
||||||
String::from(
|
String::from(
|
||||||
r#"jvalue '[{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]},{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]},{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]}]' can't be flattened, to be flattened a jvalue should have an array type and consist only one value"#
|
r#"jvalue '[{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]},{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]},{"peer_id":"local_peer_id","service_id":"local_service_id","function_name":"local_function_name","args":[0,1]}]' can't be flattened, to be flattened a jvalue should have an array type and consist of zero or one values"#
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -70,3 +70,39 @@ fn non_wait_on_json_path() {
|
|||||||
|
|
||||||
assert_eq!(res.next_peer_pks, vec![init_peer_id.to_string()]);
|
assert_eq!(res.next_peer_pks, vec![init_peer_id.to_string()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn wait_on_stream_json_path_by_id() {
|
||||||
|
let local_peer_id = "local_peer_id";
|
||||||
|
let mut local_vm = create_avm(unit_call_service(), local_peer_id);
|
||||||
|
|
||||||
|
let non_join_stream_script = format!(
|
||||||
|
r#"
|
||||||
|
(par
|
||||||
|
(call "{0}" ("" "") [] $status)
|
||||||
|
(call "{0}" ("history" "add") [$status.$[0]!])
|
||||||
|
)"#,
|
||||||
|
local_peer_id
|
||||||
|
);
|
||||||
|
|
||||||
|
let res = call_vm!(local_vm, "", non_join_stream_script, "", "");
|
||||||
|
let trace: ExecutionTrace = serde_json::from_slice(&res.data).expect("should be valid json");
|
||||||
|
|
||||||
|
assert_eq!(res.ret_code, 0);
|
||||||
|
assert_eq!(trace.len(), 3);
|
||||||
|
|
||||||
|
let join_stream_script = format!(
|
||||||
|
r#"
|
||||||
|
(par
|
||||||
|
(call "{0}" ("" "") [] $status)
|
||||||
|
(call "{0}" ("history" "add") [$status.$[1]!]) ; $status stream here has only one value
|
||||||
|
)"#,
|
||||||
|
local_peer_id
|
||||||
|
);
|
||||||
|
|
||||||
|
let res = call_vm!(local_vm, "", join_stream_script, "", "");
|
||||||
|
let trace: ExecutionTrace = serde_json::from_slice(&res.data).expect("should be valid json");
|
||||||
|
|
||||||
|
assert_eq!(res.ret_code, 0);
|
||||||
|
assert_eq!(trace.len(), 2); // par and the first call emit traces, second call doesn't
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user