feat(execution-engine)!: make fold convergent wrt errors (#351)

fold over a stream was not convergent when errors are produced inside a fold body. After iteration, it produces if there were several errors, only the last one is bubbled up. But on the same peer, there could different last (and first) errors, and it'll become non-convergent and moreover non-deterministic.

After this PR fold won't bubble any errors to make execution convergent and deterministic. To obtain errors from a fold body one should wrap this body into xor and push errors into some stream. Then fold over this stream and handle errors.
This commit is contained in:
Mike Voronov 2023-02-09 17:09:24 +03:00 committed by GitHub
parent 8f5217e3d6
commit 87f7e2f361
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 204 additions and 172 deletions

157
Cargo.lock generated
View File

@ -466,9 +466,9 @@ dependencies = [
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.11.1" version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
[[package]] [[package]]
name = "bytecount" name = "bytecount"
@ -499,9 +499,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.78" version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@ -602,7 +602,7 @@ checksum = "f13b9c79b5d1dd500d20ef541215a6423c75829ef43117e1b4d17fd8af0b5d76"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"clap_derive", "clap_derive",
"clap_lex 0.3.0", "clap_lex 0.3.1",
"is-terminal", "is-terminal",
"once_cell", "once_cell",
"strsim", "strsim",
@ -615,7 +615,7 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8" checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8"
dependencies = [ dependencies = [
"heck 0.4.0", "heck 0.4.1",
"proc-macro-error", "proc-macro-error",
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -633,9 +633,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_lex" name = "clap_lex"
version = "0.3.0" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" checksum = "783fe232adfca04f90f56201b26d79682d4cd2625e0bc7290b95123afe558ade"
dependencies = [ dependencies = [
"os_str_bytes", "os_str_bytes",
] ]
@ -943,9 +943,9 @@ dependencies = [
[[package]] [[package]]
name = "cxx" name = "cxx"
version = "1.0.85" version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5add3fc1717409d029b20c5b6903fc0c0b02fa6741d820054f4a2efa5e5816fd" checksum = "bc831ee6a32dd495436e317595e639a587aa9907bef96fe6e6abc290ab6204e9"
dependencies = [ dependencies = [
"cc", "cc",
"cxxbridge-flags", "cxxbridge-flags",
@ -955,9 +955,9 @@ dependencies = [
[[package]] [[package]]
name = "cxx-build" name = "cxx-build"
version = "1.0.85" version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c87959ba14bc6fbc61df77c3fcfe180fc32b93538c4f1031dd802ccb5f2ff0" checksum = "94331d54f1b1a8895cd81049f7eaaaef9d05a7dcb4d1fd08bf3ff0806246789d"
dependencies = [ dependencies = [
"cc", "cc",
"codespan-reporting", "codespan-reporting",
@ -970,15 +970,15 @@ dependencies = [
[[package]] [[package]]
name = "cxxbridge-flags" name = "cxxbridge-flags"
version = "1.0.85" version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69a3e162fde4e594ed2b07d0f83c6c67b745e7f28ce58c6df5e6b6bef99dfb59" checksum = "48dcd35ba14ca9b40d6e4b4b39961f23d835dbb8eed74565ded361d93e1feb8a"
[[package]] [[package]]
name = "cxxbridge-macro" name = "cxxbridge-macro"
version = "1.0.85" version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e7e2adeb6a0d4a282e581096b06e1791532b7d576dcde5ccd9382acf55db8e6" checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -987,9 +987,9 @@ dependencies = [
[[package]] [[package]]
name = "darling" name = "darling"
version = "0.14.2" version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0dd3cd20dc6b5a876612a6e5accfe7f3dd883db6d07acfbf14c128f61550dfa" checksum = "c0808e1bd8671fb44a113a14e13497557533369847788fa2ae912b6ebfce9fa8"
dependencies = [ dependencies = [
"darling_core", "darling_core",
"darling_macro", "darling_macro",
@ -997,9 +997,9 @@ dependencies = [
[[package]] [[package]]
name = "darling_core" name = "darling_core"
version = "0.14.2" version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a784d2ccaf7c98501746bf0be29b2022ba41fd62a2e622af997a03e9f972859f" checksum = "001d80444f28e193f30c2f293455da62dcf9a6b29918a4253152ae2b1de592cb"
dependencies = [ dependencies = [
"fnv", "fnv",
"ident_case", "ident_case",
@ -1011,9 +1011,9 @@ dependencies = [
[[package]] [[package]]
name = "darling_macro" name = "darling_macro"
version = "0.14.2" version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e" checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685"
dependencies = [ dependencies = [
"darling_core", "darling_core",
"quote", "quote",
@ -1109,9 +1109,9 @@ dependencies = [
[[package]] [[package]]
name = "either" name = "either"
version = "1.8.0" version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
[[package]] [[package]]
name = "ena" name = "ena"
@ -1334,9 +1334,9 @@ dependencies = [
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.4.0" version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
@ -1356,6 +1356,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]] [[package]]
name = "hex" name = "hex"
version = "0.4.3" version = "0.4.3"
@ -1445,9 +1451,9 @@ dependencies = [
[[package]] [[package]]
name = "io-lifetimes" name = "io-lifetimes"
version = "1.0.3" version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "windows-sys",
@ -1455,11 +1461,11 @@ dependencies = [
[[package]] [[package]]
name = "is-terminal" name = "is-terminal"
version = "0.4.2" version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189" checksum = "22e18b0a45d56fe973d6db23972bf5bc46f988a4a2385deac9cc29572f09daef"
dependencies = [ dependencies = [
"hermit-abi 0.2.6", "hermit-abi 0.3.1",
"io-lifetimes", "io-lifetimes",
"rustix", "rustix",
"windows-sys", "windows-sys",
@ -2122,7 +2128,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [ dependencies = [
"lock_api 0.4.9", "lock_api 0.4.9",
"parking_lot_core 0.9.5", "parking_lot_core 0.9.7",
] ]
[[package]] [[package]]
@ -2155,9 +2161,9 @@ dependencies = [
[[package]] [[package]]
name = "parking_lot_core" name = "parking_lot_core"
version = "0.9.5" version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"libc", "libc",
@ -2174,9 +2180,9 @@ checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba"
[[package]] [[package]]
name = "petgraph" name = "petgraph"
version = "0.6.2" version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
dependencies = [ dependencies = [
"fixedbitset", "fixedbitset",
"indexmap", "indexmap",
@ -2312,9 +2318,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.49" version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -2368,9 +2374,9 @@ dependencies = [
[[package]] [[package]]
name = "rayon-core" name = "rayon-core"
version = "1.10.1" version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3" checksum = "356a0625f1954f730c0201cdab48611198dc6ce21f4acff55089b5a78e6e835b"
dependencies = [ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"crossbeam-deque", "crossbeam-deque",
@ -2441,9 +2447,9 @@ dependencies = [
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.36.5" version = "0.36.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588" checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"errno", "errno",
@ -2567,9 +2573,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.92" version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7434af0dc1cbd59268aa98b4c22c131c0584d2232f6fb166efb993e2832e896a" checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
dependencies = [ dependencies = [
"itoa 1.0.5", "itoa 1.0.5",
"ryu", "ryu",
@ -2578,9 +2584,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_with" name = "serde_with"
version = "2.1.0" version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25bf4a5a814902cd1014dbccfa4d4560fb8432c779471e96e035602519f82eef" checksum = "30d904179146de381af4c93d3af6ca4984b3152db687dacb9c3c35e86f39809c"
dependencies = [ dependencies = [
"base64", "base64",
"chrono", "chrono",
@ -2594,9 +2600,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_with_macros" name = "serde_with_macros"
version = "2.1.0" version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3452b4c0f6c1e357f73fdb87cd1efabaa12acf328c7a528e252893baeb3f4aa" checksum = "a1966009f3c05f095697c537312f5415d1e3ed31ce0a56942bac4c771c5c335e"
dependencies = [ dependencies = [
"darling", "darling",
"proc-macro2", "proc-macro2",
@ -2676,7 +2682,7 @@ version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [ dependencies = [
"heck 0.4.0", "heck 0.4.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"rustversion", "rustversion",
@ -2731,9 +2737,9 @@ dependencies = [
[[package]] [[package]]
name = "termcolor" name = "termcolor"
version = "1.1.3" version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [ dependencies = [
"winapi-util", "winapi-util",
] ]
@ -2841,9 +2847,9 @@ dependencies = [
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.5.10" version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1333c76748e868a4d9d1017b5ab53171dfd095f70c712fdb4653a406547f598f" checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234"
dependencies = [ dependencies = [
"serde", "serde",
] ]
@ -2949,9 +2955,9 @@ checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.10.0" version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a" checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36"
[[package]] [[package]]
name = "unicode-width" name = "unicode-width"
@ -3351,9 +3357,18 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.42.0" version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm", "windows_aarch64_gnullvm",
"windows_aarch64_msvc", "windows_aarch64_msvc",
@ -3366,45 +3381,45 @@ dependencies = [
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.42.0" version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.42.0" version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.42.0" version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.42.0" version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.42.0" version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.42.0" version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.42.0" version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
[[package]] [[package]]
name = "yansi" name = "yansi"

View File

@ -70,7 +70,7 @@ impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
observer.update_completeness(exec_ctx); observer.update_completeness(exec_ctx);
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), self)?; trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), self)?;
observer.into_result() Ok(())
} }
} }
@ -106,8 +106,10 @@ fn execute_iterations<'i>(
exec_ctx, exec_ctx,
trace_ctx, trace_ctx,
); );
throw_error_if_not_catchable(result)?;
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_stream)?; trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_stream)?;
generation_observer.observe_generation_results(exec_ctx.is_subgraph_complete(), result);
generation_observer.observe_completeness(exec_ctx.is_subgraph_complete());
} }
Ok(()) Ok(())
@ -128,3 +130,13 @@ fn remove_new_generation_if_non_empty<'ctx>(
stream.remove_last_generation_if_empty(); stream.remove_last_generation_if_empty();
stream stream
} }
/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be
/// not deterministic.
fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> {
match result {
Ok(_) => Ok(()),
Err(error) if error.is_catchable() => Ok(()),
error @ Err(_) => error,
}
}

View File

@ -15,34 +15,23 @@
*/ */
use super::ExecutionCtx; use super::ExecutionCtx;
use super::ExecutionResult;
pub(super) struct FoldGenerationObserver { pub(super) struct FoldGenerationObserver {
subtree_complete: bool, subgraph_complete: bool,
// keeps either Ok or the last met error
result: ExecutionResult<()>,
} }
impl FoldGenerationObserver { impl FoldGenerationObserver {
pub(super) fn new() -> Self { pub(super) fn new() -> Self {
Self { Self {
subtree_complete: false, subgraph_complete: false,
result: Ok(()),
} }
} }
pub(super) fn observe_generation_results(&mut self, completeness: bool, result: ExecutionResult<()>) { pub(super) fn observe_completeness(&mut self, completeness: bool) {
self.subtree_complete |= completeness; self.subgraph_complete |= completeness;
if result.is_err() {
self.result = result;
}
} }
pub(super) fn update_completeness(&self, exec_ctx: &mut ExecutionCtx<'_>) { pub(super) fn update_completeness(self, exec_ctx: &mut ExecutionCtx<'_>) {
exec_ctx.set_subgraph_completeness(self.subtree_complete); exec_ctx.set_subgraph_completeness(self.subgraph_complete);
}
pub(super) fn into_result(self) -> ExecutionResult<()> {
self.result
} }
} }

View File

@ -263,10 +263,8 @@ fn recursive_stream_error_handling() {
let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1); let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);
let vm_peer_id_2 = "vm_peer_id_2";
let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);
let result_value = "result_value"; let result_value = "result_value";
let vm_peer_id_2 = "vm_peer_id_2";
let script = f!(r#" let script = f!(r#"
(xor (xor
(seq (seq
@ -281,17 +279,38 @@ fn recursive_stream_error_handling() {
(null)) (null))
(seq (seq
(ap value $stream) (ap value $stream)
(next iterator)))))) (next iterator)
)
)
)
)
)
(call "{vm_peer_id_2}" ("" "") ["{result_value}"])) (call "{vm_peer_id_2}" ("" "") ["{result_value}"]))
"#); "#);
let result = checked_call_vm!(vm_1, <_>::default(), &script, "", ""); let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
let result = checked_call_vm!(vm_2, <_>::default(), &script, "", result.data);
let actual_trace = trace_from_result(&result); let actual_trace = trace_from_result(&result);
let actual_last_state = &actual_trace[11.into()]; let expected_trace = vec![
let expected_last_state = executed_state::scalar_string(result_value); executed_state::stream_string("non_stop", 0),
executed_state::stream_string("non_stop", 1),
executed_state::fold(vec![
subtrace_lore(0, SubTraceDesc::new(3.into(), 2), SubTraceDesc::new(5.into(), 0)),
subtrace_lore(1, SubTraceDesc::new(5.into(), 2), SubTraceDesc::new(7.into(), 0)),
subtrace_lore(4, SubTraceDesc::new(7.into(), 2), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(6, SubTraceDesc::new(9.into(), 1), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(8, SubTraceDesc::new(10.into(), 1), SubTraceDesc::new(11.into(), 0)),
]),
executed_state::scalar_string("non_stop"),
executed_state::ap(2),
executed_state::scalar_string("non_stop"),
executed_state::ap(2),
executed_state::scalar_string("non_stop"),
executed_state::ap(3),
executed_state::service_failed(1, "error"),
executed_state::service_failed(1, "error"),
];
assert_eq!(actual_last_state, &expected_last_state); assert_eq!(actual_trace, expected_trace);
} }
#[test] #[test]

View File

@ -3,45 +3,64 @@
(seq (seq
(seq (seq
(seq (seq
(call "{0}" ("" "") ["stream_1"] stream_1_ingredients) (ap "a1" $stream_1)
(call "{0}" ("" "") ["stream_2"] stream_2_ingredients)) (ap "a2" $stream_1)
(call "{0}" ("" "") ["stream_3"] stream_3_ingredients)) )
(call "{0}" ("" "") ["stream_4"] stream_4_ingredients)) (seq
(seq (ap "b1" $stream_2)
(ap "b2" $stream_2)
)
)
(seq (seq
(seq (ap "c1" $stream_3)
(fold stream_1_ingredients v1 (ap "c2" $stream_3)
(seq )
(call "{1}" ("" "") [v1] $stream_1) )
(next v1))) (seq
(fold stream_2_ingredients v2 (ap "d1" $stream_4)
(seq (ap "d2" $stream_4)
(call "{1}" ("" "") [v2] $stream_2) )
(next v2)))) )
(fold stream_3_ingredients v3
(seq
(call "{1}" ("" "") [v3] $stream_3)
(next v3))))
(fold stream_4_ingredients v4
(seq
(call "{1}" ("" "") [v4] $stream_4)
(next v4)))))
(par (par
(xor (par
(fold $stream_1 v1 (xor
(seq (fold $stream_1 v1
(fold $stream_2 v2 (seq
(seq (fold $stream_2 v2
(seq (xor
(fold $stream_3 v3 (seq
(par (seq
(fold $stream_4 v4 (fold $stream_3 v3
(par (par
(call "{2}" ("" "") []) (fold $stream_4 v4
(next v4))) (par
(next v3))) (call "{fold_executor_id}" ("" "") [])
(call "{3}" ("error" "") [])) ; will trigger an error (next v4)
(next v2))) )
(next v1))) )
(call "{4}" ("" "") [%last_error%])) (next v3)))
(call "{5}" ("" "") ["last_peer"]))) (call "{error_trigger_id}" ("error" "") []) ; will trigger an error
)
(next v2)
)
(ap %last_error% $error_stream)
)
)
(next v1)
)
)
(call "{last_error_receiver_id}" ("" "") [%last_error%]) ; shouldn't be called
)
(seq
(call "{last_peer_checker_id}" ("" "") ["last_peer 2"])
(fold $error_stream error
(seq
(call "{last_peer_checker_id}" ("" "") [error])
(next error)
)
)
)
)
(call "{last_peer_checker_id}" ("" "") ["last_peer"])
)
)

View File

@ -179,58 +179,33 @@ fn par_early_exit() {
#[test] #[test]
fn fold_early_exit() { fn fold_early_exit() {
let variables_setter_id = "set_variable_id";
let stream_setter_id = "stream_setter_id";
let fold_executor_id = "fold_executor_id"; let fold_executor_id = "fold_executor_id";
let error_trigger_id = "error_trigger_id"; let error_trigger_id = "error_trigger_id";
let last_error_receiver_id = "last_error_receiver_id"; let last_error_receiver_id = "last_error_receiver_id";
let last_peer_checker_id = "last_peer_checker_id"; let last_peer_checker_id = "last_peer_checker_id";
let variables = maplit::hashmap!(
"stream_1".to_string() => json!(["a1", "a2"]),
"stream_2".to_string() => json!(["b1", "b2"]),
"stream_3".to_string() => json!(["c1", "c2"]),
"stream_4".to_string() => json!(["d1", "d2"]),
);
let mut variables_setter = create_avm(
set_variables_call_service(variables, VariableOptionSource::Argument(0)),
variables_setter_id,
);
let mut stream_setter = create_avm(echo_call_service(), stream_setter_id);
let mut fold_executor = create_avm(unit_call_service(), fold_executor_id); let mut fold_executor = create_avm(unit_call_service(), fold_executor_id);
let mut error_trigger = create_avm(fallible_call_service("error"), error_trigger_id); let mut error_trigger = create_avm(fallible_call_service("error"), error_trigger_id);
let mut last_error_receiver = create_avm(echo_call_service(), last_error_receiver_id);
let mut last_peer_checker = create_avm(echo_call_service(), last_peer_checker_id); let mut last_peer_checker = create_avm(echo_call_service(), last_peer_checker_id);
let script = format!( let script = format!(
include_str!("scripts/fold_early_exit.air"), include_str!("scripts/fold_early_exit.air"),
variables_setter_id, fold_executor_id = fold_executor_id,
stream_setter_id, error_trigger_id = error_trigger_id,
fold_executor_id, last_error_receiver_id = last_error_receiver_id,
error_trigger_id, last_peer_checker_id = last_peer_checker_id
last_error_receiver_id,
last_peer_checker_id
); );
let variables_setter_result = checked_call_vm!(variables_setter, <_>::default(), &script, "", ""); let fold_executor_result = checked_call_vm!(fold_executor, <_>::default(), &script, "", "");
let stream_setter_result = let error_trigger_result = checked_call_vm!(error_trigger, <_>::default(), &script, "", fold_executor_result.data);
checked_call_vm!(stream_setter, <_>::default(), &script, "", variables_setter_result.data); let fold_executor_result = checked_call_vm!(fold_executor, <_>::default(), &script, "", error_trigger_result.data);
let fold_executor_result = checked_call_vm!(fold_executor, <_>::default(), &script, "", stream_setter_result.data);
let error_trigger_result = checked_call_vm!(error_trigger, <_>::default(), &script, "", fold_executor_result.data); let error_trigger_result = checked_call_vm!(error_trigger, <_>::default(), &script, "", fold_executor_result.data);
let last_error_receiver_result = checked_call_vm!(
last_error_receiver,
<_>::default(),
&script,
"",
error_trigger_result.data
);
let last_peer_checker_result = checked_call_vm!( let last_peer_checker_result = checked_call_vm!(
last_peer_checker, last_peer_checker,
<_>::default(), <_>::default(),
&script, &script,
"", "",
last_error_receiver_result.data error_trigger_result.data
); );
let actual_trace = trace_from_result(&last_peer_checker_result); let actual_trace = trace_from_result(&last_peer_checker_result);
@ -240,8 +215,11 @@ fn fold_early_exit() {
"message": r#"Local service error, ret_code is 1, error message is '"failed result from fallible_call_service"'"#, "message": r#"Local service error, ret_code is 1, error message is '"failed result from fallible_call_service"'"#,
"peer_id": "error_trigger_id"})); "peer_id": "error_trigger_id"}));
let xor_right_subgraph_state_id = actual_trace.len() - 2; let bubbled_error_from_stream_1 = actual_trace.len() - 3;
assert_eq!(&actual_trace[xor_right_subgraph_state_id.into()], &expected_state); assert_eq!(&actual_trace[bubbled_error_from_stream_1.into()], &expected_state);
let bubbled_error_from_stream_2 = actual_trace.len() - 2;
assert_eq!(&actual_trace[bubbled_error_from_stream_2.into()], &expected_state);
} }
#[test] #[test]