quorum implementation

This commit is contained in:
DieMyst 2023-02-21 16:31:21 +04:00
parent e9e57b09f4
commit d302f269ad
7 changed files with 821 additions and 54 deletions

View File

@ -28,13 +28,19 @@ where config is:
"serviceId": "eth-rpc serviceId", "serviceId": "eth-rpc serviceId",
"port": 3000, "port": 3000,
"counterServiceId": null, "counterServiceId": null,
"counterPeerId": null "counterPeerId": null,
"quorumServiceId": null,
"quorumPeerId": null,
"quorumNumber": null
} }
``` ```
`counterServiceId` and `counterPeerId` is credentials to counter service for `round-robin` mode. Will be used local counter if undefined. `counterServiceId` and `counterPeerId` is credentials to counter service for `round-robin` mode. Will be used local counter if undefined.
`quorumServiceId` and `quorumPeerId` is credentials to counter service for `round-robin` mode. Will be used local counter if undefined.
`quorumNumber` is `2` by default.
## Mode ## Mode
`random` - choose providers randomly `random` - choose providers randomly
`round-robin` - choose providers in circle order `round-robin` - choose providers in circle order
`quorum` - call all providers and choose the result that is the same from `>= quorumNumber` providers. Or return an error.

View File

@ -17,6 +17,14 @@ import {
// Services // Services
export interface NumOpDef {
identity: (n: number, callParams: CallParams$$<'n'>) => number | Promise<number>;
}
export function registerNumOp(service: NumOpDef): void;
export function registerNumOp(serviceId: string, service: NumOpDef): void;
export function registerNumOp(peer: IFluenceClient$$, service: NumOpDef): void;
export function registerNumOp(peer: IFluenceClient$$, serviceId: string, service: NumOpDef): void;
export interface LoggerDef { export interface LoggerDef {
log: (s: string[], callParams: CallParams$$<'s'>) => void | Promise<void>; log: (s: string[], callParams: CallParams$$<'s'>) => void | Promise<void>;
logCall: (s: string, callParams: CallParams$$<'s'>) => void | Promise<void>; logCall: (s: string, callParams: CallParams$$<'s'>) => void | Promise<void>;
@ -26,6 +34,14 @@ export function registerLogger(serviceId: string, service: LoggerDef): void;
export function registerLogger(peer: IFluenceClient$$, service: LoggerDef): void; export function registerLogger(peer: IFluenceClient$$, service: LoggerDef): void;
export function registerLogger(peer: IFluenceClient$$, serviceId: string, service: LoggerDef): void; export function registerLogger(peer: IFluenceClient$$, serviceId: string, service: LoggerDef): void;
export interface QuorumCheckerDef {
check: (results: { error: string; success: boolean; value: string; }[], minResults: number, callParams: CallParams$$<'results' | 'minResults'>) => { error: string; results: { error: string; success: boolean; value: string; }[]; value: string; } | Promise<{ error: string; results: { error: string; success: boolean; value: string; }[]; value: string; }>;
}
export function registerQuorumChecker(service: QuorumCheckerDef): void;
export function registerQuorumChecker(serviceId: string, service: QuorumCheckerDef): void;
export function registerQuorumChecker(peer: IFluenceClient$$, service: QuorumCheckerDef): void;
export function registerQuorumChecker(peer: IFluenceClient$$, serviceId: string, service: QuorumCheckerDef): void;
export interface EthCallerDef { export interface EthCallerDef {
eth_call: (uri: string, method: string, jsonArgs: string[], callParams: CallParams$$<'uri' | 'method' | 'jsonArgs'>) => { error: string; success: boolean; value: string; } | Promise<{ error: string; success: boolean; value: string; }>; eth_call: (uri: string, method: string, jsonArgs: string[], callParams: CallParams$$<'uri' | 'method' | 'jsonArgs'>) => { error: string; success: boolean; value: string; } | Promise<{ error: string; success: boolean; value: string; }>;
} }
@ -40,14 +56,6 @@ export function registerCounter(serviceId: string, service: CounterDef): void;
export function registerCounter(peer: IFluenceClient$$, service: CounterDef): void; export function registerCounter(peer: IFluenceClient$$, service: CounterDef): void;
export function registerCounter(peer: IFluenceClient$$, serviceId: string, service: CounterDef): void; export function registerCounter(peer: IFluenceClient$$, serviceId: string, service: CounterDef): void;
export interface NumOpDef {
identity: (n: number, callParams: CallParams$$<'n'>) => number | Promise<number>;
}
export function registerNumOp(service: NumOpDef): void;
export function registerNumOp(serviceId: string, service: NumOpDef): void;
export function registerNumOp(peer: IFluenceClient$$, service: NumOpDef): void;
export function registerNumOp(peer: IFluenceClient$$, serviceId: string, service: NumOpDef): void;
// Functions // Functions
@ -85,6 +93,33 @@ export function empty(
): Promise<EmptyResult>; ): Promise<EmptyResult>;
export type QuorumEthResult = { error: string; results: { error: string; success: boolean; value: string; }[]; value: string; }
export function quorumEth(
uris: string[],
quorumNumber: number,
timeout: number,
method: string,
jsonArgs: string[],
serviceId: string,
quorumServiceId: string,
quorumPeerId: string,
config?: {ttl?: number}
): Promise<QuorumEthResult>;
export function quorumEth(
peer: IFluenceClient$$,
uris: string[],
quorumNumber: number,
timeout: number,
method: string,
jsonArgs: string[],
serviceId: string,
quorumServiceId: string,
quorumPeerId: string,
config?: {ttl?: number}
): Promise<QuorumEthResult>;
export type RandomLoadBalancingResult = { error: string; success: boolean; value: string; } export type RandomLoadBalancingResult = { error: string; success: boolean; value: string; }
export function randomLoadBalancing( export function randomLoadBalancing(
uris: string[], uris: string[],
@ -125,6 +160,35 @@ export function randomLoadBalancingEth(
): Promise<RandomLoadBalancingEthResult>; ): Promise<RandomLoadBalancingEthResult>;
export type QuorumResult = { error: string; results: { error: string; success: boolean; value: string; }[]; value: string; }
export function quorum(
uris: string[],
quorumNumber: number,
timeout: number,
method: string,
jsonArgs: string[],
serviceId: string,
quorumServiceId: string,
quorumPeerId: string,
callFunc: (arg0: string, arg1: string, arg2: string[], arg3: string, callParams: CallParams$$<'arg0' | 'arg1' | 'arg2' | 'arg3'>) => { error: string; success: boolean; value: string; } | Promise<{ error: string; success: boolean; value: string; }>,
config?: {ttl?: number}
): Promise<QuorumResult>;
export function quorum(
peer: IFluenceClient$$,
uris: string[],
quorumNumber: number,
timeout: number,
method: string,
jsonArgs: string[],
serviceId: string,
quorumServiceId: string,
quorumPeerId: string,
callFunc: (arg0: string, arg1: string, arg2: string[], arg3: string, callParams: CallParams$$<'arg0' | 'arg1' | 'arg2' | 'arg3'>) => { error: string; success: boolean; value: string; } | Promise<{ error: string; success: boolean; value: string; }>,
config?: {ttl?: number}
): Promise<QuorumResult>;
export type RoundRobinResult = { error: string; success: boolean; value: string; } export type RoundRobinResult = { error: string; success: boolean; value: string; }
export function roundRobin( export function roundRobin(
uris: string[], uris: string[],

View File

@ -20,6 +20,45 @@ import {
export function registerNumOp(...args) {
registerService$$(
args,
{
"defaultServiceId" : "op",
"functions" : {
"tag" : "labeledProduct",
"fields" : {
"identity" : {
"tag" : "arrow",
"domain" : {
"tag" : "labeledProduct",
"fields" : {
"n" : {
"tag" : "scalar",
"name" : "u64"
}
}
},
"codomain" : {
"tag" : "unlabeledProduct",
"items" : [
{
"tag" : "scalar",
"name" : "i64"
}
]
}
}
}
}
}
);
}
export function registerLogger(...args) { export function registerLogger(...args) {
registerService$$( registerService$$(
args, args,
@ -71,6 +110,97 @@ export function registerLogger(...args) {
export function registerQuorumChecker(...args) {
registerService$$(
args,
{
"defaultServiceId" : "quorum",
"functions" : {
"tag" : "labeledProduct",
"fields" : {
"check" : {
"tag" : "arrow",
"domain" : {
"tag" : "labeledProduct",
"fields" : {
"results" : {
"tag" : "array",
"type" : {
"tag" : "struct",
"name" : "EthResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"success" : {
"tag" : "scalar",
"name" : "bool"
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
},
"minResults" : {
"tag" : "scalar",
"name" : "u32"
}
}
},
"codomain" : {
"tag" : "unlabeledProduct",
"items" : [
{
"tag" : "struct",
"name" : "QuorumResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"results" : {
"tag" : "array",
"type" : {
"tag" : "struct",
"name" : "EthResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"success" : {
"tag" : "scalar",
"name" : "bool"
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
]
}
}
}
}
}
);
}
export function registerEthCaller(...args) { export function registerEthCaller(...args) {
registerService$$( registerService$$(
args, args,
@ -163,45 +293,6 @@ export function registerCounter(...args) {
); );
} }
export function registerNumOp(...args) {
registerService$$(
args,
{
"defaultServiceId" : "op",
"functions" : {
"tag" : "labeledProduct",
"fields" : {
"identity" : {
"tag" : "arrow",
"domain" : {
"tag" : "labeledProduct",
"fields" : {
"n" : {
"tag" : "scalar",
"name" : "u64"
}
}
},
"codomain" : {
"tag" : "unlabeledProduct",
"items" : [
{
"tag" : "scalar",
"name" : "i64"
}
]
}
}
}
}
}
);
}
// Functions // Functions
export function roundRobinEth(...args) { export function roundRobinEth(...args) {
@ -417,6 +508,242 @@ export function empty(...args) {
} }
export function quorumEth(...args) {
let script = `
(xor
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "uris") [] uris)
)
(call %init_peer_id% ("getDataSrv" "quorumNumber") [] quorumNumber)
)
(call %init_peer_id% ("getDataSrv" "timeout") [] timeout)
)
(call %init_peer_id% ("getDataSrv" "method") [] method)
)
(call %init_peer_id% ("getDataSrv" "jsonArgs") [] jsonArgs)
)
(call %init_peer_id% ("getDataSrv" "serviceId") [] serviceId)
)
(call %init_peer_id% ("getDataSrv" "quorumServiceId") [] quorumServiceId)
)
(call %init_peer_id% ("getDataSrv" "quorumPeerId") [] quorumPeerId)
)
(new $results
(seq
(seq
(seq
(xor
(par
(fold uris uri-0
(par
(seq
(seq
(xor
(call -relay- (serviceId "eth_call") [uri-0 method jsonArgs] res)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
(ap res $results)
)
(call %init_peer_id% ("op" "noop") [])
)
(next uri-0)
)
(never)
)
(null)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
(par
(seq
(seq
(seq
(seq
(seq
(seq
(ap uris uris_to_functor)
(ap uris_to_functor.length uris_length)
)
(call %init_peer_id% ("math" "sub") [uris_length 1] sub)
)
(new $results_test
(seq
(seq
(seq
(call %init_peer_id% ("math" "add") [sub 1] results_incr)
(fold $results s
(seq
(seq
(ap s $results_test)
(canon %init_peer_id% $results_test #results_iter_canon)
)
(xor
(match #results_iter_canon.length results_incr
(null)
)
(next s)
)
)
(never)
)
)
(canon %init_peer_id% $results_test #results_result_canon)
)
(ap #results_result_canon results_gate)
)
)
)
(ap uris uris_to_functor-0)
)
(ap uris_to_functor-0.length uris_length-0)
)
(call %init_peer_id% ("math" "sub") [uris_length-0 1] sub-0)
)
(call %init_peer_id% ("peer" "timeout") [timeout ""])
)
)
(call -relay- ("op" "noop") [])
)
(xor
(seq
(seq
(canon quorumPeerId $results #results_canon)
(call quorumPeerId ("quorum" "check") [#results_canon quorumNumber] quorumResult)
)
(call -relay- ("op" "noop") [])
)
(seq
(call -relay- ("op" "noop") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
)
)
)
(xor
(call %init_peer_id% ("callbackSrv" "response") [quorumResult])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5])
)
`
return callFunction$$(
args,
{
"functionName" : "quorumEth",
"arrow" : {
"tag" : "arrow",
"domain" : {
"tag" : "labeledProduct",
"fields" : {
"uris" : {
"tag" : "array",
"type" : {
"tag" : "scalar",
"name" : "string"
}
},
"quorumNumber" : {
"tag" : "scalar",
"name" : "u32"
},
"timeout" : {
"tag" : "scalar",
"name" : "u32"
},
"method" : {
"tag" : "scalar",
"name" : "string"
},
"jsonArgs" : {
"tag" : "array",
"type" : {
"tag" : "scalar",
"name" : "string"
}
},
"serviceId" : {
"tag" : "scalar",
"name" : "string"
},
"quorumServiceId" : {
"tag" : "scalar",
"name" : "string"
},
"quorumPeerId" : {
"tag" : "scalar",
"name" : "string"
}
}
},
"codomain" : {
"tag" : "unlabeledProduct",
"items" : [
{
"tag" : "struct",
"name" : "QuorumResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"results" : {
"tag" : "array",
"type" : {
"tag" : "struct",
"name" : "EthResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"success" : {
"tag" : "scalar",
"name" : "bool"
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
]
}
},
"names" : {
"relay" : "-relay-",
"getDataSrv" : "getDataSrv",
"callbackSrv" : "callbackSrv",
"responseSrv" : "callbackSrv",
"responseFnName" : "response",
"errorHandlingSrv" : "errorHandlingSrv",
"errorFnName" : "error"
}
},
script
)
}
export function randomLoadBalancing(...args) { export function randomLoadBalancing(...args) {
let script = ` let script = `
@ -711,6 +1038,292 @@ export function randomLoadBalancingEth(...args) {
} }
export function quorum(...args) {
let script = `
(xor
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "uris") [] uris)
)
(call %init_peer_id% ("getDataSrv" "quorumNumber") [] quorumNumber)
)
(call %init_peer_id% ("getDataSrv" "timeout") [] timeout)
)
(call %init_peer_id% ("getDataSrv" "method") [] method)
)
(call %init_peer_id% ("getDataSrv" "jsonArgs") [] jsonArgs)
)
(call %init_peer_id% ("getDataSrv" "serviceId") [] serviceId)
)
(call %init_peer_id% ("getDataSrv" "quorumServiceId") [] quorumServiceId)
)
(call %init_peer_id% ("getDataSrv" "quorumPeerId") [] quorumPeerId)
)
(new $results
(seq
(seq
(seq
(xor
(par
(fold uris uri-0
(par
(seq
(seq
(xor
(call %init_peer_id% ("callbackSrv" "callFunc") [uri-0 method jsonArgs serviceId] init_call_res0)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
(ap init_call_res0 $results)
)
(call %init_peer_id% ("op" "noop") [])
)
(next uri-0)
)
(never)
)
(null)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
(par
(seq
(seq
(seq
(seq
(seq
(seq
(ap uris uris_to_functor)
(ap uris_to_functor.length uris_length)
)
(call %init_peer_id% ("math" "sub") [uris_length 1] sub)
)
(new $results_test
(seq
(seq
(seq
(call %init_peer_id% ("math" "add") [sub 1] results_incr)
(fold $results s
(seq
(seq
(ap s $results_test)
(canon %init_peer_id% $results_test #results_iter_canon)
)
(xor
(match #results_iter_canon.length results_incr
(null)
)
(next s)
)
)
(never)
)
)
(canon %init_peer_id% $results_test #results_result_canon)
)
(ap #results_result_canon results_gate)
)
)
)
(ap uris uris_to_functor-0)
)
(ap uris_to_functor-0.length uris_length-0)
)
(call %init_peer_id% ("math" "sub") [uris_length-0 1] sub-0)
)
(call %init_peer_id% ("peer" "timeout") [timeout ""])
)
)
(call -relay- ("op" "noop") [])
)
(xor
(seq
(seq
(canon quorumPeerId $results #results_canon)
(call quorumPeerId ("quorum" "check") [#results_canon quorumNumber] quorumResult)
)
(call -relay- ("op" "noop") [])
)
(seq
(call -relay- ("op" "noop") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
)
)
)
(xor
(call %init_peer_id% ("callbackSrv" "response") [quorumResult])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5])
)
`
return callFunction$$(
args,
{
"functionName" : "quorum",
"arrow" : {
"tag" : "arrow",
"domain" : {
"tag" : "labeledProduct",
"fields" : {
"uris" : {
"tag" : "array",
"type" : {
"tag" : "scalar",
"name" : "string"
}
},
"quorumNumber" : {
"tag" : "scalar",
"name" : "u32"
},
"timeout" : {
"tag" : "scalar",
"name" : "u32"
},
"method" : {
"tag" : "scalar",
"name" : "string"
},
"jsonArgs" : {
"tag" : "array",
"type" : {
"tag" : "scalar",
"name" : "string"
}
},
"serviceId" : {
"tag" : "scalar",
"name" : "string"
},
"quorumServiceId" : {
"tag" : "scalar",
"name" : "string"
},
"quorumPeerId" : {
"tag" : "scalar",
"name" : "string"
},
"callFunc" : {
"tag" : "arrow",
"domain" : {
"tag" : "unlabeledProduct",
"items" : [
{
"tag" : "scalar",
"name" : "string"
},
{
"tag" : "scalar",
"name" : "string"
},
{
"tag" : "array",
"type" : {
"tag" : "scalar",
"name" : "string"
}
},
{
"tag" : "scalar",
"name" : "string"
}
]
},
"codomain" : {
"tag" : "unlabeledProduct",
"items" : [
{
"tag" : "struct",
"name" : "EthResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"success" : {
"tag" : "scalar",
"name" : "bool"
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
]
}
}
}
},
"codomain" : {
"tag" : "unlabeledProduct",
"items" : [
{
"tag" : "struct",
"name" : "QuorumResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"results" : {
"tag" : "array",
"type" : {
"tag" : "struct",
"name" : "EthResult",
"fields" : {
"error" : {
"tag" : "scalar",
"name" : "string"
},
"success" : {
"tag" : "scalar",
"name" : "bool"
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
},
"value" : {
"tag" : "scalar",
"name" : "string"
}
}
}
]
}
},
"names" : {
"relay" : "-relay-",
"getDataSrv" : "getDataSrv",
"callbackSrv" : "callbackSrv",
"responseSrv" : "callbackSrv",
"responseFnName" : "response",
"errorHandlingSrv" : "errorHandlingSrv",
"errorFnName" : "error"
}
},
script
)
}
export function roundRobin(...args) { export function roundRobin(...args) {
let script = ` let script = `

View File

@ -7,6 +7,11 @@ data EthResult:
success: bool success: bool
error: string error: string
data QuorumResult:
value: string
results: []EthResult
error: string
service Logger("logger"): service Logger("logger"):
log(s: []string) log(s: []string)
logCall(s: string) logCall(s: string)
@ -17,6 +22,9 @@ service NumOp("op"):
service Counter("counter"): service Counter("counter"):
incrementAndReturn() -> u32 incrementAndReturn() -> u32
service QuorumChecker("quorum"):
check(results: []EthResult, minResults: u32) -> QuorumResult
func empty() -> EthResult: func empty() -> EthResult:
<- EthResult(value = "", success = true, error = "") <- EthResult(value = "", success = true, error = "")
@ -50,3 +58,19 @@ func roundRobin(uris: []string, method: string, jsonArgs: []string, serviceId: s
func roundRobinEth(uris: []string, method: string, jsonArgs: []string, serviceId: string, counterServiceId: string, counterPeerId: string) -> EthResult: func roundRobinEth(uris: []string, method: string, jsonArgs: []string, serviceId: string, counterServiceId: string, counterPeerId: string) -> EthResult:
<- roundRobin(uris, method, jsonArgs, serviceId, counterServiceId, counterPeerId, call) <- roundRobin(uris, method, jsonArgs, serviceId, counterServiceId, counterPeerId, call)
func quorum(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, serviceId: string, quorumServiceId: string, quorumPeerId: string,
callFunc: string, string, []string, string -> EthResult) -> QuorumResult:
results: *EthResult
on INIT_PEER_ID:
for uri <- uris par:
results <- callFunc(uri, method, jsonArgs, serviceId)
join results[uris.length - 1]
par Peer.timeout(timeout, "")
on quorumPeerId:
Counter quorumServiceId
quorumResult <- QuorumChecker.check(results, quorumNumber)
<- quorumResult
func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, serviceId: string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult:
<- quorum(uris, quorumNumber, timeout, method, jsonArgs, serviceId, quorumServiceId, quorumPeerId, call)

View File

@ -3,10 +3,13 @@
"https://goerli.infura.io/v3/77214656b25f4cad9cd2540c6d40c301", "https://goerli.infura.io/v3/77214656b25f4cad9cd2540c6d40c301",
"https://goerli.infura.io/v3/c48f3b538f154204ad53d04aa8990544" "https://goerli.infura.io/v3/c48f3b538f154204ad53d04aa8990544"
], ],
"mode": "round-robin", "mode": "quorum",
"relay": "/dns4/kras-02.fluence.dev/tcp/19001/wss/p2p/12D3KooWHLxVhUQyAuZe6AHMB29P7wkvTNMn7eDMcsqimJYLKREf", "relay": "/dns4/kras-02.fluence.dev/tcp/19001/wss/p2p/12D3KooWHLxVhUQyAuZe6AHMB29P7wkvTNMn7eDMcsqimJYLKREf",
"serviceId": "25bf2293-7503-4a01-af00-d1b7d089ca37", "serviceId": "25bf2293-7503-4a01-af00-d1b7d089ca37",
"port": 3000, "port": 3000,
"counterServiceId": null, "counterServiceId": null,
"counterPeerId": null "counterPeerId": null,
"quorumServiceId": null,
"quorumPeerId": null,
"quorumNumber": 2
} }

View File

@ -1,8 +1,9 @@
import fs from 'fs'; import fs from 'fs';
export const configHelp = "Config structure: { port, relay, serviceId, providers, mode, counterServiceId?, counterPeerId?}\n" + export const configHelp = "Config structure: { port, relay, serviceId, providers, mode, counterServiceId?, counterPeerId?}\n" +
"Where 'mode' can be: 'random' (default) or 'round-robin',\n" + "Where 'mode' can be: 'random' (default), 'round-robin' or 'quorum',\n" +
"'counterServiceId' and 'counterPeerId' will use local service if undefined" "'counterServiceId' and 'counterPeerId' will use local service if undefined.\n"
"'quorumServiceId' and 'quorumPeerId' will use local service if undefined.\n"
export function readConfig(path) { export function readConfig(path) {
const rawdata = fs.readFileSync(path); const rawdata = fs.readFileSync(path);

View File

@ -7,7 +7,14 @@ import bodyParser from "body-parser";
import {JSONRPCServer} from "json-rpc-2.0"; import {JSONRPCServer} from "json-rpc-2.0";
import {Fluence} from '@fluencelabs/js-client.api'; import {Fluence} from '@fluencelabs/js-client.api';
import "@fluencelabs/js-client.node" import "@fluencelabs/js-client.node"
import {randomLoadBalancingEth, registerCounter, registerLogger, roundRobinEth} from "../aqua-compiled/rpc.js"; import {
quorumEth,
randomLoadBalancingEth,
registerCounter,
registerLogger,
registerQuorumChecker,
roundRobinEth
} from "../aqua-compiled/rpc.js";
import {readArguments} from "./arguments.js"; import {readArguments} from "./arguments.js";
import {readConfig} from "./config.js"; import {readConfig} from "./config.js";
import {methods} from "./methods.js"; import {methods} from "./methods.js";
@ -57,8 +64,48 @@ registerCounter("counter", {
} }
}) })
function findSameResults(results, minNum) {
const resultCounts = results.filter((obj) => obj.success).map((obj) => obj.value).reduce(function(i, v, idx) {
if (i[v] === undefined) {
i[v] = 1
} else {
i[v] = i[v] + 1;
}
return i;
}, {});
const getMaxRepeated = Math.max(...Object.values(resultCounts));
if (getMaxRepeated >= minNum) {
console.log(resultCounts)
const max = Object.entries(resultCounts).find((kv) => kv[1] === getMaxRepeated)
return {
value: max[0],
results: [],
error: ""
}
} else {
return {
error: "No consensus in results",
results: results,
value: ""
}
}
}
registerQuorumChecker("quorum",
{
check: (ethResults, minQuorum) => {
console.log(ethResults)
return findSameResults(ethResults, minQuorum)
}
}
)
const counterServiceId = config.counterServiceId || 'counter' const counterServiceId = config.counterServiceId || 'counter'
const counterPeerId = config.counterPeerId || peerId const counterPeerId = config.counterPeerId || peerId
const quorumServiceId = config.quorumServiceId || 'quorum'
const quorumPeerId = config.quorumPeerId || peerId
const quorumNumber = config.quorumNumber || 2
async function methodHandler(req, method) { async function methodHandler(req, method) {
console.log(`Receiving request '${method}'`); console.log(`Receiving request '${method}'`);
@ -69,6 +116,15 @@ async function methodHandler(req, method) {
console.log("peerId: " + peerId) console.log("peerId: " + peerId)
result = await roundRobinEth(config.providers, method, req.map((s) => JSON.stringify(s)), config.serviceId, counterServiceId, counterPeerId, result = await roundRobinEth(config.providers, method, req.map((s) => JSON.stringify(s)), config.serviceId, counterServiceId, counterPeerId,
config.serviceId); config.serviceId);
} else if (config.mode === "quorum") {
result = await quorumEth(config.providers, config.quorumNumber, 5000, method, req.map((s) => JSON.stringify(s)), config.serviceId, quorumServiceId, quorumPeerId,
config.serviceId);
if (result.error) {
return {error: result.error, results: result.results}
}
console.log(result)
} }