From fb5fe8d3a345efadd87c18efc99456fc6811cdd2 Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Mon, 27 Sep 2021 20:50:09 +0300 Subject: [PATCH] Gather data in a single particle (#26) --- aqua/app.aqua | 21 +++--- src/_aqua/app.js | 165 ++++++++++++++++++++++------------------------- src/index.js | 18 +++--- 3 files changed, 97 insertions(+), 107 deletions(-) diff --git a/aqua/app.aqua b/aqua/app.aqua index ea33945..1a47175 100644 --- a/aqua/app.aqua +++ b/aqua/app.aqua @@ -4,7 +4,7 @@ alias PeerInfoCb: PeerId, Info, []Service, []Blueprint, []Module -> () alias ServiceInterfaceCb: PeerId, string, Interface -> () func collectServiceInterfaces(peer: PeerId, services: []Service, collectServiceInterface: ServiceInterfaceCb): - for srv <- services: + for srv <- services par: on peer: iface <- Srv.get_interface(srv.id) collectServiceInterface(peer, srv.id, iface) @@ -28,13 +28,14 @@ func findAndAskNeighboursSchema(relayPeerId: PeerId, clientId: PeerId, collectPe for n2 <- neighbors2 par: askAllAndSend(n2, collectPeerInfo, collectServiceInterface) -func getAll(relayPeerId: PeerId, knownPeers: []PeerId, collectPeerInfo: PeerInfoCb, collectServiceInterface: ServiceInterfaceCb): - -- co askAllAndSend(relayPeerId, collectPeerInfo, collectServiceInterface) - - -- in order to temporarily reduce the number of particles sent to client - -- we gather data from the known peers only. - -- Known peers are explicitly represent the whole network atm - for peer <- knownPeers par: - askAllAndSend(peer, collectPeerInfo, collectServiceInterface) +func getAll(knownPeers: []PeerId, collectPeerInfo: PeerInfoCb, collectServiceInterface: ServiceInterfaceCb): + on HOST_PEER_ID: + -- co askAllAndSend(relayPeerId, collectPeerInfo, collectServiceInterface) + + -- in order to temporarily reduce the number of particles sent to client + -- we gather data from the known peers only. + -- Known peers are explicitly represent the whole network atm + for peer <- knownPeers par: + askAllAndSend(peer, collectPeerInfo, collectServiceInterface) - -- co findAndAskNeighboursSchema(relayPeerId, %init_peer_id%, collectPeerInfo, collectServiceInterface) + -- co findAndAskNeighboursSchema(relayPeerId, %init_peer_id%, collectPeerInfo, collectServiceInterface) diff --git a/src/_aqua/app.js b/src/_aqua/app.js index c2c6d76..855a3fb 100644 --- a/src/_aqua/app.js +++ b/src/_aqua/app.js @@ -60,7 +60,7 @@ config = args[3]; (call %init_peer_id% ("getDataSrv" "services") [] services) ) (fold services srv - (seq + (par (seq (call -relay- ("op" "noop") []) (xor @@ -193,40 +193,34 @@ config = args[3]; (call -relay- ("op" "noop") []) ) (fold services srv - (seq + (par (seq - (seq - (call -relay- ("op" "noop") []) - (xor - (seq - (seq - (call peer ("srv" "get_interface") [srv.$.id!] iface) - (call -relay- ("op" "noop") []) - ) - (xor - (call %init_peer_id% ("callbackSrv" "collectServiceInterface") [peer srv.$.id! iface]) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) - ) - ) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) - ) - ) (call -relay- ("op" "noop") []) + (xor + (seq + (seq + (call peer ("srv" "get_interface") [srv.$.id!] iface) + (call -relay- ("op" "noop") []) + ) + (xor + (call %init_peer_id% ("callbackSrv" "collectServiceInterface") [peer srv.$.id! iface]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) + ) + ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) + ) + ) + (seq + (call -relay- ("op" "noop") []) + (next srv) ) - (next srv) ) ) ) - (seq - (call -relay- ("op" "noop") []) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4]) - ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4]) ) ) - (seq - (call -relay- ("op" "noop") []) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5]) - ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5]) ) `, @@ -372,27 +366,27 @@ config = args[4]; (call -relay- ("op" "noop") []) ) (fold services srv - (seq + (par (seq - (seq - (call -relay- ("op" "noop") []) - (xor - (seq - (seq - (call n2 ("srv" "get_interface") [srv.$.id!] iface) - (call -relay- ("op" "noop") []) - ) - (xor - (call %init_peer_id% ("callbackSrv" "collectServiceInterface") [n2 srv.$.id! iface]) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) - ) - ) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) - ) - ) (call -relay- ("op" "noop") []) + (xor + (seq + (seq + (call n2 ("srv" "get_interface") [srv.$.id!] iface) + (call -relay- ("op" "noop") []) + ) + (xor + (call %init_peer_id% ("callbackSrv" "collectServiceInterface") [n2 srv.$.id! iface]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) + ) + ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) + ) + ) + (seq + (call -relay- ("op" "noop") []) + (next srv) ) - (next srv) ) ) ) @@ -489,25 +483,22 @@ h.on('getDataSrv', 'clientId', () => {return clientId;}); export function getAll(...args) { let peer; - let relayPeerId; -let knownPeers; + let knownPeers; let collectPeerInfo; let collectServiceInterface; let config; if (FluencePeer.isInstance(args[0])) { peer = args[0]; - relayPeerId = args[1]; -knownPeers = args[2]; -collectPeerInfo = args[3]; -collectServiceInterface = args[4]; -config = args[5]; - } else { - peer = Fluence.getPeer(); - relayPeerId = args[0]; -knownPeers = args[1]; + knownPeers = args[1]; collectPeerInfo = args[2]; collectServiceInterface = args[3]; config = args[4]; + } else { + peer = Fluence.getPeer(); + knownPeers = args[0]; +collectPeerInfo = args[1]; +collectServiceInterface = args[2]; +config = args[3]; } let request; @@ -519,43 +510,40 @@ config = args[4]; (xor (seq (seq - (seq - (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-) - (call %init_peer_id% ("getDataSrv" "relayPeerId") [] relayPeerId) - ) + (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-) (call %init_peer_id% ("getDataSrv" "knownPeers") [] knownPeers) ) - (fold knownPeers peer - (par - (seq - (call -relay- ("op" "noop") []) - (xor - (seq + (xor + (fold knownPeers peer + (par + (seq + (call -relay- ("op" "noop") []) + (xor (seq (seq (seq (seq (seq (seq - (call peer ("peer" "identify") [] ident) - (call peer ("dist" "list_blueprints") [] blueprints) + (seq + (call peer ("peer" "identify") [] ident) + (call peer ("dist" "list_blueprints") [] blueprints) + ) + (call peer ("dist" "list_modules") [] modules) ) - (call peer ("dist" "list_modules") [] modules) + (call peer ("srv" "list") [] services) ) - (call peer ("srv" "list") [] services) + (call -relay- ("op" "noop") []) + ) + (xor + (call %init_peer_id% ("callbackSrv" "collectPeerInfo") [peer ident services blueprints modules]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1]) ) - (call -relay- ("op" "noop") []) - ) - (xor - (call %init_peer_id% ("callbackSrv" "collectPeerInfo") [peer ident services blueprints modules]) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1]) ) + (call -relay- ("op" "noop") []) ) - (call -relay- ("op" "noop") []) - ) - (fold services srv - (seq - (seq + (fold services srv + (par (seq (call -relay- ("op" "noop") []) (xor @@ -572,23 +560,23 @@ config = args[4]; (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) ) ) - (call -relay- ("op" "noop") []) + (seq + (call -relay- ("op" "noop") []) + (next srv) + ) ) - (next srv) ) ) - ) - (seq - (call -relay- ("op" "noop") []) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4]) ) ) + (next peer) ) - (next peer) ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5]) ) ) - (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 6]) ) `, @@ -597,8 +585,7 @@ config = args[4]; h.on('getDataSrv', '-relay-', () => { return peer.getStatus().relayPeerId; }); - h.on('getDataSrv', 'relayPeerId', () => {return relayPeerId;}); -h.on('getDataSrv', 'knownPeers', () => {return knownPeers;}); + h.on('getDataSrv', 'knownPeers', () => {return knownPeers;}); h.use((req, resp, next) => { if(req.serviceId === 'callbackSrv' && req.fnName === 'collectPeerInfo') { diff --git a/src/index.js b/src/index.js index 48d56a8..19a648f 100644 --- a/src/index.js +++ b/src/index.js @@ -25,12 +25,12 @@ import { Fluence, KeyPair, setLogLevel } from '@fluencelabs/fluence'; import { Elm } from './Main.elm'; import * as serviceWorker from './serviceWorker'; import { interfaceInfo, peerInfo } from './types'; -import { askAllAndSend } from './_aqua/app'; +import { askAllAndSend, getAll } from './_aqua/app'; -const defaultNetworkName = 'krasnodar'; +const defaultNetworkName = 'testNet + krasnodar'; const defaultEnv = { - relays: [...krasnodar, ...testNet, ...stage], + relays: [...testNet, ...krasnodar, ...stage], relayIdx: 2, logLevel: 'error', }; @@ -162,11 +162,13 @@ function genFlags(peerId, relays, relayIdx) { } app.ports.getAll.subscribe(async (data) => { - for (let peer of data.knownPeers) { - await askAllAndSend(peer, collectPeerInfo, collectServiceInterface, { - ttl: 120000, - }); - } + // for (let peer of data.knownPeers) { + // await askAllAndSend(peer, collectPeerInfo, collectServiceInterface, { + // ttl: 120000, + // }); + // } + + await getAll(data.knownPeers, collectPeerInfo, collectServiceInterface, { ttl: 120000 }); }); })();