fix(trace-handler): fix fold and canon compatibility (#357)

Fixes bug of traces divergence when `canon` is used inside `fold`.

Closes #356.
This commit is contained in:
Mike Voronov 2022-10-09 12:56:12 +03:00 committed by GitHub
parent cd598c28ae
commit 910f1665eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 741 additions and 311 deletions

223
Cargo.lock generated
View File

@ -4,9 +4,9 @@ version = 3
[[package]]
name = "aho-corasick"
version = "0.7.18"
version = "0.7.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e"
dependencies = [
"memchr",
]
@ -63,7 +63,7 @@ version = "0.1.1"
dependencies = [
"air-beautifier",
"anyhow",
"clap 4.0.2",
"clap 4.0.11",
]
[[package]]
@ -104,7 +104,7 @@ dependencies = [
[[package]]
name = "air-interpreter-interface"
version = "0.11.1"
version = "0.11.2"
dependencies = [
"fluence-it-types",
"marine-rs-sdk",
@ -203,7 +203,7 @@ dependencies = [
"anyhow",
"avm-data-store",
"avm-interface",
"clap 4.0.2",
"clap 4.0.11",
"itertools",
"serde",
"serde_json",
@ -418,9 +418,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.10.0"
version = "3.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3"
checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d"
[[package]]
name = "bytecount"
@ -535,9 +535,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.0.2"
version = "4.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c9484ccdc4cb8e7b117cbd0eb150c7c0f04464854e4679aeb50ef03b32d003"
checksum = "4ed45cc2c62a3eff523e718d8576ba762c83a3146151093283ac62ae11933a73"
dependencies = [
"atty",
"bitflags",
@ -550,9 +550,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.0.1"
version = "4.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca689d7434ce44517a12a89456b2be4d1ea1cafcd8f581978c03d45f5a5c12a7"
checksum = "db342ce9fda24fb191e2ed4e102055a4d381c1086a06630174cd8da8d5d917ce"
dependencies = [
"heck 0.4.0",
"proc-macro-error",
@ -784,26 +784,24 @@ dependencies = [
[[package]]
name = "crossbeam-epoch"
version = "0.9.10"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1"
checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348"
dependencies = [
"autocfg",
"cfg-if 1.0.0",
"crossbeam-utils",
"memoffset",
"once_cell",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.11"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
@ -854,6 +852,50 @@ dependencies = [
"syn",
]
[[package]]
name = "cxx"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f39818dcfc97d45b03953c1292efc4e80954e1583c4aa770bac1383e2310a4"
dependencies = [
"cc",
"cxxbridge-flags",
"cxxbridge-macro",
"link-cplusplus",
]
[[package]]
name = "cxx-build"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e580d70777c116df50c390d1211993f62d40302881e54d4b79727acb83d0199"
dependencies = [
"cc",
"codespan-reporting",
"once_cell",
"proc-macro2",
"quote",
"scratch",
"syn",
]
[[package]]
name = "cxxbridge-flags"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56a46460b88d1cec95112c8c363f0e2c39afdb237f60583b0b36343bf627ea9c"
[[package]]
name = "cxxbridge-macro"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "747b608fecf06b0d72d440f27acc99288207324b793be2c17991839f3d4995ea"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "darling"
version = "0.14.1"
@ -942,9 +984,9 @@ dependencies = [
[[package]]
name = "either"
version = "1.7.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f107b87b6afc2a64fd13cac55fe06d6c8859f12d4b14cbcdd2c67d0976781be"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
[[package]]
name = "ena"
@ -970,9 +1012,9 @@ dependencies = [
[[package]]
name = "erased-serde"
version = "0.3.22"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "003000e712ad0f95857bd4d2ef8d1890069e06554101697d12050668b2f6f020"
checksum = "54558e0ba96fbe24280072642eceb9d7d442e32c7ec0ea9e7ecd7b4ea2cf4e11"
dependencies = [
"serde",
]
@ -1197,18 +1239,28 @@ dependencies = [
[[package]]
name = "iana-time-zone"
version = "0.1.48"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a0714f28b1ee39ccec0770ccb544eb02c9ef2c82bb096230eefcffa6468b0"
checksum = "f5a6ef98976b22b3b7f2f3a806f858cb862044cfa66805aa3ad84cb3d3b785ed"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"once_cell",
"wasm-bindgen",
"winapi",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde6edd6cef363e9359ed3c98ba64590ba9eecba2293eb5a723ab32aee8926aa"
dependencies = [
"cxx",
"cxx-build",
]
[[package]]
name = "id-arena"
version = "2.2.1"
@ -1300,9 +1352,9 @@ checksum = "729c74bb4236418898a219c6d96f14cba77456dd7c04a2e99e65e9c643709b56"
[[package]]
name = "itertools"
version = "0.10.4"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8bf247779e67a9082a4790b45e71ac7cfd1321331a5c856a74a9faebdab78d0"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
@ -1315,9 +1367,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
version = "1.0.3"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
[[package]]
name = "js-sys"
@ -1387,9 +1439,18 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.127"
version = "0.2.134"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "505e71a4706fa491e9b1b55f51b95d4037d0821ee40131190475f692b35b009b"
checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb"
[[package]]
name = "link-cplusplus"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369"
dependencies = [
"cc",
]
[[package]]
name = "lock_api"
@ -1402,9 +1463,9 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.7"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
@ -1795,9 +1856,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.14.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f7254b99e31cad77da24b08ebf628882739a608578bb1bcdfc1f9c21260d7c0"
checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1"
[[package]]
name = "oorandom"
@ -1807,9 +1868,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "os_str_bytes"
version = "6.2.0"
version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]]
name = "output_vt100"
@ -1859,7 +1920,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api 0.4.7",
"lock_api 0.4.9",
"parking_lot_core 0.8.5",
]
@ -1869,7 +1930,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api 0.4.7",
"lock_api 0.4.9",
"parking_lot_core 0.9.3",
]
@ -1953,9 +2014,9 @@ checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "plotters"
version = "0.3.3"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "716b4eeb6c4a1d3ecc956f75b43ec2e8e8ba80026413e70a3f41fd3313d3492b"
checksum = "2538b639e642295546c50fcd545198c9d64ee2a38620a628724a3b266d5fbf97"
dependencies = [
"num-traits",
"plotters-backend",
@ -1972,9 +2033,9 @@ checksum = "193228616381fecdc1224c62e96946dfbc73ff4384fba576e052ff8c1bea8142"
[[package]]
name = "plotters-svg"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0918736323d1baff32ee0eade54984f6f201ad7e97d5cfb5d6ab4a358529615"
checksum = "f9a81d2759aae1dae668f783c308bc5c8ebd191ff4184aaa1b37f65a6ae5a56f"
dependencies = [
"plotters-backend",
]
@ -2038,9 +2099,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.43"
version = "1.0.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b"
dependencies = [
"unicode-ident",
]
@ -2200,6 +2261,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
[[package]]
name = "semver"
version = "0.9.0"
@ -2226,9 +2293,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.144"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860"
checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b"
dependencies = [
"serde_derive",
]
@ -2264,9 +2331,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.144"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00"
checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c"
dependencies = [
"proc-macro2",
"quote",
@ -2279,7 +2346,7 @@ version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44"
dependencies = [
"itoa 1.0.3",
"itoa 1.0.4",
"ryu",
"serde",
]
@ -2297,7 +2364,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with_macros",
"time 0.3.11",
"time 0.3.15",
]
[[package]]
@ -2329,9 +2396,9 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "smallvec"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "static_assertions"
@ -2388,9 +2455,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.99"
version = "1.0.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13"
checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1"
dependencies = [
"proc-macro2",
"quote",
@ -2440,18 +2507,18 @@ checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16"
[[package]]
name = "thiserror"
version = "1.0.35"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c53f98874615aea268107765aa1ed8f6116782501d18e53d08b471733bea6c85"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.35"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8b463991b4eab2d801e724172285ec4195c650e8ec79b149e6c2a8e6dd3f783"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [
"proc-macro2",
"quote",
@ -2480,11 +2547,11 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.11"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c91f41dcb2f096c05f0873d667dceec1087ce5bcf984ec8ffb19acddbb3217"
checksum = "d634a985c4d4238ec39cacaed2e7ae552fbd3c476b552c1deac3021b7d7eaf0c"
dependencies = [
"itoa 1.0.3",
"itoa 1.0.4",
"libc",
"num_threads",
"serde",
@ -2520,9 +2587,9 @@ dependencies = [
[[package]]
name = "tracing"
version = "0.1.36"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
@ -2532,9 +2599,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.22"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2"
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
dependencies = [
"proc-macro2",
"quote",
@ -2543,9 +2610,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.29"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
"valuable",
@ -2563,9 +2630,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60db860322da191b40952ad9affe65ea23e7dd6a5c442c2c42865810c6ab8e6b"
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
dependencies = [
"matchers",
"once_cell",
@ -2575,7 +2642,7 @@ dependencies = [
"sharded-slab",
"smallvec",
"thread_local",
"time 0.3.11",
"time 0.3.15",
"tracing",
"tracing-core",
"tracing-serde",
@ -2613,27 +2680,27 @@ dependencies = [
[[package]]
name = "unicode-ident"
version = "1.0.3"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
[[package]]
name = "unicode-segmentation"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a"
[[package]]
name = "unicode-width"
version = "0.1.9"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973"
checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
[[package]]
name = "unicode-xid"
version = "0.2.3"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "valuable"

View File

@ -79,11 +79,8 @@ fn populate_context<'ctx>(
match ap_result {
ast::ApResult::Scalar(scalar) => exec_ctx.scalars.set_scalar_value(scalar.name, result).map(|_| None),
ast::ApResult::Stream(stream) => {
let generation = ap_result_to_generation(merger_ap_result);
exec_ctx
.streams
.add_stream_value(result, generation, stream.name, stream.position)
.map(Some)
let value_descriptor = generate_value_descriptor(result, stream, merger_ap_result);
exec_ctx.streams.add_stream_value(value_descriptor).map(Some)
}
}
}

View File

@ -15,27 +15,39 @@
*/
use super::ExecutionResult;
use crate::execution_step::execution_context::StreamValueDescriptor;
use crate::execution_step::Generation;
use crate::execution_step::ValueAggregate;
use air_parser::ast;
use air_parser::ast::Ap;
use air_trace_handler::merger::MergerApResult;
pub(super) fn ap_result_to_generation(ap_result: &MergerApResult) -> Generation {
pub(super) fn generate_value_descriptor<'stream>(
value: ValueAggregate,
stream: &'stream ast::Stream<'_>,
ap_result: &MergerApResult,
) -> StreamValueDescriptor<'stream> {
use air_trace_handler::merger::ValueSource;
let met_result = match ap_result {
MergerApResult::NotMet => return Generation::Last,
MergerApResult::Met(met_result) => met_result,
};
match met_result.value_source {
ValueSource::PreviousData => Generation::Nth(met_result.generation),
ValueSource::CurrentData => Generation::Last,
match ap_result {
MergerApResult::NotMet => StreamValueDescriptor::new(
value,
stream.name,
ValueSource::PreviousData,
Generation::Last,
stream.position,
),
MergerApResult::Met(met_result) => StreamValueDescriptor::new(
value,
stream.name,
met_result.value_source,
Generation::Nth(met_result.generation),
stream.position,
),
}
}
pub(super) fn try_match_trace_to_instr(merger_ap_result: &MergerApResult, instr: &Ap<'_>) -> ExecutionResult<()> {
pub(super) fn try_match_trace_to_instr(merger_ap_result: &MergerApResult, instr: &ast::Ap<'_>) -> ExecutionResult<()> {
use crate::execution_step::UncatchableError::ApResultNotCorrespondToInstr;
use ast::ApResult;

View File

@ -39,10 +39,14 @@ pub(crate) fn populate_context_from_peer_service_result<'i>(
Ok(CallResult::executed_scalar(result_value))
}
CallOutputValue::Stream(stream) => {
let generation =
exec_ctx
.streams
.add_stream_value(executed_result, Generation::Last, stream.name, stream.position)?;
let value_descriptor = StreamValueDescriptor::new(
executed_result,
stream.name,
ValueSource::PreviousData,
Generation::Last,
stream.position,
);
let generation = exec_ctx.streams.add_stream_value(value_descriptor)?;
Ok(CallResult::executed_stream(result_value, generation))
}
// by the internal conventions if call has no output value,
@ -67,11 +71,14 @@ pub(crate) fn populate_context_from_data<'i>(
}
(CallOutputValue::Stream(stream), Value::Stream { value, generation }) => {
let result = ValueAggregate::new(value.clone(), tetraplet, trace_pos);
let adjusted_generation = maybe_adjust_generation(generation, value_source);
let resulted_generation =
exec_ctx
.streams
.add_stream_value(result, adjusted_generation, stream.name, stream.position)?;
let value_descriptor = StreamValueDescriptor::new(
result,
stream.name,
value_source,
Generation::Nth(generation),
stream.position,
);
let resulted_generation = exec_ctx.streams.add_stream_value(value_descriptor)?;
let result = Value::Stream {
value,
@ -96,10 +103,3 @@ pub(crate) fn handle_remote_call<'i>(peer_pk: String, exec_ctx: &mut ExecutionCt
let new_call_result = CallResult::sent_peer_id(exec_ctx.run_parameters.current_peer_id.clone());
trace_ctx.meet_call_end(new_call_result);
}
fn maybe_adjust_generation(prev_stream_generation: u32, value_source: ValueSource) -> Generation {
match value_source {
ValueSource::PreviousData => Generation::Nth(prev_stream_generation),
ValueSource::CurrentData => Generation::Last,
}
}

View File

@ -32,7 +32,7 @@ impl<'i> super::ExecutableInstruction<'i> for New<'i> {
// any error. It's highly important to distinguish between global and restricted streams
// at the end of execution to make a correct data.
let instruction_result = self.instruction.execute(exec_ctx, trace_ctx);
let epilog_result = epilog(self, exec_ctx);
let epilog_result = epilog(self, exec_ctx, trace_ctx);
match (instruction_result, epilog_result) {
(Ok(()), Ok(())) => Ok(()),
@ -62,11 +62,13 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
exec_ctx.tracker.meet_new(position);
}
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) -> ExecutionResult<()> {
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
let position = new.span.left;
match &new.argument {
NewArgument::Stream(stream) => {
exec_ctx.streams.meet_scope_end(stream.name.to_string(), position);
exec_ctx
.streams
.meet_scope_end(stream.name.to_string(), position, trace_ctx)?;
Ok(())
}
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name),

View File

@ -17,9 +17,13 @@
use super::ExecutionResult;
use super::ValueAggregate;
use crate::execution_step::CatchableError;
use crate::ExecutionError;
use crate::JValue;
use crate::UncatchableError;
use air_interpreter_data::TracePos;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
use std::collections::HashMap;
use std::fmt::Formatter;
@ -34,35 +38,54 @@ pub struct Stream {
/// obtained values from a current_data that were not present in prev_data becomes a new generation.
values: Vec<Vec<ValueAggregate>>,
/// Count of values from previous data.
previous_gens_count: usize,
/// This map is intended to support canonicalized stream creation, such streams has
/// corresponding value positions in a data and this field are used to create such streams.
values_by_pos: HashMap<TracePos, StreamValueLocation>,
}
impl Stream {
pub(crate) fn from_generations_count(count: usize) -> Self {
pub(crate) fn from_generations_count(previous_count: usize, current_count: usize) -> Self {
let last_generation_count = 1;
// TODO: bubble up an overflow error instead of expect
let overall_count = previous_count
.checked_add(current_count)
.and_then(|value| value.checked_add(last_generation_count))
.expect("it shouldn't overflow");
Self {
values: vec![vec![]; count + 1],
values: vec![vec![]; overall_count],
previous_gens_count: previous_count,
values_by_pos: HashMap::new(),
}
}
// streams created with this ctor assumed to have only one generation,
// for streams that have values in
pub(crate) fn from_value(value: ValueAggregate) -> Self {
let values_by_pos = maplit::hashmap! {
value.trace_pos => StreamValueLocation::new(0, 0),
};
Self {
values: vec![vec![value]],
previous_gens_count: 0,
values_by_pos,
}
}
// if generation is None, value would be added to the last generation, otherwise it would
// be added to given generation
pub(crate) fn add_value(&mut self, value: ValueAggregate, generation: Generation) -> ExecutionResult<u32> {
let generation = match generation {
Generation::Last => self.values.len() - 1,
Generation::Nth(id) => id as usize,
pub(crate) fn add_value(
&mut self,
value: ValueAggregate,
generation: Generation,
source: ValueSource,
) -> ExecutionResult<u32> {
let generation = match (generation, source) {
(Generation::Last, _) => self.values.len() - 1,
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen as usize,
(Generation::Nth(current_gen), ValueSource::CurrentData) => self.previous_gens_count + current_gen as usize,
};
if generation >= self.values.len() {
@ -182,6 +205,26 @@ impl Stream {
Some(iter)
}
/// Removes empty generations updating data and returns final generation count.
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<usize> {
self.remove_empty_generations();
for (generation, values) in self.values.iter().enumerate() {
for value in values.iter() {
trace_ctx
.update_generation(value.trace_pos, generation as u32)
.map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?;
}
}
Ok(self.values.len())
}
/// Removes empty generations from current values.
fn remove_empty_generations(&mut self) {
self.values.retain(|values| !values.is_empty());
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@ -276,16 +319,21 @@ mod test {
use serde_json::json;
use air_trace_handler::merger::ValueSource;
use std::rc::Rc;
#[test]
fn test_slice_iter() {
let value_1 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
let value_2 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
let mut stream = Stream::from_generations_count(2);
let mut stream = Stream::from_generations_count(2, 0);
stream.add_value(value_1, Generation::Nth(0)).unwrap();
stream.add_value(value_2, Generation::Nth(1)).unwrap();
stream
.add_value(value_1, Generation::Nth(0), ValueSource::PreviousData)
.unwrap();
stream
.add_value(value_2, Generation::Nth(1), ValueSource::PreviousData)
.unwrap();
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1)).unwrap();
assert_eq!(slice.len, 2);
@ -302,7 +350,7 @@ mod test {
#[test]
fn test_slice_on_empty_stream() {
let stream = Stream::from_generations_count(2);
let stream = Stream::from_generations_count(2, 0);
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1));
assert!(slice.is_none());
@ -316,4 +364,28 @@ mod test {
let slice = stream.slice_iter(Generation::Last, Generation::Last);
assert!(slice.is_none());
}
#[test]
fn generation_from_current_data() {
let value_1 = ValueAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into());
let value_2 = ValueAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into());
let mut stream = Stream::from_generations_count(5, 5);
stream
.add_value(value_1.clone(), Generation::Nth(2), ValueSource::CurrentData)
.unwrap();
stream
.add_value(value_2.clone(), Generation::Nth(4), ValueSource::PreviousData)
.unwrap();
let generations_count = stream.generations_count();
assert_eq!(generations_count, 2);
let mut iter = stream.iter(Generation::Last).unwrap();
let stream_value_1 = iter.next().unwrap();
let stream_value_2 = iter.next().unwrap();
assert_eq!(stream_value_1, &value_2);
assert_eq!(stream_value_2, &value_1);
}
}

View File

@ -19,6 +19,7 @@ use crate::ToErrorCode;
use air_interpreter_data::TracePos;
use air_interpreter_data::Value;
use air_trace_handler::merger::MergerApResult;
use air_trace_handler::GenerationCompatificationError;
use air_trace_handler::TraceHandlerError;
use strum::IntoEnumIterator;
use strum_macros::EnumDiscriminants;
@ -40,6 +41,10 @@ pub enum UncatchableError {
instruction: String,
},
/// These errors are related to internal bug in the interpreter when result trace is corrupted.
#[error(transparent)]
GenerationCompatificationError(#[from] GenerationCompatificationError),
/// Fold state wasn't found for such iterator name.
#[error("fold state not found for this iterable '{0}'")]
FoldStateNotFound(String),

View File

@ -66,9 +66,19 @@ pub(crate) struct ExecutionCtx<'i> {
}
impl<'i> ExecutionCtx<'i> {
pub(crate) fn new(prev_data: &InterpreterData, call_results: CallResults, run_parameters: RunParameters) -> Self {
pub(crate) fn new(
prev_data: &InterpreterData,
current_data: &InterpreterData,
call_results: CallResults,
run_parameters: RunParameters,
) -> Self {
let run_parameters = RcRunParameters::from_run_parameters(run_parameters);
let streams = Streams::from_data(&prev_data.global_streams, prev_data.restricted_streams.clone());
let streams = Streams::from_data(
&prev_data.global_streams,
&current_data.global_streams,
prev_data.restricted_streams.clone(),
current_data.restricted_streams.clone(),
);
Self {
run_parameters,

View File

@ -14,14 +14,21 @@
* limitations under the License.
*/
mod stream_descriptor;
mod stream_value_descriptor;
mod utils;
use crate::execution_step::ExecutionResult;
use crate::execution_step::Generation;
use crate::execution_step::Stream;
use crate::execution_step::ValueAggregate;
use crate::ExecutionError;
use stream_descriptor::*;
pub(crate) use stream_value_descriptor::StreamValueDescriptor;
use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_parser::ast::Span;
use air_parser::AirPos;
use air_trace_handler::TraceHandler;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
@ -33,37 +40,33 @@ pub(crate) struct Streams {
// TODO: use shared string (Rc<String>) to avoid copying.
streams: HashMap<String, Vec<StreamDescriptor>>,
/// Contains stream generation that private stream should have at the scope start.
data_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations from previous data that a restricted stream
/// should have at the scope start.
previous_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations from current data that a restricted stream
/// should have at the scope start.
current_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations that each private stream had at the scope end.
/// Then it's placed into data
collected_restricted_stream_gens: RestrictedStreamGens,
}
struct StreamDescriptor {
pub(self) span: Span,
pub(self) stream: Stream,
new_restricted_stream_gens: RestrictedStreamGens,
}
impl Streams {
pub(crate) fn from_data(
global_streams: &GlobalStreamGens,
data_restricted_stream_gens: RestrictedStreamGens,
previous_global_streams: &GlobalStreamGens,
current_global_streams: &GlobalStreamGens,
previous_restricted_stream_gens: RestrictedStreamGens,
current_restricted_stream_gens: RestrictedStreamGens,
) -> Self {
let global_streams = global_streams
.iter()
.map(|(stream_name, &generations_count)| {
let global_stream = Stream::from_generations_count(generations_count as usize);
let descriptor = StreamDescriptor::global(global_stream);
(stream_name.to_string(), vec![descriptor])
})
.collect::<HashMap<_, _>>();
let streams = utils::merge_global_streams(previous_global_streams, current_global_streams);
Self {
streams: global_streams,
data_restricted_stream_gens,
collected_restricted_stream_gens: <_>::default(),
streams,
previous_restricted_stream_gens,
current_restricted_stream_gens,
new_restricted_stream_gens: <_>::default(),
}
}
@ -79,15 +82,17 @@ impl Streams {
.and_then(|descriptors| find_closest_mut(descriptors.iter_mut(), position))
}
pub(crate) fn add_stream_value(
&mut self,
value: ValueAggregate,
generation: Generation,
stream_name: &str,
position: AirPos,
) -> ExecutionResult<u32> {
match self.get_mut(stream_name, position) {
Some(stream) => stream.add_value(value, generation),
pub(crate) fn add_stream_value(&mut self, value_descriptor: StreamValueDescriptor<'_>) -> ExecutionResult<u32> {
let StreamValueDescriptor {
value,
name,
source,
generation,
position,
} = value_descriptor;
match self.get_mut(name, position) {
Some(stream) => stream.add_value(value, generation, source),
None => {
// streams could be created in three ways:
// - after met new instruction with stream name that isn't present in streams
@ -98,7 +103,7 @@ impl Streams {
// it means that a new global one should be created.
let stream = Stream::from_value(value);
let descriptor = StreamDescriptor::global(stream);
self.streams.insert(stream_name.to_string(), vec![descriptor]);
self.streams.insert(name.to_string(), vec![descriptor]);
let generation = 0;
Ok(generation)
}
@ -107,11 +112,10 @@ impl Streams {
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: u32) {
let name = name.into();
let generations_count = self
.stream_generation_from_data(&name, span.left, iteration as usize)
.unwrap_or_default();
let (prev_gens_count, current_gens_count) =
self.stream_generation_from_data(&name, span.left, iteration as usize);
let new_stream = Stream::from_generations_count(generations_count as usize);
let new_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
let new_descriptor = StreamDescriptor::restricted(new_stream, span);
match self.streams.entry(name) {
Occupied(mut entry) => {
@ -123,7 +127,12 @@ impl Streams {
}
}
pub(crate) fn meet_scope_end(&mut self, name: String, position: AirPos) {
pub(crate) fn meet_scope_end(
&mut self,
name: String,
position: AirPos,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
// unwraps are safe here because met_scope_end must be called after met_scope_start
let stream_descriptors = self.streams.get_mut(&name).unwrap();
// delete a stream after exit from a scope
@ -132,39 +141,61 @@ impl Streams {
// streams should contain only non-empty stream embodiments
self.streams.remove(&name);
}
let gens_count = last_descriptor.stream.compactify(trace_ctx)?;
self.collect_stream_generation(name, position, last_descriptor.stream.generations_count() as u32);
self.collect_stream_generation(name, position, gens_count as u32);
Ok(())
}
/// This method must be called at the end of execution, because it contains logic to collect
/// all global streams depending on their presence in a streams field.
pub(crate) fn into_streams_data(self) -> (GlobalStreamGens, RestrictedStreamGens) {
pub(crate) fn into_streams_data(
self,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<(GlobalStreamGens, RestrictedStreamGens)> {
// since it's called at the end of execution, streams contains only global ones,
// because all private's been deleted after exiting a scope
let global_streams = self
.streams
.into_iter()
.map(|(name, mut descriptors)| {
.map(|(name, mut descriptors)| -> Result<_, ExecutionError> {
// unwrap is safe here because of invariant that streams contains non-empty vectors,
// moreover it must contain only one value, because this method is called at the end
// of the execution
let generation = descriptors.pop().unwrap().stream.generations_count();
(name, generation as u32)
let stream = descriptors.pop().unwrap().stream;
let gens_count = stream.compactify(trace_ctx)?;
Ok((name, gens_count as u32))
})
.collect::<GlobalStreamGens>();
.collect::<Result<GlobalStreamGens, _>>()?;
(global_streams, self.collected_restricted_stream_gens)
Ok((global_streams, self.new_restricted_stream_gens))
}
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> Option<u32> {
self.data_restricted_stream_gens
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> (u32, u32) {
let previous_generation =
Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration)
.unwrap_or_default();
let current_generation =
Self::restricted_stream_generation(&self.current_restricted_stream_gens, name, position, iteration)
.unwrap_or_default();
(previous_generation, current_generation)
}
fn restricted_stream_generation(
restricted_stream_gens: &RestrictedStreamGens,
name: &str,
position: AirPos,
iteration: usize,
) -> Option<u32> {
restricted_stream_gens
.get(name)
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
.copied()
}
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: u32) {
match self.collected_restricted_stream_gens.entry(name) {
match self.new_restricted_stream_gens.entry(name) {
Occupied(mut streams) => match streams.get_mut().entry(position) {
Occupied(mut iterations) => iterations.get_mut().push(generation),
Vacant(entry) => {
@ -181,48 +212,6 @@ impl Streams {
}
}
impl StreamDescriptor {
pub(self) fn global(stream: Stream) -> Self {
Self {
span: Span::new(0.into(), usize::MAX.into()),
stream,
}
}
pub(self) fn restricted(stream: Stream, span: Span) -> Self {
Self { span, stream }
}
}
fn find_closest<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d StreamDescriptor>,
position: AirPos,
) -> Option<&'d Stream> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&descriptor.stream);
}
}
None
}
fn find_closest_mut<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamDescriptor>,
position: AirPos,
) -> Option<&'d mut Stream> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&mut descriptor.stream);
}
}
None
}
use air_parser::ast::Span;
use std::fmt;
impl fmt::Display for Streams {
@ -235,9 +224,3 @@ impl fmt::Display for Streams {
Ok(())
}
}
impl fmt::Display for StreamDescriptor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, " <{}> - <{}>: {}", self.span.left, self.span.right, self.stream)
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::execution_step::Stream;
use air_parser::ast::Span;
use air_parser::AirPos;
use std::fmt;
pub(super) struct StreamDescriptor {
pub(super) span: Span,
pub(super) stream: Stream,
}
impl StreamDescriptor {
pub(super) fn global(stream: Stream) -> Self {
Self {
span: Span::new(0.into(), usize::MAX.into()),
stream,
}
}
pub(super) fn restricted(stream: Stream, span: Span) -> Self {
Self { span, stream }
}
}
impl fmt::Display for StreamDescriptor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, " <{}> - <{}>: {}", self.span.left, self.span.right, self.stream)
}
}
pub(super) fn find_closest<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d StreamDescriptor>,
position: AirPos,
) -> Option<&'d Stream> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&descriptor.stream);
}
}
None
}
pub(super) fn find_closest_mut<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamDescriptor>,
position: AirPos,
) -> Option<&'d mut Stream> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&mut descriptor.stream);
}
}
None
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::execution_step::Generation;
use crate::execution_step::ValueAggregate;
use air_parser::AirPos;
use air_trace_handler::merger::ValueSource;
pub(crate) struct StreamValueDescriptor<'stream_name> {
pub value: ValueAggregate,
pub name: &'stream_name str,
pub source: ValueSource,
pub generation: Generation,
pub position: AirPos,
}
impl<'stream_name> StreamValueDescriptor<'stream_name> {
pub fn new(
value: ValueAggregate,
name: &'stream_name str,
source: ValueSource,
generation: Generation,
position: AirPos,
) -> Self {
Self {
value,
name,
source,
generation,
position,
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::StreamDescriptor;
use crate::execution_step::Stream;
use air_interpreter_data::GlobalStreamGens;
use std::collections::HashMap;
pub(super) fn merge_global_streams(
previous_global_streams: &GlobalStreamGens,
current_global_streams: &GlobalStreamGens,
) -> HashMap<String, Vec<StreamDescriptor>> {
let mut global_streams = previous_global_streams
.iter()
.map(|(stream_name, &prev_gens_count)| {
let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default();
let global_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
let descriptor = StreamDescriptor::global(global_stream);
(stream_name.to_string(), vec![descriptor])
})
.collect::<HashMap<_, _>>();
for (stream_name, &current_gens_count) in current_global_streams {
if previous_global_streams.contains_key(stream_name) {
continue;
}
let global_stream = Stream::from_generations_count(0, current_gens_count as usize);
let descriptor = StreamDescriptor::global(global_stream);
global_streams.insert(stream_name.clone(), vec![descriptor]);
}
global_streams
}

View File

@ -17,6 +17,7 @@
use super::FarewellError;
use crate::execution_step::ExecutionCtx;
use crate::execution_step::TraceHandler;
use crate::ExecutionError;
use crate::InterpreterOutcome;
use crate::ToErrorCode;
use crate::INTERPRETER_SUCCESS;
@ -81,11 +82,19 @@ pub(crate) fn from_execution_error(
#[tracing::instrument(skip(exec_ctx, trace_handler), level = "info")]
fn populate_outcome_from_contexts(
exec_ctx: ExecutionCtx<'_>,
trace_handler: TraceHandler,
mut trace_handler: TraceHandler,
ret_code: i64,
error_message: String,
) -> InterpreterOutcome {
let (global_streams, restricted_streams) = exec_ctx.streams.into_streams_data();
let maybe_gens = exec_ctx
.streams
.into_streams_data(&mut trace_handler)
.map_err(execution_error_into_outcome);
let (global_streams, restricted_streams) = match maybe_gens {
Ok(gens) => gens,
Err(outcome) => return outcome,
};
let data = InterpreterData::from_execution_result(
trace_handler.into_result_trace(),
global_streams,
@ -104,13 +113,13 @@ fn populate_outcome_from_contexts(
"serde_json::to_vec(call_results)",
);
InterpreterOutcome {
ret_code,
error_message,
data,
next_peer_pks,
call_requests,
}
InterpreterOutcome::new(ret_code, error_message, data, next_peer_pks, call_requests)
}
// this method is called only if there is an internal error in the interpreter and
// new execution trace was corrupted
fn execution_error_into_outcome(error: ExecutionError) -> InterpreterOutcome {
InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![])
}
/// Deduplicate values in a supplied vector.

View File

@ -45,7 +45,7 @@ pub(crate) fn prepare<'i>(
let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?;
let exec_ctx = make_exec_ctx(&prev_data, call_results, run_parameters)?;
let exec_ctx = make_exec_ctx(&prev_data, &current_data, call_results, run_parameters)?;
let trace_handler = TraceHandler::from_data(prev_data, current_data);
let result = PreparationDescriptor {
@ -66,12 +66,13 @@ fn try_to_data(raw_data: &[u8]) -> PreparationResult<InterpreterData> {
#[tracing::instrument(skip_all)]
fn make_exec_ctx(
prev_data: &InterpreterData,
current_data: &InterpreterData,
call_results: &[u8],
run_parameters: RunParameters,
) -> PreparationResult<ExecutionCtx<'static>> {
let call_results = serde_json::from_slice(call_results)
.map_err(|e| PreparationError::CallResultsDeFailed(e, call_results.to_vec()))?;
let ctx = ExecutionCtx::new(prev_data, call_results, run_parameters);
let ctx = ExecutionCtx::new(prev_data, current_data, call_results, run_parameters);
Ok(ctx)
}

View File

@ -290,8 +290,8 @@ fn fold_merge() {
.get("$stream_2")
.expect("$stream_2 should be present in data");
assert_eq!(*stream_1_generations, 4);
assert_eq!(*stream_2_generations, 3);
assert_eq!(*stream_1_generations, 8);
assert_eq!(*stream_2_generations, 6);
let mut fold_states_count = 0;
let mut calls_count = HashMap::new();

View File

@ -115,9 +115,9 @@ fn par_early_exit() {
executed_state::par(5, 1),
executed_state::par(3, 1),
executed_state::par(1, 1),
executed_state::stream_string("1", 0),
executed_state::stream_string("2", 0),
executed_state::stream_string("1", 0),
executed_state::stream_string("1", 1),
executed_state::stream_string("2", 2),
executed_state::stream_string("1", 1),
executed_state::stream_string("success result from fallible_call_service", 0),
executed_state::service_failed(1, "failed result from fallible_call_service"),
executed_state::stream_string("success result from fallible_call_service", 0),
@ -155,7 +155,7 @@ fn par_early_exit() {
trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual {
prev_value: Value::Stream {
value: rc!(json!("1")),
generation: 0,
generation: 1,
},
current_value: Value::Stream {
value: rc!(json!("non_exist_value")),
@ -228,63 +228,14 @@ fn fold_early_exit() {
);
let actual_trace = trace_from_result(&last_peer_checker_result);
let unit_call_service_result = "result from unit_call_service";
let expected_trace = vec![
executed_state::scalar_string_array(vec!["a1", "a2"]),
executed_state::scalar_string_array(vec!["b1", "b2"]),
executed_state::scalar_string_array(vec!["c1", "c2"]),
executed_state::scalar_string_array(vec!["d1", "d2"]),
executed_state::stream_string("a1", 0),
executed_state::stream_string("a2", 0),
executed_state::stream_string("b1", 0),
executed_state::stream_string("b2", 0),
executed_state::stream_string("c1", 0),
executed_state::stream_string("c2", 0),
executed_state::stream_string("d1", 0),
executed_state::stream_string("d2", 0),
executed_state::par(17, 1),
executed_state::fold(vec![executed_state::subtrace_lore(
4,
subtrace_desc(14, 15),
subtrace_desc(29, 0),
)]),
executed_state::fold(vec![executed_state::subtrace_lore(
6,
subtrace_desc(15, 14),
subtrace_desc(29, 0),
)]),
executed_state::fold(vec![
executed_state::subtrace_lore(8, subtrace_desc(16, 6), subtrace_desc(28, 0)),
executed_state::subtrace_lore(9, subtrace_desc(22, 6), subtrace_desc(28, 0)),
]),
executed_state::par(5, 6),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(18, 2), subtrace_desc(22, 0)),
executed_state::subtrace_lore(11, subtrace_desc(20, 2), subtrace_desc(22, 0)),
]),
executed_state::par(1, 2),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(5, 0),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(24, 2), subtrace_desc(28, 0)),
executed_state::subtrace_lore(11, subtrace_desc(26, 2), subtrace_desc(28, 0)),
]),
executed_state::par(1, 2),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::service_failed(1, "failed result from fallible_call_service"),
executed_state::scalar(json!({
let expected_state = executed_state::scalar(json!({
"error_code": 10000i64,
"instruction" : r#"call "error_trigger_id" ("error" "") [] "#,
"message": r#"Local service error, ret_code is 1, error message is '"failed result from fallible_call_service"'"#,
"peer_id": "error_trigger_id"})),
executed_state::scalar_string("last_peer"),
];
"peer_id": "error_trigger_id"}));
assert_eq!(actual_trace, expected_trace);
let xor_right_subgraph_state_id = actual_trace.len() - 2;
assert_eq!(&actual_trace[xor_right_subgraph_state_id.into()], &expected_state);
}
#[test]
@ -351,44 +302,44 @@ fn fold_par_early_exit() {
executed_state::scalar_string_array(vec!["c1", "c2"]),
executed_state::scalar_string_array(vec!["d1", "d2"]),
executed_state::stream_string("a1", 0),
executed_state::stream_string("a2", 0),
executed_state::stream_string("a2", 1),
executed_state::stream_string("b1", 0),
executed_state::stream_string("b2", 0),
executed_state::stream_string("b2", 1),
executed_state::stream_string("c1", 0),
executed_state::stream_string("c2", 0),
executed_state::stream_string("c2", 1),
executed_state::stream_string("d1", 0),
executed_state::stream_string("d2", 0),
executed_state::stream_string("d2", 1),
executed_state::par(69, 1),
executed_state::fold(vec![
executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(82, 0)),
executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(48, 0)),
executed_state::subtrace_lore(5, subtrace_desc(48, 34), subtrace_desc(82, 0)),
]),
executed_state::par(33, 34),
executed_state::par(33, 0),
executed_state::fold(vec![
executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(48, 0)),
executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(32, 0)),
executed_state::subtrace_lore(7, subtrace_desc(32, 16), subtrace_desc(48, 0)),
]),
executed_state::par(15, 16),
executed_state::par(15, 0),
executed_state::par(13, 1),
executed_state::fold(vec![
executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(31, 0)),
executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(25, 0)),
executed_state::subtrace_lore(9, subtrace_desc(25, 6), subtrace_desc(31, 0)),
]),
executed_state::par(5, 6),
executed_state::par(5, 0),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(25, 0)),
executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(23, 0)),
executed_state::subtrace_lore(11, subtrace_desc(23, 2), subtrace_desc(25, 0)),
]),
executed_state::par(1, 2),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(5, 0),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(31, 0)),
executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(29, 0)),
executed_state::subtrace_lore(11, subtrace_desc(29, 2), subtrace_desc(31, 0)),
]),
executed_state::par(1, 2),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
@ -396,7 +347,7 @@ fn fold_par_early_exit() {
executed_state::par(15, 0),
executed_state::par(13, 1),
executed_state::fold(vec![
executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(47, 0)),
executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(41, 0)),
executed_state::subtrace_lore(9, subtrace_desc(41, 6), subtrace_desc(47, 0)),
]),
];

View File

@ -120,7 +120,7 @@ fn canon_fixes_stream_correct() {
executed_state::stream_number(1, 0),
executed_state::par(1, 1),
executed_state::stream_number(2, 1),
executed_state::stream_number(3, 1),
executed_state::stream_number(3, 2),
executed_state::scalar_number(4),
executed_state::canon(vec![3.into(), 4.into()]),
executed_state::par(1, 1),

View File

@ -55,7 +55,7 @@ fn issue_302() {
executed_state::par(1, 3),
executed_state::stream_number(2, 1),
executed_state::stream_number(1, 0),
executed_state::stream_number(0, 1),
executed_state::stream_number(0, 2),
executed_state::scalar(json!([1, 2, 0])),
];
assert_eq!(actual_trace.deref(), expected_trace);

View File

@ -0,0 +1,66 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_test_framework::TestExecutor;
use air_test_utils::prelude::*;
#[test]
fn issue_356() {
let script = r#"
(seq
(seq
(call "relay" ("kad" "neighborhood") ["relay"] neighs_top) ; ok = ["p1"]
(seq
(call "p1" ("kad" "neighborhood") ["p1"] neighs_inner) ; ok =["p1"]
(par
(call "relay" ("peer" "identify") ["relay"] $external_addresses) ; behaviour = echo
(call "p1" ("peer" "identify") ["p1"] $external_addresses) ; behaviour = echo
)
)
)
(seq
(new $monotonic_stream
(fold $external_addresses elem
(seq
(ap "asd" $monotonic_stream)
(seq
(canon "relay" $monotonic_stream #result)
(null)
)
)
)
)
(call "client" ("return" "") [$external_addresses neighs_inner] x) ; ok = null
)
)
"#;
let engine = TestExecutor::new(
TestRunParameters::from_init_peer_id("client"),
vec![],
vec!["p1", "p2", "p3"].into_iter().map(Into::into),
&script,
)
.unwrap();
for _ in 0..7 {
for peer in ["client", "relay", "p1", "p2"] {
for outcome in engine.execution_iter(peer).unwrap() {
assert_eq!(outcome.ret_code, 0, "{:?}", outcome);
}
}
}
}

View File

@ -34,3 +34,4 @@ mod issue_306;
mod issue_331;
mod issue_346;
mod issue_348;
mod issue_356;

View File

@ -14,6 +14,8 @@
* limitations under the License.
*/
extern crate core;
// https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html
mod features;
mod instructions;

View File

@ -16,7 +16,7 @@ name = "avm_interface"
path = "src/lib.rs"
[dependencies]
air-interpreter-interface = { version = "0.11.1", path = "../../crates/air-lib/interpreter-interface", default-features = false }
air-interpreter-interface = { version = "0.11.2", path = "../../crates/air-lib/interpreter-interface", default-features = false }
air-utils = { version = "0.1.0", path = "../../crates/air-lib/utils" }
avm-data-store = { version = "0.4.1", path = "../../crates/data-store" }
polyplets = { version = "0.3.2", path = "../../crates/air-lib/polyplets" }

View File

@ -16,7 +16,7 @@ name = "avm_server"
path = "src/lib.rs"
[dependencies]
air-interpreter-interface = { version = "0.11.1", path = "../../crates/air-lib/interpreter-interface" }
air-interpreter-interface = { version = "0.11.2", path = "../../crates/air-lib/interpreter-interface" }
air-utils = { version = "0.1.0", path = "../../crates/air-lib/utils" }
avm-data-store = { version = "0.4.1", path = "../../crates/data-store" }
marine-runtime = "0.23.0"

View File

@ -31,6 +31,10 @@ impl ExecutionTrace {
self.0.get(usize::from(index))
}
pub fn get_mut(&mut self, index: TracePos) -> Option<&mut ExecutedState> {
self.0.get_mut(usize::from(index))
}
pub fn pop(&mut self) -> Option<ExecutedState> {
self.0.pop()
}

View File

@ -1,7 +1,7 @@
[package]
name = "air-interpreter-interface"
description = "Interface of the AIR interpreter"
version = "0.11.1"
version = "0.11.2"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"

View File

@ -45,6 +45,24 @@ pub struct InterpreterOutcome {
pub call_requests: Vec<u8>,
}
impl InterpreterOutcome {
pub fn new(
ret_code: i64,
error_message: String,
data: Vec<u8>,
next_peer_pks: Vec<String>,
call_requests: Vec<u8>,
) -> Self {
Self {
ret_code,
error_message,
data,
next_peer_pks,
call_requests,
}
}
}
#[cfg(feature = "marine")]
impl InterpreterOutcome {
pub fn from_ivalue(ivalue: IValue) -> Result<Self, String> {

View File

@ -132,5 +132,12 @@ pub fn is_interpreter_succeded(result: &RawAVMOutcome) -> bool {
}
pub fn check_error(result: &RawAVMOutcome, error: impl ToErrorCode + ToString) -> bool {
println!(
"{} == {} && {} == {}",
result.ret_code,
error.to_error_code(),
result.error_message,
error.to_string()
);
result.ret_code == error.to_error_code() && result.error_message == error.to_string()
}

View File

@ -18,6 +18,8 @@ use super::data_keeper::KeeperError;
use super::merger::MergeError;
use super::state_automata::StateFSMError;
use air_interpreter_data::ExecutedState;
use air_interpreter_data::TracePos;
use thiserror::Error as ThisError;
/// Errors arose out of merging previous data with a new.
@ -33,3 +35,25 @@ pub enum TraceHandlerError {
#[error(transparent)]
StateFSMError(#[from] StateFSMError),
}
#[derive(ThisError, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum GenerationCompatificationError {
#[error("trying to change generation of an invalid trace position {0}")]
TracePosPointsToNowhere(TracePos),
#[error(
"trying to change generation of a state {state} on {position} position, the state doesn't contain generation"
)]
TracePosPointsToInvalidState { position: TracePos, state: ExecutedState },
}
impl GenerationCompatificationError {
pub fn points_to_nowhere(position: TracePos) -> Self {
GenerationCompatificationError::TracePosPointsToNowhere(position)
}
pub fn points_to_invalid_state(position: TracePos, state: ExecutedState) -> Self {
GenerationCompatificationError::TracePosPointsToInvalidState { position, state }
}
}

View File

@ -55,6 +55,34 @@ impl TraceHandler {
(prev_len, current_len)
}
pub fn update_generation(
&mut self,
trace_pos: TracePos,
generation: u32,
) -> Result<(), GenerationCompatificationError> {
let state = self
.data_keeper
.result_trace
.get_mut(trace_pos)
.ok_or_else(|| GenerationCompatificationError::points_to_nowhere(trace_pos))?;
match state {
ExecutedState::Ap(ap_result) => ap_result.res_generations = vec![generation],
ExecutedState::Call(CallResult::Executed(Value::Stream {
generation: call_generation,
..
})) => *call_generation = generation,
state => {
return Err(GenerationCompatificationError::points_to_invalid_state(
trace_pos,
state.clone(),
))
}
}
Ok(())
}
}
impl TraceHandler {

View File

@ -32,6 +32,7 @@ mod handler;
pub mod merger;
mod state_automata;
pub use errors::GenerationCompatificationError;
pub use errors::TraceHandlerError;
pub use handler::TraceHandler;
pub use state_automata::SubgraphType;