Service api (#927)

This commit is contained in:
Dima 2020-07-27 16:39:54 +03:00 committed by GitHub
parent 4e89a13dee
commit 809395f3cb
8 changed files with 324 additions and 188 deletions

View File

@ -23,30 +23,63 @@ let peerId2 = await Fluence.generatePeerId();
Establish connections to predefined nodes.
```typescript
let client1 = await Fluence.connect("12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb", "104.248.25.59", 9003, peerId1);
let client2 = await Fluence.connect("12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", "104.248.25.59", 9002, peerId2);
let client1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb", peerId1);
let client2 = await Fluence.connect("/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", peerId2);
```
Create a new unique service by the first client that will calculate the sum of two numbers.
```typescript
let serviceId = "sum-calculator-" + genUUID();
## Become a provider
await client1.registerService(serviceId, async (req) => {
Create a new unique name to provide by the first client that will calculate the sum of two numbers.
```typescript
let name = "sum-calculator-" + genUUID();
await cl1.provideName(name, async (req) => {
let message = {msgId: req.arguments.msgId, result: req.arguments.one + req.arguments.two};
await client1.sendCall(req.reply_to, message);
await cl1.sendCall(req.reply_to, message);
});
```
Send a request by the second client and print a result. The predicate is required to match a request and a response by `msgId`.
## Become a provider
To declare that you're available on some token (unique name), you can become a provider of that token. For example, below you become a provider of a name `sum-calculator-1234...` so you can share that name with other people, and they can call you by that.
```typescript
let msgId = "calculate-it-for-me" + genUUID();
let req = {one: 12, two: 23, msgId: msgId};
let predicate = (args: any) => args.msgId && args.msgId === msgId;
let response = await client2.sendServiceCallWaitResponse(serviceId, req, predicate);
let response = await client2.callProvider(name, req);
let result = response.result;
console.log(`calculation result is: ${result}`);
```
## Register Service
Will register service that will combine multiple modules around one serviceId
```
let serviceId = await cl2.createService(peerAddr, ["ipfs_node.wasm", "curl.wasm"]);
console.log(serviceId);
```
## Call Service
```
// peerAddr address of the node that runs this service
// "get_address" function to call
// { some_arg: "1" } arguments passed to the function
// "ipfs_node.wasm" name of the module to find function in
let resp = await cl2.callService(peerAddr, serviceId, "ipfs_node.wasm", {some_arg: "1"}, "get_address")
console.log(resp)
```
## Discover Services
```
// get available modules on node (to get info about connected node should use the method without arguments)
let modules = cl1.getAvailableModules(peerAddr);
// get interfaces of existing services
let interfaces = await cl2.getActiveInterfaces(peerAddr);
```

View File

@ -104,7 +104,7 @@ export async function createRelayAddress(relay: string, peerId: PeerId, withSig:
}
}
export function createServiceAddress(service: string, hash?: string): Address {
export function createProviderAddress(service: string, hash?: string): Address {
let protocol = {protocol: ProtocolType.Providers, value: service};

View File

@ -14,7 +14,14 @@
* limitations under the License.
*/
import {Address, createRelayAddress, ProtocolType} from "./address";
import {
Address,
createPeerAddress,
createRelayAddress,
createProviderAddress,
ProtocolType,
addressToString
} from "./address";
import {callToString, FunctionCall, genUUID, makeFunctionCall,} from "./function_call";
import * as PeerId from "peer-id";
import {Services} from "./services";
@ -23,9 +30,29 @@ import {Subscriptions} from "./subscriptions";
import * as PeerInfo from "peer-info";
import {FluenceConnection} from "./fluence_connection";
/**
* @param target receiver
* @param args message in the call
* @param moduleId module name
* @param fname function name
* @param context list of modules to use with the request
* @param name common field for debug purposes
* @param msgId hash that will be added to replyTo address
*/
interface Call {
target: Address,
args: any,
moduleId?: string,
fname?: string,
msgId?: string,
context?: string[],
name?: string
}
export class FluenceClient {
readonly selfPeerInfo: PeerInfo;
readonly selfPeerIdStr: string;
private nodePeerIdStr: string;
private connection: FluenceConnection;
@ -49,14 +76,23 @@ export class FluenceClient {
* Waits a response that match the predicate.
*
* @param predicate will be applied to each incoming call until it matches
* @param ignoreErrors ignore an errors, wait for success response
*/
waitResponse(predicate: (args: any, target: Address, replyTo: Address) => (boolean | undefined)): Promise<any> {
return new Promise((resolve, _) => {
waitResponse(predicate: (args: any, target: Address, replyTo: Address) => (boolean | undefined), ignoreErrors: boolean): Promise<any> {
return new Promise((resolve, reject) => {
// subscribe for responses, to handle response
// TODO if there's no conn, reject
this.subscribe((args: any, target: Address, replyTo: Address) => {
if (predicate(args, target, replyTo)) {
if (args.reason) {
if (ignoreErrors) {
return false;
} else {
reject(new Error(args.reason));
}
} else {
resolve(args);
}
return true;
}
return false;
@ -65,7 +101,7 @@ export class FluenceClient {
}
private getPredicate(msgId: string): (args: any, target: Address) => (boolean | undefined) {
return (args: any, target: Address) => target.hash && target.hash === msgId && !args.reason;
return (args: any, target: Address) => target.hash && target.hash === msgId;
}
/**
@ -77,91 +113,76 @@ export class FluenceClient {
* @param fname functin name
*/
async sendCallWaitResponse(target: Address, args: any, moduleId?: string, fname?: string): Promise<any> {
let replyHash = genUUID();
let predicate = this.getPredicate(replyHash);
await this.sendCall(target, args, true, moduleId, fname, replyHash, undefined);
return this.waitResponse(predicate);
let msgId = genUUID();
let predicate = this.getPredicate(msgId);
await this.sendCall({target: target, args: args, moduleId: moduleId, fname: fname, msgId: msgId});
return this.waitResponse(predicate, false);
}
/**
* Send call and forget.
*
* @param target receiver
* @param args message in the call
* @param reply add a `replyTo` field or not
*/
async sendCall(call: Call) {
if (this.connection && this.connection.isConnected()) {
await this.connection.sendFunctionCall(call.target, call.args, call.moduleId, call.fname, call.msgId, call.context, call.name);
} else {
throw Error("client is not connected")
}
}
/**
* Send call to the provider and wait a response matches predicate.
*
* @param provider published name in dht
* @param args message to the service
* @param moduleId module name
* @param fname function name
* @param name common field for debug purposes
* @param replyHash hash that will be added to replyTo address
* @param name debug info
*/
async sendCall(target: Address, args: any, reply?: boolean, moduleId?: string, fname?: string, replyHash?: string, name?: string) {
if (this.connection && this.connection.isConnected()) {
await this.connection.sendFunctionCall(target, args, reply, moduleId, fname, replyHash, name);
} else {
throw Error("client is not connected")
}
}
/**
* Send call to the service.
*
* @param moduleId
* @param args message to the service
* @param fname function name
* @param name common field for debug purposes
* @param replyHash hash that will be added to replyTo address
*/
async sendServiceCall(moduleId: string, args: any, fname?: string, replyHash?: string, name?: string) {
if (this.connection && this.connection.isConnected()) {
await this.connection.sendServiceCall(moduleId, false, args, fname, replyHash, name);
} else {
throw Error("client is not connected")
}
}
/**
* Send a call to the local service on a peer the client connected with.
*
* @param moduleId
* @param args message to the service
* @param fname function name
* @param name common field for debug purposes
* @param replyHash hash that will be added to replyTo address
*/
async sendServiceLocalCall(moduleId: string, args: any, fname?: string, replyHash?: string, name?: string) {
if (this.connection && this.connection.isConnected()) {
await this.connection.sendServiceCall(moduleId, true, args, fname, replyHash, name);
} else {
throw Error("client is not connected")
}
}
/**
* Send call to the service and wait a response matches predicate.
*
* @param moduleId
* @param args message to the service
* @param fname function name
*/
async sendServiceCallWaitResponse(moduleId: string, args: any, fname?: string): Promise<any> {
let replyHash = genUUID();
let predicate = this.getPredicate(replyHash);
await this.sendServiceCall(moduleId, args, fname, replyHash, fname);
return await this.waitResponse(predicate);
async callProvider(provider: string, args: any, moduleId?: string, fname?: string, name?: string): Promise<any> {
let msgId = genUUID();
let predicate = this.getPredicate(msgId);
let address = createProviderAddress(provider);
await this.sendCall({target: address, args: args, moduleId: moduleId, fname: fname, msgId: msgId, name: name});
return await this.waitResponse(predicate, true);
}
/**
* Send a call to the local service and wait a response matches predicate on a peer the client connected with.
*
* @param moduleId
* @param addr node address
* @param args message to the service
* @param fname function name
* @param context
* @param name debug info
*/
async sendServiceLocalCallWaitResponse(moduleId: string, args: any, fname?: string): Promise<any> {
let replyHash = genUUID();
let predicate = this.getPredicate(replyHash);
await this.sendServiceLocalCall(moduleId, args, fname, replyHash, undefined);
return await this.waitResponse(predicate);
async callPeer(moduleId: string, args: any, fname?: string, addr?: string, context?: string[], name?: string): Promise<any> {
let msgId = genUUID();
let predicate = this.getPredicate(msgId);
let address;
if (addr) {
address = createPeerAddress(addr);
} else {
address = createPeerAddress(this.nodePeerIdStr);
}
await this.sendCall({target: address, args: args, moduleId: moduleId, fname: fname, msgId: msgId, context: context, name: name})
return await this.waitResponse(predicate, false);
}
async callService(peerId: string, serviceId: string, moduleId: string, args: any, fname?: string): Promise<any> {
let target = createPeerAddress(peerId, serviceId);
let msgId = genUUID();
let predicate = this.getPredicate(msgId);
await this.sendCall({target: target, args: args, moduleId: moduleId, fname: fname, msgId: msgId});
return await this.waitResponse(predicate, false);
}
/**
@ -246,12 +267,86 @@ export class FluenceClient {
/**
* Sends a call to register the service_id.
* Become a name provider. Other network members could find and call one of the providers of this name by this name.
*/
async registerService(moduleId: string, fn: (req: FunctionCall) => void) {
await this.connection.registerService(moduleId);
async provideName(name: string, fn: (req: FunctionCall) => void) {
let replyTo = this.connection.sender;
await this.callPeer("provide", {name: name, address: addressToString(replyTo)})
this.services.addService(moduleId, fn)
this.services.addService(name, fn);
}
/**
* Sends a call to create a service on remote node.
*/
async createService(peerId: string, context: string[]): Promise<string> {
let resp = await this.callPeer("create", {}, undefined, peerId, context);
if (resp.result && resp.result.service_id) {
return resp.result.service_id
} else {
console.error("Unknown response type on `createService`: ", resp)
throw new Error("Unknown response type on `createService`");
}
}
async getInterface(serviceId: string, peerId?: string): Promise<any> {
let resp;
resp = await this.callPeer("get_interface", {service_id: serviceId}, undefined, peerId)
return resp.interface;
}
async getActiveInterfaces(peerId?: string): Promise<any> {
let resp;
if (peerId) {
resp = await this.sendCallWaitResponse(createPeerAddress(peerId), {}, "get_active_interfaces");
} else {
resp = await this.callPeer("get_active_interfaces", {}, undefined, peerId);
}
return resp.active_interfaces;
}
async getAvailableModules(peerId?: string): Promise<string[]> {
let resp;
if (peerId) {
resp = await this.sendCallWaitResponse(createPeerAddress(peerId), {}, "get_available_modules");
} else {
resp = await this.callPeer("get_available_modules", {}, undefined, peerId);
}
return resp.available_modules;
}
/**
* Add new WASM module to the node.
*
* @param bytes WASM in base64
* @param name WASM identificator
* @param mem_pages_count memory amount for WASM
* @param envs environment variables
* @param mapped_dirs links to directories
* @param preopened_files files and directories that will be used in WASM
* @param peerId the node to add module
*/
async addModule(bytes: string, name: string, mem_pages_count: number, envs: string[], mapped_dirs: any, preopened_files: string[], peerId?: string): Promise<any> {
let config: any = {
logger_enabled: true,
mem_pages_count: mem_pages_count,
name: name,
wasi: {
envs: envs,
mapped_dirs: mapped_dirs,
preopened_files: preopened_files
}
}
let resp;
if (peerId) {
resp = await this.sendCallWaitResponse(createPeerAddress(peerId), {bytes: bytes, config: config}, "add_module");
} else {
resp = await this.callPeer("add_module", {bytes: bytes, config: config}, undefined, peerId);
}
return resp.available_modules;
}
// subscribe new hook for every incoming call, to handle in-service responses and other different cases
@ -287,6 +382,7 @@ export class FluenceClient {
multiaddr = Multiaddr(multiaddr);
let nodePeerId = multiaddr.getPeerId();
this.nodePeerIdStr = nodePeerId;
if (!nodePeerId) {
throw Error("'multiaddr' did not contain a valid peer id")
@ -309,7 +405,7 @@ export class FluenceClient {
// if the client already had a connection, it will reregister all services after establishing a new connection.
if (!firstConnection) {
for (let service of this.services.getAllServices().keys()) {
await this.connection.registerService(service);
await this.connection.provideName(service);
}
}

View File

@ -14,16 +14,13 @@
* limitations under the License.
*/
import {Address, createPeerAddress, createServiceAddress} from "./address";
import {Address, createPeerAddress} from "./address";
import {
callToString,
FunctionCall,
genUUID,
makeCall,
makeFunctionCall,
makePeerCall,
makeRegisterMessage,
makeRelayCall,
makeProvideMessage,
parseFunctionCall
} from "./function_call";
import * as PeerId from "peer-id";
@ -50,7 +47,7 @@ export class FluenceConnection {
readonly sender: Address;
private node: LibP2p;
private readonly address: Multiaddr;
private readonly nodePeerId: PeerId;
readonly nodePeerId: PeerId;
private readonly selfPeerId: string;
private readonly handleCall: (call: FunctionCall) => FunctionCall | undefined;
@ -63,10 +60,10 @@ export class FluenceConnection {
this.sender = sender
}
makeReplyTo(replyHash?: string): Address {
if (replyHash) {
makeReplyTo(reply?: string): Address {
if (reply) {
let replyToWithHash = {...this.sender}
replyToWithHash.hash = replyHash;
if (typeof reply === "string") replyToWithHash.hash = reply;
return replyToWithHash;
} else {
return this.sender;
@ -96,37 +93,6 @@ export class FluenceConnection {
// connection status. If `Disconnected`, it cannot be reconnected
private status: Status = Status.Initializing;
/**
* Sends remote service_id call.
*/
async sendServiceCall(moduleId: string, isLocal: boolean, args: any, fname?: string, replyHash?: string, name?: string) {
let target;
if (isLocal) {
target = createPeerAddress(this.nodePeerId.toB58String());
} else {
target = createServiceAddress(moduleId);
}
let regMsg = makeCall(moduleId, target, args, this.sender, this.makeReplyTo(replyHash), fname, name);
await this.sendCall(regMsg);
}
/**
* Sends custom message to the peer.
*/
async sendPeerCall(peer: string, msg: any, name?: string) {
let regMsg = makePeerCall(PeerId.createFromB58String(peer), msg, this.sender, this.sender, name);
await this.sendCall(regMsg);
}
/**
* Sends custom message to the peer through relay.
*/
async sendRelayCall(peer: string, relay: string, msg: any, name?: string) {
let regMsg = await makeRelayCall(PeerId.createFromB58String(peer), PeerId.createFromB58String(relay), msg, this.sender, this.sender, name);
await this.sendCall(regMsg);
}
private async startReceiving() {
if (this.status === Status.Initializing) {
await this.node.start();
@ -193,24 +159,23 @@ export class FluenceConnection {
);
}
/**
* Send FunctionCall to the connected node.
*/
async sendFunctionCall(target: Address, args: any, reply?: boolean, moduleId?: string, fname?: string, replyHash?: string, name?: string) {
async sendFunctionCall(target: Address, args: any, moduleId?: string, fname?: string, msgId?: string, context?: string[], name?: string) {
this.checkConnectedOrThrow();
let replyTo;
if (reply) replyTo = this.makeReplyTo(replyHash);
if (msgId) replyTo = this.makeReplyTo(msgId);
let call = makeFunctionCall(genUUID(), target, this.sender, args, moduleId, fname, replyTo, name);
let call = makeFunctionCall(genUUID(), target, this.sender, args, moduleId, fname, replyTo, context, name);
await this.sendCall(call);
}
async registerService(serviceId: string) {
async provideName(name: string) {
let target = createPeerAddress(this.nodePeerId.toB58String())
let regMsg = await makeRegisterMessage(serviceId, target, this.sender);
let regMsg = await makeProvideMessage(name, target, this.sender);
await this.sendCall(regMsg);
}
}

View File

@ -17,7 +17,6 @@
import {
createPeerAddress,
createRelayAddress,
createServiceAddress,
Address, addressToString, parseAddress
} from "./address";
import * as PeerId from "peer-id";
@ -30,6 +29,7 @@ export interface FunctionCall {
"module"?: string,
fname?: string,
arguments: any,
context?: string[],
name?: string,
action: "FunctionCall"
}
@ -47,7 +47,7 @@ export function callToString(call: FunctionCall) {
return JSON.stringify(obj)
}
export function makeFunctionCall(uuid: string, target: Address, sender: Address, args: object, moduleId?: string, fname?: string, replyTo?: Address, name?: string): FunctionCall {
export function makeFunctionCall(uuid: string, target: Address, sender: Address, args: object, moduleId?: string, fname?: string, replyTo?: Address, context?: string[], name?: string): FunctionCall {
return {
uuid: uuid,
@ -57,6 +57,7 @@ export function makeFunctionCall(uuid: string, target: Address, sender: Address,
"module": moduleId,
fname: fname,
arguments: args,
context: context,
name: name,
action: "FunctionCall"
}
@ -82,6 +83,7 @@ export function parseFunctionCall(str: string): FunctionCall {
reply_to: replyTo,
sender: sender,
arguments: json.arguments,
context: json.context,
"module": json.module,
fname: json.fname,
name: json.name,
@ -95,37 +97,10 @@ export function genUUID() {
}
/**
* Message to peer through relay
* Message to provide new name.
*/
export async function makeRelayCall(client: PeerId, relay: PeerId, msg: any, sender: Address, replyTo?: Address, name?: string): Promise<FunctionCall> {
let relayAddress = await createRelayAddress(relay.toB58String(), client, false);
return makeFunctionCall(genUUID(), relayAddress, sender, msg, undefined, undefined, replyTo, name);
}
/**
* Message to peer
*/
export function makePeerCall(client: PeerId, msg: any, sender: Address, replyTo?: Address, name?: string): FunctionCall {
let peerAddress = createPeerAddress(client.toB58String());
return makeFunctionCall(genUUID(), peerAddress, sender, msg, undefined, undefined, replyTo, name);
}
/**
* Message to call remote service
*/
export function makeCall(moduleId: string, target: Address, args: any, sender: Address, replyTo?: Address, fname?: string, name?: string): FunctionCall {
return makeFunctionCall(genUUID(), target, sender, args, moduleId, fname, replyTo, name);
}
/**
* Message to register new service_id.
*/
export async function makeRegisterMessage(serviceId: string, target: Address, sender: Address): Promise<FunctionCall> {
return makeFunctionCall(genUUID(), target, sender, {service_id: serviceId}, "provide", undefined, sender, "provide service_id");
export async function makeProvideMessage(name: string, target: Address, sender: Address): Promise<FunctionCall> {
return makeFunctionCall(genUUID(), target, sender, {name: name, address: addressToString(sender)}, "provide", undefined, sender, undefined, "provide service_id");
}
// TODO uncomment when this will be implemented in Fluence network

View File

@ -1,14 +1,14 @@
import {
createPeerAddress,
createRelayAddress,
createServiceAddress,
createProviderAddress,
addressToString,
parseAddress, Address
parseAddress
} from "../address";
import {expect} from 'chai';
import 'mocha';
import {decode, encode} from "bs58"
import {encode} from "bs58"
import * as PeerId from "peer-id";
import {callToString, genUUID, makeFunctionCall, parseFunctionCall} from "../function_call";
import Fluence from "../fluence";
@ -16,6 +16,7 @@ import {certificateFromString, certificateToString, issue} from "../trust/certif
import {TrustGraph} from "../trust/trust_graph";
import {nodeRootCert} from "../trust/misc";
import {peerIdToSeed, seedToPeerId} from "../seed";
import {greetingWASM} from "./greeting_wasm";
describe("Typescript usage suite", () => {
@ -24,7 +25,7 @@ describe("Typescript usage suite", () => {
});
it("should be able to convert service_id address to and from string", () => {
let addr = createServiceAddress("service_id-1");
let addr = createProviderAddress("service_id-1");
let str = addressToString(addr);
let parsed = parseAddress(str);
@ -69,6 +70,7 @@ describe("Typescript usage suite", () => {
"mm",
"fff",
addr,
undefined,
"2444"
);
@ -124,10 +126,27 @@ describe("Typescript usage suite", () => {
});
// delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
it.skip("integration test", async function () {
it.skip("test provide", async function () {
this.timeout(15000);
await testProvide();
});
// delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
it.skip("test certs", async function () {
this.timeout(15000);
await testCerts();
// await testCalculator();
});
// delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
it.skip("test upload wasm", async function () {
this.timeout(15000);
await testUploadWasm();
});
// delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
it.skip("test list of services and interfaces", async function () {
this.timeout(15000);
await testServicesAndInterfaces();
});
});
@ -166,8 +185,53 @@ export async function testCerts() {
expect(certs[0].chain[1].issuedAt).to.be.equal(extended.chain[1].issuedAt)
}
export async function testUploadWasm() {
let key1 = await Fluence.generatePeerId();
let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9100/ws/p2p/12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM", key1);
let moduleName = genUUID()
await cl1.addModule(greetingWASM, moduleName, 100, [], {}, []);
let availableModules = await cl1.getAvailableModules();
console.log(availableModules);
let peerId1 = "12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM"
let serviceId = await cl1.createService(peerId1, [moduleName]);
let argName = genUUID();
let resp = await cl1.callService(peerId1, serviceId, moduleName, {name: argName}, "greeting")
expect(resp.result).to.be.equal(`Hi, ${argName}`)
}
export async function testServicesAndInterfaces() {
let key1 = await Fluence.generatePeerId();
let key2 = await Fluence.generatePeerId();
// connect to two different nodes
let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9100/ws/p2p/12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM", key1);
let cl2 = await Fluence.connect("/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", key2);
let peerId1 = "12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM"
let serviceId = await cl2.createService(peerId1, ["ipfs_node.wasm"]);
let resp = await cl2.callService(peerId1, serviceId, "ipfs_node.wasm", {}, "get_address")
console.log(resp)
let interfaces = await cl1.getActiveInterfaces();
let interfaceResp = await cl1.getInterface(serviceId, peerId1);
console.log(interfaces);
console.log(interfaceResp);
let availableModules = await cl1.getAvailableModules(peerId1);
console.log(availableModules);
}
// Shows how to register and call new service in Fluence network
export async function testCalculator() {
export async function testProvide() {
let key1 = await Fluence.generatePeerId();
let key2 = await Fluence.generatePeerId();
@ -177,10 +241,10 @@ export async function testCalculator() {
let cl2 = await Fluence.connect("/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", key2);
// service name that we will register with one connection and call with another
let serviceId = "sum-calculator-" + genUUID();
let providerId = "sum-calculator-" + genUUID();
// register service that will add two numbers and send a response with calculation result
await cl1.registerService(serviceId, async (req) => {
await cl1.provideName(providerId, async (req) => {
console.log("message received");
console.log(req);
@ -188,27 +252,30 @@ export async function testCalculator() {
let message = {msgId: req.arguments.msgId, result: req.arguments.one + req.arguments.two};
await cl1.sendCall(req.reply_to, message);
await cl1.sendCall({target: req.reply_to, args: message});
});
let req = {one: 12, two: 23};
// send call to `sum-calculator` service with two numbers
let response = await cl2.sendServiceCallWaitResponse(serviceId, req);
let response = await cl2.callProvider(providerId, req, providerId);
let result = response.result;
console.log(`calculation result is: ${result}`);
expect(result).to.be.equal(35)
await cl1.connect("/dns4/relay02.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9");
await delay(1000);
// send call to `sum-calculator` service with two numbers
await cl2.sendServiceCall(serviceId, req, "calculator request");
await cl2.callProvider(providerId, req, providerId, undefined, "calculator request");
let response2 = await cl2.sendServiceCallWaitResponse(serviceId, req);
let response2 = await cl2.callProvider(providerId, req, providerId);
let result2 = await response2.result;
console.log(`calculation result AFTER RECONNECT is: ${result2}`);
console.log("RESULT:");
console.log(response2);
expect(result2).to.be.equal(35)
}

File diff suppressed because one or more lines are too long

View File

@ -16,7 +16,6 @@
import {FluenceClient} from "../fluence_client";
import {Certificate, certificateFromString, certificateToString} from "./certificate";
import {genUUID} from "../function_call";
// The client to interact with the Fluence trust graph API
export class TrustGraph {
@ -34,7 +33,7 @@ export class TrustGraph {
certsStr.push(await certificateToString(cert));
}
let response = await this.client.sendServiceLocalCallWaitResponse("add_certificates", {
let response = await this.client.callPeer("add_certificates", {
certificates: certsStr,
peer_id: peerId
});
@ -50,7 +49,7 @@ export class TrustGraph {
// Get certificates that stores in Kademlia neighbourhood by `peerId` key.
async getCertificates(peerId: string): Promise<Certificate[]> {
let resp = await this.client.sendServiceLocalCallWaitResponse("certificates", {
let resp = await this.client.callPeer("certificates", {
peer_id: peerId
});