Protocol updates (#914)

This commit is contained in:
Dima 2020-06-26 16:12:37 +03:00 committed by GitHub
parent 8a9ad8e9de
commit afacef5138
8 changed files with 89 additions and 44 deletions

View File

@ -1,6 +1,6 @@
{
"name": "fluence",
"version": "0.5.6",
"version": "0.6.1",
"description": "the browser js-libp2p client for the Fluence network",
"main": "./dist/fluence.js",
"typings": "./dist/fluence.d.ts",

View File

@ -27,7 +27,7 @@ export interface Protocol {
}
export enum ProtocolType {
Service = "service",
Providers = "providers",
Peer = "peer",
Signature = "signature",
Client = "client"
@ -64,7 +64,7 @@ export function parseProtocol(protocol: string, protocolIterator: IterableIterat
protocol = protocol.toLocaleLowerCase();
switch (protocol) {
case ProtocolType.Service:
case ProtocolType.Providers:
return protocolWithValue(protocol, protocolIterator);
case ProtocolType.Client:
return protocolWithValue(protocol, protocolIterator);
@ -100,7 +100,7 @@ export async function createRelayAddress(relay: string, peerId: PeerId, withSig:
export function createServiceAddress(service: string): Address {
let protocol = {protocol: ProtocolType.Service, value: service};
let protocol = {protocol: ProtocolType.Providers, value: service};
return {
protocols: [protocol]

View File

@ -101,7 +101,22 @@ export class FluenceClient {
*/
async sendServiceCall(serviceId: string, args: any, name?: string) {
if (this.connection && this.connection.isConnected()) {
await this.connection.sendServiceCall(serviceId, args, name);
await this.connection.sendServiceCall(serviceId, false, args, name);
} else {
throw Error("client is not connected")
}
}
/**
* Send a call to the local service on a peer the client connected with.
*
* @param serviceId
* @param args message to the service
* @param name common field for debug purposes
*/
async sendServiceLocalCall(serviceId: string, args: any, name?: string) {
if (this.connection && this.connection.isConnected()) {
await this.connection.sendServiceCall(serviceId, true, args, name);
} else {
throw Error("client is not connected")
}
@ -119,6 +134,18 @@ export class FluenceClient {
return await this.waitResponse(predicate);
}
/**
* Send a call to the local service and wait a response matches predicate on a peer the client connected with.
*
* @param serviceId
* @param args message to the service
* @param predicate will be applied to each incoming call until it matches
*/
async sendServiceLocalCallWaitResponse(serviceId: string, args: any, predicate: (args: any, target: Address, replyTo: Address) => (boolean | undefined)): Promise<any> {
await this.sendServiceLocalCall(serviceId, args);
return await this.waitResponse(predicate);
}
/**
* Handle incoming call.
* If FunctionCall returns - we should send it as a response.
@ -147,31 +174,35 @@ export class FluenceClient {
_this.subscriptions.applyToSubscriptions(call);
switch (lastProtocol.protocol) {
case ProtocolType.Service:
try {
// call of the service, service should handle response sending, error handling, requests to other services
let applied = _this.services.applyToService(lastProtocol.value, call);
case ProtocolType.Providers:
// if the request hasn't been applied, there is no such service. Return an error.
if (!applied) {
console.log(`there is no service ${lastProtocol.value}`);
return this.responseCall(call.reply_to, {
reason: `there is no such service`,
msg: call
});
}
} catch (e) {
// if service throw an error, return it to the sender
return this.responseCall(call.reply_to, {
reason: `error on execution: ${e}`,
msg: call
});
}
return undefined;
case ProtocolType.Client:
if (lastProtocol.value === _this.selfPeerIdStr) {
console.log(`relay call: ${call}`);
console.log(`relay call:`);
console.log(JSON.stringify(call, undefined, 2));
if (call.module) {
try {
// call of the service, service should handle response sending, error handling, requests to other services
let applied = _this.services.applyToService(call);
// if the request hasn't been applied, there is no such service. Return an error.
if (!applied) {
console.log(`there is no service ${lastProtocol.value}`);
return this.responseCall(call.reply_to, {
reason: `there is no such service`,
msg: call
});
}
} catch (e) {
// if service throw an error, return it to the sender
return this.responseCall(call.reply_to, {
reason: `error on execution: ${e}`,
msg: call
});
}
}
} else {
console.warn(`this relay call is not for me: ${callToString(call)}`);
return this.responseCall(call.reply_to, {

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
import {Address} from "./address";
import {Address, createPeerAddress, createServiceAddress} from "./address";
import {
callToString,
FunctionCall,
@ -89,8 +89,15 @@ export class FluenceConnection {
/**
* Sends remote service_id call.
*/
async sendServiceCall(serviceId: string, args: any, name?: string) {
let regMsg = makeCall(serviceId, args, this.sender, this.sender, name);
async sendServiceCall(serviceId: string, isLocal: boolean, args: any, name?: string) {
let target;
if (isLocal) {
target = createPeerAddress(this.nodePeerId.toB58String());
} else {
target = createServiceAddress(serviceId);
}
let regMsg = makeCall(serviceId, target, args, this.sender, this.sender, name);
await this.sendCall(regMsg);
}
@ -186,13 +193,14 @@ export class FluenceConnection {
let replyTo;
if (reply) replyTo = this.sender;
let call = makeFunctionCall(genUUID(), target, this.sender, args, replyTo, name);
let call = makeFunctionCall(genUUID(), target, this.sender, args, undefined, undefined, replyTo, name);
await this.sendCall(call);
}
async registerService(serviceId: string) {
let regMsg = await makeRegisterMessage(serviceId, this.sender);
let target = createPeerAddress(this.nodePeerId.toB58String())
let regMsg = await makeRegisterMessage(serviceId, target, this.sender);
await this.sendCall(regMsg);
}
}

View File

@ -27,6 +27,8 @@ export interface FunctionCall {
target: Address,
reply_to?: Address,
sender: Address,
"module"?: string,
fname?: string,
arguments: any,
name?: string,
action: "FunctionCall"
@ -45,13 +47,15 @@ export function callToString(call: FunctionCall) {
return JSON.stringify(obj)
}
export function makeFunctionCall(uuid: string, target: Address, sender: Address, args: object, replyTo?: Address, name?: string): FunctionCall {
export function makeFunctionCall(uuid: string, target: Address, sender: Address, args: object, moduleF?: string, fname?: string, replyTo?: Address, name?: string): FunctionCall {
return {
uuid: uuid,
target: target,
reply_to: replyTo,
sender: sender,
"module": moduleF,
fname: fname,
arguments: args,
name: name,
action: "FunctionCall"
@ -78,6 +82,8 @@ export function parseFunctionCall(str: string): FunctionCall {
reply_to: replyTo,
sender: sender,
arguments: json.arguments,
"module": json.module,
fname: json.fname,
name: json.name,
action: "FunctionCall"
}
@ -94,7 +100,7 @@ export function genUUID() {
export async function makeRelayCall(client: PeerId, relay: PeerId, msg: any, sender: Address, replyTo?: Address, name?: string): Promise<FunctionCall> {
let relayAddress = await createRelayAddress(relay.toB58String(), client, false);
return makeFunctionCall(genUUID(), relayAddress, sender, msg, replyTo, name);
return makeFunctionCall(genUUID(), relayAddress, sender, msg, undefined, undefined, replyTo, name);
}
/**
@ -103,25 +109,23 @@ export async function makeRelayCall(client: PeerId, relay: PeerId, msg: any, sen
export function makePeerCall(client: PeerId, msg: any, sender: Address, replyTo?: Address, name?: string): FunctionCall {
let peerAddress = createPeerAddress(client.toB58String());
return makeFunctionCall(genUUID(), peerAddress, sender, msg, replyTo, name);
return makeFunctionCall(genUUID(), peerAddress, sender, msg, undefined, undefined, replyTo, name);
}
/**
* Message to call remote service_id
*/
export function makeCall(functionId: string, args: any, sender: Address, replyTo?: Address, name?: string): FunctionCall {
let target = createServiceAddress(functionId);
export function makeCall(functionId: string, target: Address, args: any, sender: Address, replyTo?: Address, name?: string): FunctionCall {
return makeFunctionCall(genUUID(), target, sender, args, replyTo, name);
return makeFunctionCall(genUUID(), target, sender, args, functionId, undefined, replyTo, name);
}
/**
* Message to register new service_id.
*/
export async function makeRegisterMessage(serviceId: string, sender: Address): Promise<FunctionCall> {
let target = createServiceAddress("provide");
return makeFunctionCall(genUUID(), target, sender, {service_id: serviceId}, sender, "provide service_id");
export async function makeRegisterMessage(serviceId: string, target: Address, sender: Address): Promise<FunctionCall> {
return makeFunctionCall(genUUID(), target, sender, {service_id: serviceId}, "provide", undefined, sender, "provide service_id");
}
// TODO uncomment when this will be implemented in Fluence network

View File

@ -36,8 +36,8 @@ export class Services {
// could throw error from service callback
// returns true if the call was applied
applyToService(serviceId: string, call: FunctionCall): boolean {
let service = this.services.get(serviceId);
applyToService(call: FunctionCall): boolean {
let service = this.services.get(call.module);
if (service) {
service(call);
return true;

View File

@ -66,6 +66,8 @@ describe("Typescript usage suite", () => {
arg2: 3,
arg4: [1, 2, 3]
},
"mm",
"fff",
addr,
"2444"
);

View File

@ -36,7 +36,7 @@ export class TrustGraph {
let msgId = genUUID()
let response = await this.client.sendServiceCallWaitResponse("add_certificates", {
let response = await this.client.sendServiceLocalCallWaitResponse("add_certificates", {
certificates: certsStr,
msg_id: msgId,
peer_id: peerId
@ -64,7 +64,7 @@ export class TrustGraph {
// Get certificates that stores in Kademlia neighbourhood by `peerId` key.
async getCertificates(peerId: string): Promise<Certificate[]> {
let msgId = genUUID();
let resp = await this.client.sendServiceCallWaitResponse("certificates", {
let resp = await this.client.sendServiceLocalCallWaitResponse("certificates", {
msg_id: msgId,
peer_id: peerId
}, (args) => args.msg_id && args.msg_id === msgId)