From d302f269ad0c0e0dc876d0627591e805724a0727 Mon Sep 17 00:00:00 2001 From: DieMyst Date: Tue, 21 Feb 2023 16:31:21 +0400 Subject: [PATCH] quorum implementation --- .../gateway/README.md | 8 +- .../gateway/aqua-compiled/rpc.d.ts | 80 +- .../gateway/aqua-compiled/rpc.js | 691 +++++++++++++++++- .../gateway/aqua/rpc.aqua | 26 +- .../gateway/config.json | 7 +- .../gateway/src/config.js | 5 +- .../gateway/src/index.js | 58 +- 7 files changed, 821 insertions(+), 54 deletions(-) diff --git a/aqua-examples/decentralized-blockchain-gateway/gateway/README.md b/aqua-examples/decentralized-blockchain-gateway/gateway/README.md index a8f0ad3..03660eb 100644 --- a/aqua-examples/decentralized-blockchain-gateway/gateway/README.md +++ b/aqua-examples/decentralized-blockchain-gateway/gateway/README.md @@ -28,13 +28,19 @@ where config is: "serviceId": "eth-rpc serviceId", "port": 3000, "counterServiceId": null, - "counterPeerId": null + "counterPeerId": null, + "quorumServiceId": null, + "quorumPeerId": null, + "quorumNumber": null } ``` `counterServiceId` and `counterPeerId` is credentials to counter service for `round-robin` mode. Will be used local counter if undefined. +`quorumServiceId` and `quorumPeerId` is credentials to counter service for `round-robin` mode. Will be used local counter if undefined. +`quorumNumber` is `2` by default. ## Mode `random` - choose providers randomly `round-robin` - choose providers in circle order +`quorum` - call all providers and choose the result that is the same from `>= quorumNumber` providers. Or return an error. diff --git a/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.d.ts b/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.d.ts index 04fffc8..87b46ec 100644 --- a/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.d.ts +++ b/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.d.ts @@ -17,6 +17,14 @@ import { // Services +export interface NumOpDef { + identity: (n: number, callParams: CallParams$$<'n'>) => number | Promise; +} +export function registerNumOp(service: NumOpDef): void; +export function registerNumOp(serviceId: string, service: NumOpDef): void; +export function registerNumOp(peer: IFluenceClient$$, service: NumOpDef): void; +export function registerNumOp(peer: IFluenceClient$$, serviceId: string, service: NumOpDef): void; + export interface LoggerDef { log: (s: string[], callParams: CallParams$$<'s'>) => void | Promise; logCall: (s: string, callParams: CallParams$$<'s'>) => void | Promise; @@ -26,6 +34,14 @@ export function registerLogger(serviceId: string, service: LoggerDef): void; export function registerLogger(peer: IFluenceClient$$, service: LoggerDef): void; export function registerLogger(peer: IFluenceClient$$, serviceId: string, service: LoggerDef): void; +export interface QuorumCheckerDef { + check: (results: { error: string; success: boolean; value: string; }[], minResults: number, callParams: CallParams$$<'results' | 'minResults'>) => { error: string; results: { error: string; success: boolean; value: string; }[]; value: string; } | Promise<{ error: string; results: { error: string; success: boolean; value: string; }[]; value: string; }>; +} +export function registerQuorumChecker(service: QuorumCheckerDef): void; +export function registerQuorumChecker(serviceId: string, service: QuorumCheckerDef): void; +export function registerQuorumChecker(peer: IFluenceClient$$, service: QuorumCheckerDef): void; +export function registerQuorumChecker(peer: IFluenceClient$$, serviceId: string, service: QuorumCheckerDef): void; + export interface EthCallerDef { eth_call: (uri: string, method: string, jsonArgs: string[], callParams: CallParams$$<'uri' | 'method' | 'jsonArgs'>) => { error: string; success: boolean; value: string; } | Promise<{ error: string; success: boolean; value: string; }>; } @@ -40,14 +56,6 @@ export function registerCounter(serviceId: string, service: CounterDef): void; export function registerCounter(peer: IFluenceClient$$, service: CounterDef): void; export function registerCounter(peer: IFluenceClient$$, serviceId: string, service: CounterDef): void; -export interface NumOpDef { - identity: (n: number, callParams: CallParams$$<'n'>) => number | Promise; -} -export function registerNumOp(service: NumOpDef): void; -export function registerNumOp(serviceId: string, service: NumOpDef): void; -export function registerNumOp(peer: IFluenceClient$$, service: NumOpDef): void; -export function registerNumOp(peer: IFluenceClient$$, serviceId: string, service: NumOpDef): void; - // Functions @@ -85,6 +93,33 @@ export function empty( ): Promise; +export type QuorumEthResult = { error: string; results: { error: string; success: boolean; value: string; }[]; value: string; } +export function quorumEth( + uris: string[], + quorumNumber: number, + timeout: number, + method: string, + jsonArgs: string[], + serviceId: string, + quorumServiceId: string, + quorumPeerId: string, + config?: {ttl?: number} +): Promise; + +export function quorumEth( + peer: IFluenceClient$$, + uris: string[], + quorumNumber: number, + timeout: number, + method: string, + jsonArgs: string[], + serviceId: string, + quorumServiceId: string, + quorumPeerId: string, + config?: {ttl?: number} +): Promise; + + export type RandomLoadBalancingResult = { error: string; success: boolean; value: string; } export function randomLoadBalancing( uris: string[], @@ -125,6 +160,35 @@ export function randomLoadBalancingEth( ): Promise; +export type QuorumResult = { error: string; results: { error: string; success: boolean; value: string; }[]; value: string; } +export function quorum( + uris: string[], + quorumNumber: number, + timeout: number, + method: string, + jsonArgs: string[], + serviceId: string, + quorumServiceId: string, + quorumPeerId: string, + callFunc: (arg0: string, arg1: string, arg2: string[], arg3: string, callParams: CallParams$$<'arg0' | 'arg1' | 'arg2' | 'arg3'>) => { error: string; success: boolean; value: string; } | Promise<{ error: string; success: boolean; value: string; }>, + config?: {ttl?: number} +): Promise; + +export function quorum( + peer: IFluenceClient$$, + uris: string[], + quorumNumber: number, + timeout: number, + method: string, + jsonArgs: string[], + serviceId: string, + quorumServiceId: string, + quorumPeerId: string, + callFunc: (arg0: string, arg1: string, arg2: string[], arg3: string, callParams: CallParams$$<'arg0' | 'arg1' | 'arg2' | 'arg3'>) => { error: string; success: boolean; value: string; } | Promise<{ error: string; success: boolean; value: string; }>, + config?: {ttl?: number} +): Promise; + + export type RoundRobinResult = { error: string; success: boolean; value: string; } export function roundRobin( uris: string[], diff --git a/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.js b/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.js index e440916..df1d3a9 100644 --- a/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.js +++ b/aqua-examples/decentralized-blockchain-gateway/gateway/aqua-compiled/rpc.js @@ -20,6 +20,45 @@ import { +export function registerNumOp(...args) { + registerService$$( + args, + { + "defaultServiceId" : "op", + "functions" : { + "tag" : "labeledProduct", + "fields" : { + "identity" : { + "tag" : "arrow", + "domain" : { + "tag" : "labeledProduct", + "fields" : { + "n" : { + "tag" : "scalar", + "name" : "u64" + } + } + }, + "codomain" : { + "tag" : "unlabeledProduct", + "items" : [ + { + "tag" : "scalar", + "name" : "i64" + } + ] + } + } + } + } +} + ); +} + + + + + export function registerLogger(...args) { registerService$$( args, @@ -71,6 +110,97 @@ export function registerLogger(...args) { +export function registerQuorumChecker(...args) { + registerService$$( + args, + { + "defaultServiceId" : "quorum", + "functions" : { + "tag" : "labeledProduct", + "fields" : { + "check" : { + "tag" : "arrow", + "domain" : { + "tag" : "labeledProduct", + "fields" : { + "results" : { + "tag" : "array", + "type" : { + "tag" : "struct", + "name" : "EthResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "success" : { + "tag" : "scalar", + "name" : "bool" + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + }, + "minResults" : { + "tag" : "scalar", + "name" : "u32" + } + } + }, + "codomain" : { + "tag" : "unlabeledProduct", + "items" : [ + { + "tag" : "struct", + "name" : "QuorumResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "results" : { + "tag" : "array", + "type" : { + "tag" : "struct", + "name" : "EthResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "success" : { + "tag" : "scalar", + "name" : "bool" + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + ] + } + } + } + } +} + ); +} + + + + + export function registerEthCaller(...args) { registerService$$( args, @@ -163,45 +293,6 @@ export function registerCounter(...args) { ); } - - - - -export function registerNumOp(...args) { - registerService$$( - args, - { - "defaultServiceId" : "op", - "functions" : { - "tag" : "labeledProduct", - "fields" : { - "identity" : { - "tag" : "arrow", - "domain" : { - "tag" : "labeledProduct", - "fields" : { - "n" : { - "tag" : "scalar", - "name" : "u64" - } - } - }, - "codomain" : { - "tag" : "unlabeledProduct", - "items" : [ - { - "tag" : "scalar", - "name" : "i64" - } - ] - } - } - } - } -} - ); -} - // Functions export function roundRobinEth(...args) { @@ -417,6 +508,242 @@ export function empty(...args) { } +export function quorumEth(...args) { + + let script = ` + (xor + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-) + (call %init_peer_id% ("getDataSrv" "uris") [] uris) + ) + (call %init_peer_id% ("getDataSrv" "quorumNumber") [] quorumNumber) + ) + (call %init_peer_id% ("getDataSrv" "timeout") [] timeout) + ) + (call %init_peer_id% ("getDataSrv" "method") [] method) + ) + (call %init_peer_id% ("getDataSrv" "jsonArgs") [] jsonArgs) + ) + (call %init_peer_id% ("getDataSrv" "serviceId") [] serviceId) + ) + (call %init_peer_id% ("getDataSrv" "quorumServiceId") [] quorumServiceId) + ) + (call %init_peer_id% ("getDataSrv" "quorumPeerId") [] quorumPeerId) + ) + (new $results + (seq + (seq + (seq + (xor + (par + (fold uris uri-0 + (par + (seq + (seq + (xor + (call -relay- (serviceId "eth_call") [uri-0 method jsonArgs] res) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1]) + ) + (ap res $results) + ) + (call %init_peer_id% ("op" "noop") []) + ) + (next uri-0) + ) + (never) + ) + (null) + ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) + ) + (par + (seq + (seq + (seq + (seq + (seq + (seq + (ap uris uris_to_functor) + (ap uris_to_functor.length uris_length) + ) + (call %init_peer_id% ("math" "sub") [uris_length 1] sub) + ) + (new $results_test + (seq + (seq + (seq + (call %init_peer_id% ("math" "add") [sub 1] results_incr) + (fold $results s + (seq + (seq + (ap s $results_test) + (canon %init_peer_id% $results_test #results_iter_canon) + ) + (xor + (match #results_iter_canon.length results_incr + (null) + ) + (next s) + ) + ) + (never) + ) + ) + (canon %init_peer_id% $results_test #results_result_canon) + ) + (ap #results_result_canon results_gate) + ) + ) + ) + (ap uris uris_to_functor-0) + ) + (ap uris_to_functor-0.length uris_length-0) + ) + (call %init_peer_id% ("math" "sub") [uris_length-0 1] sub-0) + ) + (call %init_peer_id% ("peer" "timeout") [timeout ""]) + ) + ) + (call -relay- ("op" "noop") []) + ) + (xor + (seq + (seq + (canon quorumPeerId $results #results_canon) + (call quorumPeerId ("quorum" "check") [#results_canon quorumNumber] quorumResult) + ) + (call -relay- ("op" "noop") []) + ) + (seq + (call -relay- ("op" "noop") []) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) + ) + ) + ) + ) + ) + (xor + (call %init_peer_id% ("callbackSrv" "response") [quorumResult]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4]) + ) + ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5]) + ) + ` + return callFunction$$( + args, + { + "functionName" : "quorumEth", + "arrow" : { + "tag" : "arrow", + "domain" : { + "tag" : "labeledProduct", + "fields" : { + "uris" : { + "tag" : "array", + "type" : { + "tag" : "scalar", + "name" : "string" + } + }, + "quorumNumber" : { + "tag" : "scalar", + "name" : "u32" + }, + "timeout" : { + "tag" : "scalar", + "name" : "u32" + }, + "method" : { + "tag" : "scalar", + "name" : "string" + }, + "jsonArgs" : { + "tag" : "array", + "type" : { + "tag" : "scalar", + "name" : "string" + } + }, + "serviceId" : { + "tag" : "scalar", + "name" : "string" + }, + "quorumServiceId" : { + "tag" : "scalar", + "name" : "string" + }, + "quorumPeerId" : { + "tag" : "scalar", + "name" : "string" + } + } + }, + "codomain" : { + "tag" : "unlabeledProduct", + "items" : [ + { + "tag" : "struct", + "name" : "QuorumResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "results" : { + "tag" : "array", + "type" : { + "tag" : "struct", + "name" : "EthResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "success" : { + "tag" : "scalar", + "name" : "bool" + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + ] + } + }, + "names" : { + "relay" : "-relay-", + "getDataSrv" : "getDataSrv", + "callbackSrv" : "callbackSrv", + "responseSrv" : "callbackSrv", + "responseFnName" : "response", + "errorHandlingSrv" : "errorHandlingSrv", + "errorFnName" : "error" + } +}, + script + ) +} + + export function randomLoadBalancing(...args) { let script = ` @@ -711,6 +1038,292 @@ export function randomLoadBalancingEth(...args) { } +export function quorum(...args) { + + let script = ` + (xor + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (seq + (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-) + (call %init_peer_id% ("getDataSrv" "uris") [] uris) + ) + (call %init_peer_id% ("getDataSrv" "quorumNumber") [] quorumNumber) + ) + (call %init_peer_id% ("getDataSrv" "timeout") [] timeout) + ) + (call %init_peer_id% ("getDataSrv" "method") [] method) + ) + (call %init_peer_id% ("getDataSrv" "jsonArgs") [] jsonArgs) + ) + (call %init_peer_id% ("getDataSrv" "serviceId") [] serviceId) + ) + (call %init_peer_id% ("getDataSrv" "quorumServiceId") [] quorumServiceId) + ) + (call %init_peer_id% ("getDataSrv" "quorumPeerId") [] quorumPeerId) + ) + (new $results + (seq + (seq + (seq + (xor + (par + (fold uris uri-0 + (par + (seq + (seq + (xor + (call %init_peer_id% ("callbackSrv" "callFunc") [uri-0 method jsonArgs serviceId] init_call_res0) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1]) + ) + (ap init_call_res0 $results) + ) + (call %init_peer_id% ("op" "noop") []) + ) + (next uri-0) + ) + (never) + ) + (null) + ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) + ) + (par + (seq + (seq + (seq + (seq + (seq + (seq + (ap uris uris_to_functor) + (ap uris_to_functor.length uris_length) + ) + (call %init_peer_id% ("math" "sub") [uris_length 1] sub) + ) + (new $results_test + (seq + (seq + (seq + (call %init_peer_id% ("math" "add") [sub 1] results_incr) + (fold $results s + (seq + (seq + (ap s $results_test) + (canon %init_peer_id% $results_test #results_iter_canon) + ) + (xor + (match #results_iter_canon.length results_incr + (null) + ) + (next s) + ) + ) + (never) + ) + ) + (canon %init_peer_id% $results_test #results_result_canon) + ) + (ap #results_result_canon results_gate) + ) + ) + ) + (ap uris uris_to_functor-0) + ) + (ap uris_to_functor-0.length uris_length-0) + ) + (call %init_peer_id% ("math" "sub") [uris_length-0 1] sub-0) + ) + (call %init_peer_id% ("peer" "timeout") [timeout ""]) + ) + ) + (call -relay- ("op" "noop") []) + ) + (xor + (seq + (seq + (canon quorumPeerId $results #results_canon) + (call quorumPeerId ("quorum" "check") [#results_canon quorumNumber] quorumResult) + ) + (call -relay- ("op" "noop") []) + ) + (seq + (call -relay- ("op" "noop") []) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) + ) + ) + ) + ) + ) + (xor + (call %init_peer_id% ("callbackSrv" "response") [quorumResult]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4]) + ) + ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5]) + ) + ` + return callFunction$$( + args, + { + "functionName" : "quorum", + "arrow" : { + "tag" : "arrow", + "domain" : { + "tag" : "labeledProduct", + "fields" : { + "uris" : { + "tag" : "array", + "type" : { + "tag" : "scalar", + "name" : "string" + } + }, + "quorumNumber" : { + "tag" : "scalar", + "name" : "u32" + }, + "timeout" : { + "tag" : "scalar", + "name" : "u32" + }, + "method" : { + "tag" : "scalar", + "name" : "string" + }, + "jsonArgs" : { + "tag" : "array", + "type" : { + "tag" : "scalar", + "name" : "string" + } + }, + "serviceId" : { + "tag" : "scalar", + "name" : "string" + }, + "quorumServiceId" : { + "tag" : "scalar", + "name" : "string" + }, + "quorumPeerId" : { + "tag" : "scalar", + "name" : "string" + }, + "callFunc" : { + "tag" : "arrow", + "domain" : { + "tag" : "unlabeledProduct", + "items" : [ + { + "tag" : "scalar", + "name" : "string" + }, + { + "tag" : "scalar", + "name" : "string" + }, + { + "tag" : "array", + "type" : { + "tag" : "scalar", + "name" : "string" + } + }, + { + "tag" : "scalar", + "name" : "string" + } + ] + }, + "codomain" : { + "tag" : "unlabeledProduct", + "items" : [ + { + "tag" : "struct", + "name" : "EthResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "success" : { + "tag" : "scalar", + "name" : "bool" + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + ] + } + } + } + }, + "codomain" : { + "tag" : "unlabeledProduct", + "items" : [ + { + "tag" : "struct", + "name" : "QuorumResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "results" : { + "tag" : "array", + "type" : { + "tag" : "struct", + "name" : "EthResult", + "fields" : { + "error" : { + "tag" : "scalar", + "name" : "string" + }, + "success" : { + "tag" : "scalar", + "name" : "bool" + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + }, + "value" : { + "tag" : "scalar", + "name" : "string" + } + } + } + ] + } + }, + "names" : { + "relay" : "-relay-", + "getDataSrv" : "getDataSrv", + "callbackSrv" : "callbackSrv", + "responseSrv" : "callbackSrv", + "responseFnName" : "response", + "errorHandlingSrv" : "errorHandlingSrv", + "errorFnName" : "error" + } +}, + script + ) +} + + export function roundRobin(...args) { let script = ` diff --git a/aqua-examples/decentralized-blockchain-gateway/gateway/aqua/rpc.aqua b/aqua-examples/decentralized-blockchain-gateway/gateway/aqua/rpc.aqua index 1832068..3b6fbfe 100644 --- a/aqua-examples/decentralized-blockchain-gateway/gateway/aqua/rpc.aqua +++ b/aqua-examples/decentralized-blockchain-gateway/gateway/aqua/rpc.aqua @@ -7,6 +7,11 @@ data EthResult: success: bool error: string +data QuorumResult: + value: string + results: []EthResult + error: string + service Logger("logger"): log(s: []string) logCall(s: string) @@ -17,6 +22,9 @@ service NumOp("op"): service Counter("counter"): incrementAndReturn() -> u32 +service QuorumChecker("quorum"): + check(results: []EthResult, minResults: u32) -> QuorumResult + func empty() -> EthResult: <- EthResult(value = "", success = true, error = "") @@ -49,4 +57,20 @@ func roundRobin(uris: []string, method: string, jsonArgs: []string, serviceId: s <- callFunc(uris[providerNumber], method, jsonArgs, serviceId) func roundRobinEth(uris: []string, method: string, jsonArgs: []string, serviceId: string, counterServiceId: string, counterPeerId: string) -> EthResult: - <- roundRobin(uris, method, jsonArgs, serviceId, counterServiceId, counterPeerId, call) \ No newline at end of file + <- roundRobin(uris, method, jsonArgs, serviceId, counterServiceId, counterPeerId, call) + +func quorum(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, serviceId: string, quorumServiceId: string, quorumPeerId: string, + callFunc: string, string, []string, string -> EthResult) -> QuorumResult: + results: *EthResult + on INIT_PEER_ID: + for uri <- uris par: + results <- callFunc(uri, method, jsonArgs, serviceId) + join results[uris.length - 1] + par Peer.timeout(timeout, "") + on quorumPeerId: + Counter quorumServiceId + quorumResult <- QuorumChecker.check(results, quorumNumber) + <- quorumResult + +func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, serviceId: string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult: + <- quorum(uris, quorumNumber, timeout, method, jsonArgs, serviceId, quorumServiceId, quorumPeerId, call) \ No newline at end of file diff --git a/aqua-examples/decentralized-blockchain-gateway/gateway/config.json b/aqua-examples/decentralized-blockchain-gateway/gateway/config.json index 4377e1b..02ca0de 100644 --- a/aqua-examples/decentralized-blockchain-gateway/gateway/config.json +++ b/aqua-examples/decentralized-blockchain-gateway/gateway/config.json @@ -3,10 +3,13 @@ "https://goerli.infura.io/v3/77214656b25f4cad9cd2540c6d40c301", "https://goerli.infura.io/v3/c48f3b538f154204ad53d04aa8990544" ], - "mode": "round-robin", + "mode": "quorum", "relay": "/dns4/kras-02.fluence.dev/tcp/19001/wss/p2p/12D3KooWHLxVhUQyAuZe6AHMB29P7wkvTNMn7eDMcsqimJYLKREf", "serviceId": "25bf2293-7503-4a01-af00-d1b7d089ca37", "port": 3000, "counterServiceId": null, - "counterPeerId": null + "counterPeerId": null, + "quorumServiceId": null, + "quorumPeerId": null, + "quorumNumber": 2 } \ No newline at end of file diff --git a/aqua-examples/decentralized-blockchain-gateway/gateway/src/config.js b/aqua-examples/decentralized-blockchain-gateway/gateway/src/config.js index f25e963..416fe84 100644 --- a/aqua-examples/decentralized-blockchain-gateway/gateway/src/config.js +++ b/aqua-examples/decentralized-blockchain-gateway/gateway/src/config.js @@ -1,8 +1,9 @@ import fs from 'fs'; export const configHelp = "Config structure: { port, relay, serviceId, providers, mode, counterServiceId?, counterPeerId?}\n" + - "Where 'mode' can be: 'random' (default) or 'round-robin',\n" + - "'counterServiceId' and 'counterPeerId' will use local service if undefined" + "Where 'mode' can be: 'random' (default), 'round-robin' or 'quorum',\n" + + "'counterServiceId' and 'counterPeerId' will use local service if undefined.\n" + "'quorumServiceId' and 'quorumPeerId' will use local service if undefined.\n" export function readConfig(path) { const rawdata = fs.readFileSync(path); diff --git a/aqua-examples/decentralized-blockchain-gateway/gateway/src/index.js b/aqua-examples/decentralized-blockchain-gateway/gateway/src/index.js index a4e9027..a1afa61 100644 --- a/aqua-examples/decentralized-blockchain-gateway/gateway/src/index.js +++ b/aqua-examples/decentralized-blockchain-gateway/gateway/src/index.js @@ -7,7 +7,14 @@ import bodyParser from "body-parser"; import {JSONRPCServer} from "json-rpc-2.0"; import {Fluence} from '@fluencelabs/js-client.api'; import "@fluencelabs/js-client.node" -import {randomLoadBalancingEth, registerCounter, registerLogger, roundRobinEth} from "../aqua-compiled/rpc.js"; +import { + quorumEth, + randomLoadBalancingEth, + registerCounter, + registerLogger, + registerQuorumChecker, + roundRobinEth +} from "../aqua-compiled/rpc.js"; import {readArguments} from "./arguments.js"; import {readConfig} from "./config.js"; import {methods} from "./methods.js"; @@ -57,8 +64,48 @@ registerCounter("counter", { } }) +function findSameResults(results, minNum) { + const resultCounts = results.filter((obj) => obj.success).map((obj) => obj.value).reduce(function(i, v, idx) { + if (i[v] === undefined) { + i[v] = 1 + } else { + i[v] = i[v] + 1; + } + return i; + }, {}); + + const getMaxRepeated = Math.max(...Object.values(resultCounts)); + if (getMaxRepeated >= minNum) { + console.log(resultCounts) + const max = Object.entries(resultCounts).find((kv) => kv[1] === getMaxRepeated) + return { + value: max[0], + results: [], + error: "" + } + } else { + return { + error: "No consensus in results", + results: results, + value: "" + } + } +} + +registerQuorumChecker("quorum", + { + check: (ethResults, minQuorum) => { + console.log(ethResults) + return findSameResults(ethResults, minQuorum) + } + } +) + const counterServiceId = config.counterServiceId || 'counter' const counterPeerId = config.counterPeerId || peerId +const quorumServiceId = config.quorumServiceId || 'quorum' +const quorumPeerId = config.quorumPeerId || peerId +const quorumNumber = config.quorumNumber || 2 async function methodHandler(req, method) { console.log(`Receiving request '${method}'`); @@ -69,6 +116,15 @@ async function methodHandler(req, method) { console.log("peerId: " + peerId) result = await roundRobinEth(config.providers, method, req.map((s) => JSON.stringify(s)), config.serviceId, counterServiceId, counterPeerId, config.serviceId); + } else if (config.mode === "quorum") { + result = await quorumEth(config.providers, config.quorumNumber, 5000, method, req.map((s) => JSON.stringify(s)), config.serviceId, quorumServiceId, quorumPeerId, + config.serviceId); + + if (result.error) { + return {error: result.error, results: result.results} + } + + console.log(result) }