Gather data in a single particle (#26)

This commit is contained in:
folex 2021-09-27 20:50:09 +03:00 committed by GitHub
parent 566f26363d
commit fb5fe8d3a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 107 deletions

View File

@ -4,7 +4,7 @@ alias PeerInfoCb: PeerId, Info, []Service, []Blueprint, []Module -> ()
alias ServiceInterfaceCb: PeerId, string, Interface -> () alias ServiceInterfaceCb: PeerId, string, Interface -> ()
func collectServiceInterfaces(peer: PeerId, services: []Service, collectServiceInterface: ServiceInterfaceCb): func collectServiceInterfaces(peer: PeerId, services: []Service, collectServiceInterface: ServiceInterfaceCb):
for srv <- services: for srv <- services par:
on peer: on peer:
iface <- Srv.get_interface(srv.id) iface <- Srv.get_interface(srv.id)
collectServiceInterface(peer, srv.id, iface) collectServiceInterface(peer, srv.id, iface)
@ -28,13 +28,14 @@ func findAndAskNeighboursSchema(relayPeerId: PeerId, clientId: PeerId, collectPe
for n2 <- neighbors2 par: for n2 <- neighbors2 par:
askAllAndSend(n2, collectPeerInfo, collectServiceInterface) askAllAndSend(n2, collectPeerInfo, collectServiceInterface)
func getAll(relayPeerId: PeerId, knownPeers: []PeerId, collectPeerInfo: PeerInfoCb, collectServiceInterface: ServiceInterfaceCb): func getAll(knownPeers: []PeerId, collectPeerInfo: PeerInfoCb, collectServiceInterface: ServiceInterfaceCb):
-- co askAllAndSend(relayPeerId, collectPeerInfo, collectServiceInterface) on HOST_PEER_ID:
-- co askAllAndSend(relayPeerId, collectPeerInfo, collectServiceInterface)
-- in order to temporarily reduce the number of particles sent to client
-- we gather data from the known peers only. -- in order to temporarily reduce the number of particles sent to client
-- Known peers are explicitly represent the whole network atm -- we gather data from the known peers only.
for peer <- knownPeers par: -- Known peers are explicitly represent the whole network atm
askAllAndSend(peer, collectPeerInfo, collectServiceInterface) for peer <- knownPeers par:
askAllAndSend(peer, collectPeerInfo, collectServiceInterface)
-- co findAndAskNeighboursSchema(relayPeerId, %init_peer_id%, collectPeerInfo, collectServiceInterface) -- co findAndAskNeighboursSchema(relayPeerId, %init_peer_id%, collectPeerInfo, collectServiceInterface)

View File

@ -60,7 +60,7 @@ config = args[3];
(call %init_peer_id% ("getDataSrv" "services") [] services) (call %init_peer_id% ("getDataSrv" "services") [] services)
) )
(fold services srv (fold services srv
(seq (par
(seq (seq
(call -relay- ("op" "noop") []) (call -relay- ("op" "noop") [])
(xor (xor
@ -193,40 +193,34 @@ config = args[3];
(call -relay- ("op" "noop") []) (call -relay- ("op" "noop") [])
) )
(fold services srv (fold services srv
(seq (par
(seq (seq
(seq
(call -relay- ("op" "noop") [])
(xor
(seq
(seq
(call peer ("srv" "get_interface") [srv.$.id!] iface)
(call -relay- ("op" "noop") [])
)
(xor
(call %init_peer_id% ("callbackSrv" "collectServiceInterface") [peer srv.$.id! iface])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
(call -relay- ("op" "noop") []) (call -relay- ("op" "noop") [])
(xor
(seq
(seq
(call peer ("srv" "get_interface") [srv.$.id!] iface)
(call -relay- ("op" "noop") [])
)
(xor
(call %init_peer_id% ("callbackSrv" "collectServiceInterface") [peer srv.$.id! iface])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
(seq
(call -relay- ("op" "noop") [])
(next srv)
) )
(next srv)
) )
) )
) )
(seq (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4])
(call -relay- ("op" "noop") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4])
)
) )
) )
(seq (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5])
(call -relay- ("op" "noop") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5])
)
) )
`, `,
@ -372,27 +366,27 @@ config = args[4];
(call -relay- ("op" "noop") []) (call -relay- ("op" "noop") [])
) )
(fold services srv (fold services srv
(seq (par
(seq (seq
(seq
(call -relay- ("op" "noop") [])
(xor
(seq
(seq
(call n2 ("srv" "get_interface") [srv.$.id!] iface)
(call -relay- ("op" "noop") [])
)
(xor
(call %init_peer_id% ("callbackSrv" "collectServiceInterface") [n2 srv.$.id! iface])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
(call -relay- ("op" "noop") []) (call -relay- ("op" "noop") [])
(xor
(seq
(seq
(call n2 ("srv" "get_interface") [srv.$.id!] iface)
(call -relay- ("op" "noop") [])
)
(xor
(call %init_peer_id% ("callbackSrv" "collectServiceInterface") [n2 srv.$.id! iface])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
(seq
(call -relay- ("op" "noop") [])
(next srv)
) )
(next srv)
) )
) )
) )
@ -489,25 +483,22 @@ h.on('getDataSrv', 'clientId', () => {return clientId;});
export function getAll(...args) { export function getAll(...args) {
let peer; let peer;
let relayPeerId; let knownPeers;
let knownPeers;
let collectPeerInfo; let collectPeerInfo;
let collectServiceInterface; let collectServiceInterface;
let config; let config;
if (FluencePeer.isInstance(args[0])) { if (FluencePeer.isInstance(args[0])) {
peer = args[0]; peer = args[0];
relayPeerId = args[1]; knownPeers = args[1];
knownPeers = args[2];
collectPeerInfo = args[3];
collectServiceInterface = args[4];
config = args[5];
} else {
peer = Fluence.getPeer();
relayPeerId = args[0];
knownPeers = args[1];
collectPeerInfo = args[2]; collectPeerInfo = args[2];
collectServiceInterface = args[3]; collectServiceInterface = args[3];
config = args[4]; config = args[4];
} else {
peer = Fluence.getPeer();
knownPeers = args[0];
collectPeerInfo = args[1];
collectServiceInterface = args[2];
config = args[3];
} }
let request; let request;
@ -519,43 +510,40 @@ config = args[4];
(xor (xor
(seq (seq
(seq (seq
(seq (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "relayPeerId") [] relayPeerId)
)
(call %init_peer_id% ("getDataSrv" "knownPeers") [] knownPeers) (call %init_peer_id% ("getDataSrv" "knownPeers") [] knownPeers)
) )
(fold knownPeers peer (xor
(par (fold knownPeers peer
(seq (par
(call -relay- ("op" "noop") []) (seq
(xor (call -relay- ("op" "noop") [])
(seq (xor
(seq (seq
(seq (seq
(seq (seq
(seq (seq
(seq (seq
(seq (seq
(call peer ("peer" "identify") [] ident) (seq
(call peer ("dist" "list_blueprints") [] blueprints) (call peer ("peer" "identify") [] ident)
(call peer ("dist" "list_blueprints") [] blueprints)
)
(call peer ("dist" "list_modules") [] modules)
) )
(call peer ("dist" "list_modules") [] modules) (call peer ("srv" "list") [] services)
) )
(call peer ("srv" "list") [] services) (call -relay- ("op" "noop") [])
)
(xor
(call %init_peer_id% ("callbackSrv" "collectPeerInfo") [peer ident services blueprints modules])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
) )
(call -relay- ("op" "noop") [])
)
(xor
(call %init_peer_id% ("callbackSrv" "collectPeerInfo") [peer ident services blueprints modules])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
) )
(call -relay- ("op" "noop") [])
) )
(call -relay- ("op" "noop") []) (fold services srv
) (par
(fold services srv
(seq
(seq
(seq (seq
(call -relay- ("op" "noop") []) (call -relay- ("op" "noop") [])
(xor (xor
@ -572,23 +560,23 @@ config = args[4];
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
) )
) )
(call -relay- ("op" "noop") []) (seq
(call -relay- ("op" "noop") [])
(next srv)
)
) )
(next srv)
) )
) )
)
(seq
(call -relay- ("op" "noop") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4])
) )
) )
(next peer)
) )
(next peer)
) )
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5])
) )
) )
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 6])
) )
`, `,
@ -597,8 +585,7 @@ config = args[4];
h.on('getDataSrv', '-relay-', () => { h.on('getDataSrv', '-relay-', () => {
return peer.getStatus().relayPeerId; return peer.getStatus().relayPeerId;
}); });
h.on('getDataSrv', 'relayPeerId', () => {return relayPeerId;}); h.on('getDataSrv', 'knownPeers', () => {return knownPeers;});
h.on('getDataSrv', 'knownPeers', () => {return knownPeers;});
h.use((req, resp, next) => { h.use((req, resp, next) => {
if(req.serviceId === 'callbackSrv' && req.fnName === 'collectPeerInfo') { if(req.serviceId === 'callbackSrv' && req.fnName === 'collectPeerInfo') {

View File

@ -25,12 +25,12 @@ import { Fluence, KeyPair, setLogLevel } from '@fluencelabs/fluence';
import { Elm } from './Main.elm'; import { Elm } from './Main.elm';
import * as serviceWorker from './serviceWorker'; import * as serviceWorker from './serviceWorker';
import { interfaceInfo, peerInfo } from './types'; import { interfaceInfo, peerInfo } from './types';
import { askAllAndSend } from './_aqua/app'; import { askAllAndSend, getAll } from './_aqua/app';
const defaultNetworkName = 'krasnodar'; const defaultNetworkName = 'testNet + krasnodar';
const defaultEnv = { const defaultEnv = {
relays: [...krasnodar, ...testNet, ...stage], relays: [...testNet, ...krasnodar, ...stage],
relayIdx: 2, relayIdx: 2,
logLevel: 'error', logLevel: 'error',
}; };
@ -162,11 +162,13 @@ function genFlags(peerId, relays, relayIdx) {
} }
app.ports.getAll.subscribe(async (data) => { app.ports.getAll.subscribe(async (data) => {
for (let peer of data.knownPeers) { // for (let peer of data.knownPeers) {
await askAllAndSend(peer, collectPeerInfo, collectServiceInterface, { // await askAllAndSend(peer, collectPeerInfo, collectServiceInterface, {
ttl: 120000, // ttl: 120000,
}); // });
} // }
await getAll(data.knownPeers, collectPeerInfo, collectServiceInterface, { ttl: 120000 });
}); });
})(); })();