feat(data)!: values are binary blobs (#775)

* Store deserialized values as raw JSON values

Values are parsed on-demand.
---------

Co-authored-by: Mike Voronov <michail.vms@gmail.com>
This commit is contained in:
Ivan Boldyrev 2023-12-26 15:42:40 +04:00 committed by GitHub
parent b331fa4351
commit f1c7b43a1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 210 additions and 38 deletions

View File

@ -24,6 +24,7 @@ use air_interpreter_data::CanonCidAggregate;
use air_interpreter_data::CanonResultCidAggregate;
use air_interpreter_data::CidInfo;
use air_interpreter_data::CidTracker;
use air_interpreter_data::RawValue;
use air_interpreter_data::ServiceResultCidAggregate;
use air_interpreter_data::TracePos;
use polyplets::SecurityTetraplet;
@ -32,7 +33,7 @@ use std::rc::Rc;
#[derive(Debug, Default, Clone)]
pub struct ExecutionCidState {
pub value_tracker: CidTracker<JValue>,
pub value_tracker: CidTracker<RawValue>,
pub tetraplet_tracker: CidTracker<SecurityTetraplet>,
pub canon_element_tracker: CidTracker<CanonCidAggregate>,
pub canon_result_tracker: CidTracker<CanonResultCidAggregate>,
@ -72,7 +73,8 @@ impl ExecutionCidState {
tetraplet: RcSecurityTetraplet,
argument_hash: Rc<str>,
) -> Result<CID<ServiceResultCidAggregate>, UncatchableError> {
let value_cid = self.value_tracker.track_value(value)?;
let vm_value = RawValue::from_value(value);
let value_cid = self.value_tracker.track_raw_value(vm_value);
let tetraplet_cid = self.tetraplet_tracker.track_value(tetraplet)?;
let service_result_agg = ServiceResultCidAggregate::new(value_cid, argument_hash, tetraplet_cid);
@ -85,7 +87,8 @@ impl ExecutionCidState {
&mut self,
canon_value: &ValueAggregate,
) -> Result<CID<CanonCidAggregate>, UncatchableError> {
let value_cid = self.value_tracker.track_value(canon_value.get_result().clone())?;
let vm_value = RawValue::from_value(canon_value.get_result().clone());
let value_cid = self.value_tracker.track_raw_value(vm_value);
let tetraplet = self.tetraplet_tracker.track_value(canon_value.get_tetraplet())?;
let canon_value_aggregate = CanonCidAggregate::new(value_cid, tetraplet, canon_value.get_provenance());
@ -94,10 +97,11 @@ impl ExecutionCidState {
.map_err(UncatchableError::from)
}
pub(crate) fn get_value_by_cid(&self, cid: &CID<JValue>) -> Result<Rc<JValue>, UncatchableError> {
pub(crate) fn get_value_by_cid(&self, cid: &CID<RawValue>) -> Result<Rc<JValue>, UncatchableError> {
self.value_tracker
.get(cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound("value", cid.get_inner()))
.map(|vm_value| vm_value.get_value())
}
pub(crate) fn get_tetraplet_by_cid(

View File

@ -329,7 +329,12 @@ fn fold_merge() {
};
let service_result_agg = data.cid_info.service_result_store.get(cid).unwrap();
let value = data.cid_info.value_store.get(&service_result_agg.value_cid).unwrap();
let value = data
.cid_info
.value_store
.get(&service_result_agg.value_cid)
.unwrap()
.get_value();
if let JValue::String(ref var_name) = &*value {
let current_count: usize = calls_count.get(var_name).copied().unwrap_or_default();

View File

@ -58,8 +58,8 @@ fn test_attack_replace_value() {
let mut mallory_cid_info = serde_json::to_value::<CidInfo>(mallory_cid_state.into()).unwrap();
let mut cnt = 0;
for (_cid, val) in mallory_cid_info["value_store"].as_object_mut().unwrap().iter_mut() {
if *val == "alice" {
*val = "evil".into();
if val.as_str().unwrap() == json!("alice").to_string() {
*val = json!("evil").to_string().into();
cnt += 1;
}
}
@ -95,7 +95,8 @@ fn test_attack_replace_value() {
&res,
PreparationError::CidStoreVerificationError(
CidVerificationError::ValueMismatch {
type_name: "serde_json::value::Value",
// fragile: it is OK if this exact string changes on compiler upgrade
type_name: "air_interpreter_data::raw_value::RawValue",
cid_repr: "bagaaihrayhxgqijfajraxivb7hxwshhbsdqk4j5zyqypb54zggmn5v7mmwxq".into(),
}
.into()

View File

@ -17,6 +17,7 @@
use air::interpreter_data::ExecutedState;
use air::ExecutionCidState;
use air::UncatchableError::*;
use air_interpreter_data::RawValue;
use air_interpreter_data::ValueRef;
use air_test_framework::AirScriptExecutor;
use air_test_utils::prelude::*;
@ -139,7 +140,9 @@ fn malformed_call_service_failed() {
// Craft an artificial incorrect error result
let value = json!("error");
let value_cid = cid_state.value_tracker.track_value(value.clone()).unwrap();
let value_cid = cid_state
.value_tracker
.track_raw_value(RawValue::from_value(value.clone()));
let tetraplet = SecurityTetraplet::literal_tetraplet(peer_id);
let tetraplet_cid = cid_state.tetraplet_tracker.track_value(tetraplet).unwrap();
let service_result_agg = ServiceResultCidAggregate {

View File

@ -28,7 +28,7 @@
mod verify;
pub use crate::verify::{verify_value, CidVerificationError};
pub use crate::verify::{verify_raw_value, verify_value, CidVerificationError};
use serde::Deserialize;
use serde::Serialize;
@ -131,7 +131,7 @@ pub fn value_to_json_cid<Val: Serialize + ?Sized>(
let digest = Code::Blake3_256
.wrap(&hash)
.expect("can't happend: incorrect hash length");
.expect("can't happen: incorrect hash length");
let cid = Cid::new_v1(JSON_CODEC, digest);
Ok(CID::new(cid.to_string()))
@ -152,6 +152,27 @@ pub(crate) fn value_json_hash<D: digest::Digest + std::io::Write, Val: Serialize
Ok(hash.to_vec())
}
pub fn raw_value_to_json_cid<Val>(raw_value: impl AsRef<[u8]>) -> CID<Val> {
use cid::Cid;
use multihash_codetable::{Code, MultihashDigest};
let hash = raw_value_hash::<blake3::Hasher>(raw_value);
let digest = Code::Blake3_256
.wrap(&hash)
.expect("can't happen: incorrect hash length");
let cid = Cid::new_v1(JSON_CODEC, digest);
CID::new(cid.to_string())
}
/// Hashes the bytes of `raw_value` with the digest algorithm `D` and returns
/// the resulting hash as an owned byte vector.
pub(crate) fn raw_value_hash<D: digest::Digest>(raw_value: impl AsRef<[u8]>) -> Vec<u8> {
    // One-shot form of "create a hasher, feed it the data, finalize it".
    D::digest(raw_value.as_ref()).to_vec()
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -54,6 +54,53 @@ pub fn verify_value<Val: Serialize>(
}
}
/// Checks that `cid` is the content identifier of the bytes in `raw_value`.
///
/// The CID must use `JSON_CODEC`, and its multihash must be SHA2-256 or
/// BLAKE3-256. The bytes are hashed with the algorithm named by the multihash
/// and the result is compared with the digest stored in the CID. The bytes
/// are not checked for being valid JSON.
///
/// # Errors
///
/// * the error of converting `cid` into a `cid::Cid`, if that conversion fails;
/// * `CidVerificationError::UnsupportedCidCodec` if the codec is not `JSON_CODEC`;
/// * `CidVerificationError::UnsupportedHashCode` if the hash code is unknown or
///   is neither SHA2-256 nor BLAKE3-256;
/// * `CidVerificationError::ValueMismatch` if the computed hash differs from the
///   digest stored in the CID.
pub fn verify_raw_value<Val>(
    cid: &CID<Val>,
    raw_value: impl AsRef<[u8]>,
) -> Result<(), CidVerificationError> {
    use digest::Digest;
    use multihash_codetable::Code;

    let parsed_cid: cid::Cid = cid.try_into()?;

    // we insist ATM that raw values should be JSON-encoded, but
    // we do not validate that it is valid JSON data
    let codec = parsed_cid.codec();
    if codec != JSON_CODEC {
        return Err(CidVerificationError::UnsupportedCidCodec(codec));
    }

    let multihash = parsed_cid.hash();
    let hash_code = multihash.code();
    let known_code: Code = hash_code
        .try_into()
        .map_err(|_| CidVerificationError::UnsupportedHashCode(hash_code))?;

    // Borrow the bytes once; both hashers below accept a plain byte slice.
    let raw_bytes = raw_value.as_ref();
    let computed_hash = match known_code {
        Code::Sha2_256 => {
            let mut sha2_hasher = sha2::Sha256::new();
            sha2_hasher.update(raw_bytes);
            sha2_hasher.finalize().to_vec()
        }
        Code::Blake3_256 => {
            let mut blake3_hasher = blake3::Hasher::new();
            blake3_hasher.update(raw_bytes);
            blake3_hasher.finalize().to_vec()
        }
        _ => return Err(CidVerificationError::UnsupportedHashCode(hash_code)),
    };

    // A multihash may carry fewer bytes than the full hash; to avoid abuse,
    // such multihashes are rejected: the whole computed hash must match.
    if computed_hash != multihash.digest() {
        return Err(CidVerificationError::ValueMismatch {
            type_name: std::any::type_name::<Val>(),
            cid_repr: cid.get_inner(),
        });
    }

    Ok(())
}
fn verify_json_value<Val: Serialize>(
mhash: &multihash_codetable::Multihash,
value: &Val,

View File

@ -24,7 +24,7 @@ polyplets = { version = "0.5.1", path = "../polyplets" }
fluence-keypair = { version = "0.10.4", default-features = false }
serde = {version = "1.0.190", features = ["derive", "rc"]}
serde_json = "1.0.108"
serde_json = { version = "1.0.95", features = ["raw_value"] }
semver = { version = "1.0.17", features = ["serde"] }
once_cell = "1.17.1"
tracing = "0.1.40"

View File

@ -19,7 +19,7 @@ use crate::CidStoreVerificationError;
use crate::CanonCidAggregate;
use crate::CanonResultCidAggregate;
use crate::JValue;
use crate::RawValue;
use crate::ServiceResultCidAggregate;
use polyplets::SecurityTetraplet;
@ -29,7 +29,7 @@ use serde::Serialize;
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CidInfo {
/// Map CID to value.
pub value_store: CidStore<JValue>,
pub value_store: CidStore<RawValue>,
/// Map CID to a tetraplet.
pub tetraplet_store: CidStore<SecurityTetraplet>,
@ -57,7 +57,7 @@ impl CidInfo {
}
fn verify_value_store(&self) -> Result<(), CidStoreVerificationError> {
self.value_store.verify()
self.value_store.verify_raw_value()
}
fn verify_tetraplet_store(&self) -> Result<(), CidStoreVerificationError> {

View File

@ -15,8 +15,11 @@
*/
use crate::JValue;
use crate::RawValue;
use air_interpreter_cid::raw_value_to_json_cid;
use air_interpreter_cid::value_to_json_cid;
use air_interpreter_cid::verify_raw_value;
use air_interpreter_cid::verify_value;
use air_interpreter_cid::CidCalculationError;
use air_interpreter_cid::CidRef;
@ -79,6 +82,15 @@ impl<Val: Serialize> CidStore<Val> {
}
}
impl CidStore<RawValue> {
pub fn verify_raw_value(&self) -> Result<(), CidStoreVerificationError> {
for (cid, value) in &self.0 {
verify_raw_value(cid, value.as_inner())?;
}
Ok(())
}
}
#[derive(ThisError, Debug)]
pub enum CidStoreVerificationError {
#[error(transparent)]
@ -134,6 +146,15 @@ impl<Val: Serialize> CidTracker<Val> {
}
}
impl CidTracker<RawValue> {
    /// Registers `value` in the tracker and returns its CID.
    ///
    /// The CID is computed from the serialized text of the value (`as_inner`).
    /// The value is then inserted under that CID.
    pub fn track_raw_value(&mut self, value: impl Into<Rc<RawValue>>) -> CID<RawValue> {
        let raw_value: Rc<RawValue> = value.into();
        let value_cid: CID<RawValue> = raw_value_to_json_cid(raw_value.as_inner());

        self.cids.insert(value_cid.clone(), raw_value);

        value_cid
    }
}
impl<Val> Default for CidTracker<Val> {
fn default() -> Self {
Self {

View File

@ -19,6 +19,7 @@ mod se_de;
use crate::GenerationIdx;
use crate::JValue;
use crate::RawValue;
use crate::TracePos;
use air_interpreter_cid::CID;
@ -114,7 +115,7 @@ impl CallServiceFailed {
#[serde(rename_all = "snake_case")]
/// A proof of service result execution result.
pub struct ServiceResultCidAggregate {
pub value_cid: CID<JValue>,
pub value_cid: CID<RawValue>,
/// Hash of the call arguments.
pub argument_hash: Rc<str>,
/// The tetraplet of the call result.
@ -203,7 +204,7 @@ pub struct CanonResultCidAggregate {
/// The type Canon trace CID refers to.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CanonCidAggregate {
pub value: CID<serde_json::Value>,
pub value: CID<RawValue>,
pub tetraplet: CID<SecurityTetraplet>,
pub provenance: Provenance,
}

View File

@ -15,6 +15,7 @@
*/
use super::*;
use crate::RawValue;
impl ParResult {
pub fn new(left_size: u32, right_size: u32) -> Self {
@ -122,7 +123,7 @@ impl CanonResultCidAggregate {
impl CanonCidAggregate {
pub fn new(
value: CID<serde_json::Value>,
value: CID<RawValue>,
tetraplet: CID<SecurityTetraplet>,
provenance: Provenance,
) -> Self {
@ -136,7 +137,7 @@ impl CanonCidAggregate {
impl ServiceResultCidAggregate {
pub fn new(
value_cid: CID<JValue>,
value_cid: CID<RawValue>,
argument_hash: Rc<str>,
tetraplet_cid: CID<SecurityTetraplet>,
) -> Self {

View File

@ -31,6 +31,7 @@ mod cid_store;
mod executed_state;
mod generation_idx;
mod interpreter_data;
mod raw_value;
mod trace;
mod trace_pos;
@ -39,6 +40,7 @@ pub use cid_store::*;
pub use executed_state::*;
pub use generation_idx::*;
pub use interpreter_data::*;
pub use raw_value::*;
pub use trace::*;
pub use trace_pos::*;

View File

@ -0,0 +1,70 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::JValue;
use serde::Deserialize;
use serde::Serialize;
use std::cell::RefCell;
use std::rc::Rc;
/// A JSON value that is kept as serialized text and parsed only on demand.
///
/// Because of `#[serde(transparent)]` the type is (de)serialized exactly like
/// its `raw` field, i.e. as a single string that holds the JSON text.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(transparent)]
pub struct RawValue {
// Serialized text of the value.
raw: Box<str>,
// Cache of the parsed form of `raw`. It is skipped by serde, so a freshly
// deserialized `RawValue` starts with an empty cache.
#[serde(skip)]
parsed: RefCell<Option<Rc<JValue>>>,
}
impl RawValue {
pub fn from_value(value: impl Into<Rc<JValue>>) -> Self {
let value = value.into();
let raw = value.to_string().into();
Self {
raw,
parsed: Some(value).into(),
}
}
pub fn get_value(&self) -> Rc<JValue> {
let mut parsed_guard = self.parsed.borrow_mut();
let parsed_value = parsed_guard
.get_or_insert_with(|| serde_json::from_str(&self.raw).expect("TODO handle error"));
parsed_value.clone()
}
pub(crate) fn as_inner(&self) -> &str {
&self.raw
}
}
impl From<JValue> for RawValue {
fn from(value: JValue) -> Self {
Self::from_value(value)
}
}
impl PartialEq for RawValue {
    /// Compares two raw values by the JSON value they denote.
    ///
    /// Byte-identical serialized texts are reported as equal right away. This
    /// avoids parsing both operands and avoids the panic of `get_value` on
    /// malformed text when the two texts are the same.
    ///
    /// Otherwise the parsed values are compared, because different texts can
    /// still denote equal values. That path parses on demand and panics if
    /// either text cannot be parsed.
    fn eq(&self, other: &Self) -> bool {
        self.raw == other.raw || self.get_value() == other.get_value()
    }
}
// `Eq` has no methods; implementing it asserts that `RawValue`'s `PartialEq` is a
// full equivalence relation.
// TODO(review): confirm that `JValue` implements `Eq` as well. If `JValue` is an
// alias for `serde_json::Value`, that type does implement it.
impl Eq for RawValue {}

View File

@ -36,6 +36,7 @@ use air_interpreter_cid::CID;
use air_interpreter_data::CanonCidAggregate;
use air_interpreter_data::GenerationIdx;
use air_interpreter_data::Provenance;
use air_interpreter_data::RawValue;
use air_interpreter_data::ServiceResultCidAggregate;
use avm_server::SecurityTetraplet;
use serde::Deserialize;
@ -47,15 +48,12 @@ pub fn simple_value_aggregate_cid(
result: impl Into<serde_json::Value>,
cid_state: &mut ExecutionCidState,
) -> CID<ServiceResultCidAggregate> {
let value_cid = cid_state
.value_tracker
.track_value(Rc::new(result.into()))
.unwrap();
let value = result.into();
let vm_value = RawValue::from_value(value);
let value_cid = cid_state.value_tracker.track_raw_value(vm_value);
let tetraplet = SecurityTetraplet::default();
let tetraplet_cid = cid_state
.tetraplet_tracker
.track_value(Rc::new(tetraplet))
.unwrap();
let tetraplet_cid = cid_state.tetraplet_tracker.track_value(tetraplet).unwrap();
let service_result_agg = ServiceResultCidAggregate {
value_cid,
argument_hash: "".into(),
@ -63,7 +61,7 @@ pub fn simple_value_aggregate_cid(
};
cid_state
.service_result_agg_tracker
.track_value(Rc::new(service_result_agg))
.track_value(service_result_agg)
.unwrap()
}
@ -73,14 +71,10 @@ pub fn value_aggregate_cid(
args: Vec<serde_json::Value>,
cid_state: &mut ExecutionCidState,
) -> CID<ServiceResultCidAggregate> {
let value_cid = cid_state
.value_tracker
.track_value(Rc::new(result.into()))
.unwrap();
let tetraplet_cid = cid_state
.tetraplet_tracker
.track_value(Rc::new(tetraplet))
.unwrap();
let value = result.into();
let vm_value = RawValue::from_value(value);
let value_cid = cid_state.value_tracker.track_raw_value(vm_value);
let tetraplet_cid = cid_state.tetraplet_tracker.track_value(tetraplet).unwrap();
let arguments = serde_json::Value::Array(args);
let argument_hash = value_to_json_cid(&arguments).unwrap().get_inner();
@ -93,7 +87,7 @@ pub fn value_aggregate_cid(
cid_state
.service_result_agg_tracker
.track_value(Rc::new(service_result_agg))
.track_value(service_result_agg)
.unwrap()
}
@ -181,7 +175,9 @@ pub fn canon_tracked(
.values
.iter()
.map(|value| {
let value_cid = cid_state.value_tracker.track_value(value.result.clone())?;
let vm_value = RawValue::from_value(value.result.clone());
let value_cid = cid_state.value_tracker.track_raw_value(vm_value);
let tetraplet_cid = cid_state
.tetraplet_tracker
.track_value(value.tetraplet.clone())?;