Big refactoring (#8)

Big codebase refactoring. 

* Multiple clients are allowed on the same browser instance
* Particle queue processing is split from particle handling logic
* Public AIP is completely rethought
* Updated project file structure. Clean exports for public api methods
* Additional unit tests
This commit is contained in:
Pavel 2021-01-19 15:47:49 +03:00 committed by GitHub
parent c7f94109a6
commit ba537c79b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1397 additions and 1332 deletions

3
.gitignore vendored
View File

@ -12,4 +12,5 @@ bundle/
# Dependency directories
node_modules/
jspm_packages/
jspm_packages/
/dist/

View File

@ -2,15 +2,12 @@
"name": "@fluencelabs/fluence",
"version": "0.8.0",
"description": "JS SDK for the Fluence network",
"main": "./dist/fluence.js",
"typings": "./dist/fluence.d.ts",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
"scripts": {
"test": "mocha --timeout 10000 -r esm -r ts-node/register src/**/*.spec.ts",
"test-ts": "ts-mocha --timeout 10000 -r esm -p tsconfig.json src/**/*.spec.ts",
"package:build": "NODE_ENV=production webpack && npm run package",
"package": "tsc && rsync -r src/aqua/*.js dist/aqua",
"start": "webpack-dev-server -p",
"build": "webpack --mode production"
"build": "tsc && rsync -r src/internal/aqua/*.js dist/internal/aqua",
"build:webpack": "webpack --mode production"
},
"repository": "https://github.com/fluencelabs/fluence-js",
"author": "Fluence Labs",
@ -48,7 +45,7 @@
"text-encoding": "^0.7.0",
"ts-loader": "7.0.5",
"ts-mocha": "8.0.0",
"typescript": "3.9.5",
"typescript": "^3.9.5",
"webpack": "4.43.0",
"webpack-cli": "3.3.11",
"webpack-dev-server": "3.11.0"

252
src/FluenceClient.ts Normal file
View File

@ -0,0 +1,252 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import log from 'loglevel';
import PeerId from 'peer-id';
import { SecurityTetraplet, StepperOutcome } from './internal/commonTypes';
import { FluenceClientBase } from './internal/FluenceClientBase';
import { build, genUUID, ParticleDto } from './internal/particle';
import { ParticleProcessor } from './internal/ParticleProcessor';
import { ParticleProcessorStrategy } from './internal/ParticleProcessorStrategy';
const fetchCallbackServiceName = '__callback';
const selfRelayVarName = '__relay';
const wrapRelayBasedCall = (script: string) => {
return `
(seq
(call ${selfRelayVarName} ("op" "identity") [])
${script}
)
`;
};
const wrapFetchCall = (script: string, particleId: string, resultArgNames: string[]) => {
// TODO: sanitize
const resultTogether = resultArgNames.join(' ');
let res = `
(seq
${script}
(seq
(call ${selfRelayVarName} ("op" "identity") [])
(call %init_peer_id% ("${fetchCallbackServiceName}" "${particleId}") [${resultTogether}])
)
)`;
return wrapRelayBasedCall(res);
};
export interface FluenceEvent {
type: string;
args: any[];
}
export type FluenceEventHandler = (event: FluenceEvent) => void;
export class FluenceClient extends FluenceClientBase {
private eventSubscribers: Map<string, FluenceEventHandler[]> = new Map();
private eventValidators: Map<string, Function> = new Map();
private callbacks: Map<string, Function> = new Map();
private fetchParticles: Map<string, { resolve: Function; reject: Function }> = new Map();
constructor(selfPeerId: PeerId) {
super(selfPeerId);
this.processor = new ParticleProcessor(this.strategy, selfPeerId);
}
async fetch<T>(script: string, resultArgNames: string[], data?: Map<string, any>, ttl?: number): Promise<T> {
data = this.addRelayToArgs(data);
const callBackId = genUUID();
script = wrapFetchCall(script, callBackId, resultArgNames);
const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId);
return new Promise<T>((resolve, reject) => {
this.fetchParticles.set(callBackId, { resolve, reject });
this.processor.executeLocalParticle(particle);
});
}
// TODO:: better naming probably?
async fireAndForget(script: string, data?: Map<string, any>, ttl?: number) {
data = this.addRelayToArgs(data);
script = wrapRelayBasedCall(script);
await this.sendScript(script, data, ttl);
}
registerEvent(
channel: string,
eventName: string,
validate?: (channel: string, eventName: string, args: any[], tetraplets: any[][]) => boolean,
) {
if (!validate) {
validate = (c, e, a, t) => true;
}
this.eventValidators.set(`${channel}/${eventName}`, validate);
}
unregisterEvent(channel: string, eventName: string) {
this.eventValidators.delete(`${channel}/${eventName}`);
}
registerCallback(
serviceId: string,
fnName: string,
callback: (args: any[], tetraplets: SecurityTetraplet[][]) => object,
) {
this.callbacks.set(`${serviceId}/${fnName}`, callback);
}
unregisterCallback(channel: string, eventName: string) {
this.eventValidators.delete(`${channel}/${eventName}`);
}
subscribe(channel: string, handler: FluenceEventHandler) {
if (!this.eventSubscribers.get(channel)) {
this.eventSubscribers.set(channel, []);
}
this.eventSubscribers.get(channel).push(handler);
}
protected strategy: ParticleProcessorStrategy = {
particleHandler: (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => {
// missing built-in op
if (serviceId === 'op' && fnName === 'identity') {
return {
ret_code: 0,
result: JSON.stringify(args),
};
}
// async fetch model handling
if (serviceId === fetchCallbackServiceName) {
const executingParticlePromiseFns = this.fetchParticles.get(fnName);
if (executingParticlePromiseFns) {
// don't block
setImmediate(() => {
this.fetchParticles.delete(fnName);
executingParticlePromiseFns.resolve(args);
});
}
return {
ret_code: 0,
result: JSON.stringify({}),
};
}
// event model handling
const eventPair = `${serviceId}/${fnName}`;
const eventValidator = this.eventValidators.get(eventPair);
if (eventValidator) {
try {
if (!eventValidator(serviceId, fnName, args, tetraplets)) {
return {
ret_code: 1, // TODO:: error codes
result: 'validation failed',
};
}
} catch (e) {
log.error('error running validation function: ' + e);
return {
ret_code: 1, // TODO:: error codes
result: 'validation failed',
};
}
// don't block
setImmediate(() => {
this.pushEvent(serviceId, {
type: fnName,
args: args,
});
});
return {
ret_code: 0,
result: JSON.stringify({}),
};
}
// callback model handling
const callback = this.callbacks.get(eventPair);
if (callback) {
try {
const res = callback(args, tetraplets);
return {
ret_code: 0,
result: JSON.stringify(res),
};
} catch (e) {
return {
ret_code: 1, // TODO:: error codes
result: JSON.stringify(e),
};
}
}
return {
ret_code: 1,
result: `Error. There is no service: ${serviceId}`,
};
},
sendParticleFurther: async (particle: ParticleDto) => {
try {
await this.connection.sendParticle(particle);
} catch (reason) {
log.error(`Error on sending particle with id ${particle.id}: ${reason}`);
}
},
onParticleTimeout: (particle: ParticleDto, now: number) => {
log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`);
const executingParticle = this.fetchParticles.get(particle.id);
if (executingParticle) {
executingParticle.reject(new Error(`particle ${particle.id} timed out`));
}
},
onLocalParticleRecieved: (particle: ParticleDto) => {},
onExternalParticleRecieved: (particle: ParticleDto) => {},
onStepperExecuting: (particle: ParticleDto) => {},
onStepperExecuted: (stepperOutcome: StepperOutcome) => {
log.info('inner interpreter outcome:');
log.info(stepperOutcome);
},
};
private pushEvent(channel: string, event: FluenceEvent) {
const subs = this.eventSubscribers.get(channel);
if (subs) {
for (let sub of subs) {
sub(event);
}
}
}
private addRelayToArgs(data: Map<string, any>) {
if (data === undefined) {
data = new Map();
}
if (!data.has(selfRelayVarName)) {
data.set(selfRelayVarName, this.relayPeerId);
}
return data;
}
}

158
src/__test__/air.spec.ts Normal file
View File

@ -0,0 +1,158 @@
import 'mocha';
import { expect } from 'chai';
import { createLocalClient } from './util';
describe('== AIR suite', () => {
it('check init_peer_id', async function () {
// arrange
const serviceId = 'test_service';
const fnName = 'return_first_arg';
const client = await createLocalClient();
let res;
client.registerCallback(serviceId, fnName, (args, _) => {
res = args[0];
return res;
});
// act
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`;
await client.sendScript(script);
// assert
expect(res).to.be.equal(client.selfPeerId);
});
it('call local function', async function () {
// arrange
const serviceId = 'test_service';
const fnName = 'return_first_arg';
const client = await createLocalClient();
let res;
client.registerCallback(serviceId, fnName, (args, _) => {
res = args[0];
return res;
});
// act
const arg = 'hello';
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") ["${arg}"])`;
await client.sendScript(script);
// assert
expect(res).to.be.equal(arg);
});
it('check particle arguments', async function () {
// arrange
const serviceId = 'test_service';
const fnName = 'return_first_arg';
const client = await createLocalClient();
let res;
client.registerCallback(serviceId, fnName, (args, _) => {
res = args[0];
return res;
});
// act
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [arg1])`;
const data = new Map();
data.set('arg1', 'hello');
await client.sendScript(script, data);
// assert
expect(res).to.be.equal('hello');
});
it('check security tetraplet', async function () {
// arrange
const makeDataServiceId = 'make_data_service';
const makeDataFnName = 'make_data';
const getDataServiceId = 'get_data_service';
const getDataFnName = 'get_data';
const client = await createLocalClient();
client.registerCallback(makeDataServiceId, makeDataFnName, (args, _) => {
return {
field: 42,
};
});
let res;
client.registerCallback(getDataServiceId, getDataFnName, (args, tetraplets) => {
res = {
args: args,
tetraplets: tetraplets,
};
return args[0];
});
// act
const script = `
(seq
(call %init_peer_id% ("${makeDataServiceId}" "${makeDataFnName}") [] result)
(call %init_peer_id% ("${getDataServiceId}" "${getDataFnName}") [result.$.field])
)`;
await client.sendScript(script);
// assert
const tetraplet = res.tetraplets[0][0];
expect(tetraplet).to.contain({
service_id: 'make_data_service',
function_name: 'make_data',
json_path: '$.field',
});
});
it('check chain of services work properly', async function () {
this.timeout(5000);
// arrange
const client = await createLocalClient();
const serviceId1 = 'check1';
const fnName1 = 'fn1';
let res1;
client.registerCallback(serviceId1, fnName1, (args, _) => {
res1 = args[0];
return res1;
});
const serviceId2 = 'check2';
const fnName2 = 'fn2';
let res2;
client.registerCallback(serviceId2, fnName2, (args, _) => {
res2 = args[0];
return res2;
});
const serviceId3 = 'check3';
const fnName3 = 'fn3';
let res3;
client.registerCallback(serviceId3, fnName3, (args, _) => {
res3 = args;
return res3;
});
const arg1 = 'arg1';
const arg2 = 'arg2';
// act
const script = `(seq
(seq
(call %init_peer_id% ("${serviceId1}" "${fnName1}") ["${arg1}"] result1)
(call %init_peer_id% ("${serviceId2}" "${fnName2}") ["${arg2}"] result2))
(call %init_peer_id% ("${serviceId3}" "${fnName3}") [result1 result2]))
`;
await client.sendScript(script);
// assert
expect(res1).to.be.equal(arg1);
expect(res2).to.be.equal(arg2);
expect(res3).to.be.deep.equal([res1, res2]);
});
});

View File

@ -1,10 +1,10 @@
import { expect } from 'chai';
import 'mocha';
import Fluence from '../fluence';
import { parseAIR } from '../internal/stepper';
describe('== AST parsing suite', () => {
it('parse simple script and return ast', async function () {
let ast = await Fluence.parseAIR(`
let ast = await parseAIR(`
(call node ("service" "function") [1 2 3 arg] output)
`);

358
src/__test__/client.spec.ts Normal file
View File

@ -0,0 +1,358 @@
import { expect } from 'chai';
import 'mocha';
import { encode } from 'bs58';
import { certificateFromString, certificateToString, issue } from '../internal/trust/certificate';
import { TrustGraph } from '../internal/trust/trust_graph';
import { nodeRootCert } from '../internal/trust/misc';
import { generatePeerId, peerIdToSeed, seedToPeerId } from '../internal/peerIdUtils';
import { FluenceClient } from '../FluenceClient';
import { createConnectedClient, createLocalClient } from './util';
import log from 'loglevel';
import { createClient } from '../api';
import Multiaddr from 'multiaddr';
describe('Typescript usage suite', () => {
it('should create private key from seed and back', async function () {
// prettier-ignore
let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201];
let seedStr = encode(seed);
log.trace('SEED STR: ' + seedStr);
let pid = await seedToPeerId(seedStr);
expect(peerIdToSeed(pid)).to.be.equal(seedStr);
});
it('should serialize and deserialize certificate correctly', async function () {
let cert = `11
1111
5566Dn4ZXXbBK5LJdUsE7L3pG9qdAzdPY47adjzkhEx9
3HNXpW2cLdqXzf4jz5EhsGEBFkWzuVdBCyxzJUZu2WPVU7kpzPjatcqvdJMjTtcycVAdaV5qh2fCGphSmw8UMBkr
158981172690500
1589974723504
2EvoZAZaGjKWFVdr36F1jphQ5cW7eK3yM16mqEHwQyr7
4UAJQWzB3nTchBtwARHAhsn7wjdYtqUHojps9xV6JkuLENV8KRiWM3BhQByx5KijumkaNjr7MhHjouLawmiN1A4d
1590061123504
1589974723504`;
let deser = await certificateFromString(cert);
let ser = certificateToString(deser);
expect(ser).to.be.equal(cert);
});
// delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
it.skip('should perform tests on certs', async function () {
this.timeout(15000);
await testCerts();
});
describe.skip('should make connection to network', async function () {
this.timeout(30000);
const testProcedure = async (client: FluenceClient) => {
let resMakingPromise = new Promise((resolve) => {
client.registerCallback('test', 'test', (args, _) => {
resolve(args);
return {};
});
});
let script = `
(seq
(call "${client.relayPeerId}" ("op" "identity") [])
(call "${client.selfPeerId}" ("test" "test") [hello])
)
`;
let data: Map<string, any> = new Map();
data.set('hello', 'world');
await client.sendScript(script, data);
const res = await resMakingPromise;
return res;
};
it('address as string', async function () {
// arrange
const addr =
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9';
// act
const client = await createClient(addr);
// assert
const res = await testProcedure(client);
expect(res).to.deep.equal(['world']);
});
it('address as multiaddr', async function () {
// arrange
const addr = new Multiaddr(
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
// act
const client = await createClient(addr);
// assert
const res = await testProcedure(client);
expect(res).to.deep.equal(['world']);
});
it('address as node', async function () {
// arrange
const addr = {
multiaddr:
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
peerId: '12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
};
// act
const client = await createClient(addr);
// assert
const res = await testProcedure(client);
expect(res).to.deep.equal(['world']);
});
it('peerid as peer id', async function () {
// arrange
const addr =
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9';
const pid = await generatePeerId();
// act
const client = await createClient(addr, pid);
// assert
const res = await testProcedure(client);
expect(res).to.deep.equal(['world']);
});
it('peerid as see', async function () {
// arrange
const addr =
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9';
const pid = peerIdToSeed(await generatePeerId());
// act
const client = await createClient(addr, pid);
// assert
const res = await testProcedure(client);
expect(res).to.deep.equal(['world']);
});
});
it.skip('should make a call through the network', async function () {
this.timeout(30000);
// arrange
const client = await createConnectedClient(
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
client.registerCallback('test', 'test', (args, _) => {
log.trace('should make a call through the network, called "test" "test" with args', args);
return {};
});
let resMakingPromise = new Promise((resolve) => {
client.registerCallback('test', 'reverse_args', (args, _) => {
resolve([...args].reverse());
return {};
});
});
// act
let script = `
(seq
(call "${client.relayPeerId}" ("op" "identity") [])
(seq
(call "${client.selfPeerId}" ("test" "test") [a b c d] result)
(call "${client.selfPeerId}" ("test" "reverse_args") [a b c d])
)
)
`;
let data: Map<string, any> = new Map();
data.set('a', 'some a');
data.set('b', 'some b');
data.set('c', 'some c');
data.set('d', 'some d');
await client.sendScript(script, data);
// assert
const res = await resMakingPromise;
expect(res).to.deep.equal(['some d', 'some c', 'some b', 'some a']);
});
it.skip('fireAndForget should work', async function () {
this.timeout(30000);
// arrange
const client = await createConnectedClient(
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
let resMakingPromise = new Promise((resolve) => {
client.registerCallback('test', 'reverse_args', (args, _) => {
resolve([...args].reverse());
return {};
});
});
// act
let script = `
(call "${client.selfPeerId}" ("test" "reverse_args") [a b c d])
`;
let data: Map<string, any> = new Map();
data.set('a', 'some a');
data.set('b', 'some b');
data.set('c', 'some c');
data.set('d', 'some d');
await client.fireAndForget(script, data);
// assert
const res = await resMakingPromise;
expect(res).to.deep.equal(['some d', 'some c', 'some b', 'some a']);
});
it.skip('fetch should work', async function () {
this.timeout(30000);
// arrange
const client = await createConnectedClient(
'/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
// act
let script = `
(call "${client.relayPeerId}" ("op" "identify") [] result)
`;
const data = new Map();
data.set('__relay', client.relayPeerId);
const [res] = await client.fetch(script, ['result'], data);
// assert
expect(res.external_addresses).to.be.not.undefined;
});
it.skip('two clients should work inside the same time browser', async function () {
// arrange
const pid1 = await generatePeerId();
const client1 = new FluenceClient(pid1);
await client1.connect(
'/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
const pid2 = await generatePeerId();
const client2 = new FluenceClient(pid2);
await client2.connect(
'/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
let resMakingPromise = new Promise((resolve) => {
client2.registerCallback('test', 'test', (args, _) => {
resolve([...args]);
return {};
});
});
let script = `
(seq
(call "${client1.relayPeerId}" ("op" "identity") [])
(call "${pid2.toB58String()}" ("test" "test") [a b c d])
)
`;
let data: Map<string, any> = new Map();
data.set('a', 'some a');
data.set('b', 'some b');
data.set('c', 'some c');
data.set('d', 'some d');
await client1.sendScript(script, data);
let res = await resMakingPromise;
expect(res).to.deep.equal(['some a', 'some b', 'some c', 'some d']);
});
it.skip('event registration should work', async function () {
// arrange
const pid1 = await generatePeerId();
const client1 = new FluenceClient(pid1);
await client1.connect(
'/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
const pid2 = await generatePeerId();
const client2 = new FluenceClient(pid2);
await client2.connect(
'/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
);
client2.registerEvent('event_stream', 'test');
const resMakingPromise = new Promise((resolve) => {
client2.subscribe('event_stream', resolve);
});
// act
let script = `
(call "${pid2.toB58String()}" ("event_stream" "test") [hello])
`;
let data: Map<string, any> = new Map();
data.set('hello', 'world');
await client1.fireAndForget(script, data);
// assert
let res = await resMakingPromise;
expect(res).to.deep.equal({
type: 'test',
args: ['world'],
});
});
});
export async function testCerts() {
const key1 = await generatePeerId();
const key2 = await generatePeerId();
// connect to two different nodes
const cl1 = new FluenceClient(key1);
const cl2 = new FluenceClient(key2);
await cl1.connect('/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb');
await cl2.connect('/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er');
let trustGraph1 = new TrustGraph(/* cl1 */);
let trustGraph2 = new TrustGraph(/* cl2 */);
let issuedAt = new Date();
let expiresAt = new Date();
// certificate expires after one day
expiresAt.setDate(new Date().getDate() + 1);
// create root certificate for key1 and extend it with key2
let rootCert = await nodeRootCert(key1);
let extended = await issue(key1, key2, rootCert, expiresAt.getTime(), issuedAt.getTime());
// publish certificates to Fluence network
await trustGraph1.publishCertificates(key2.toB58String(), [extended]);
// get certificates from network
let certs = await trustGraph2.getCertificates(key2.toB58String());
// root certificate could be different because nodes save trusts with bigger `expiresAt` date and less `issuedAt` date
expect(certs[0].chain[1].issuedFor.toB58String()).to.be.equal(extended.chain[1].issuedFor.toB58String());
expect(certs[0].chain[1].signature).to.be.equal(extended.chain[1].signature);
expect(certs[0].chain[1].expiresAt).to.be.equal(extended.chain[1].expiresAt);
expect(certs[0].chain[1].issuedAt).to.be.equal(extended.chain[1].issuedAt);
await cl1.disconnect();
await cl2.disconnect();
}

View File

@ -2,4 +2,4 @@
--require @babel/register
src/test/**/*.spec.ts
src/__test__/**/*.spec.ts

15
src/__test__/util.ts Normal file
View File

@ -0,0 +1,15 @@
import { FluenceClient, generatePeerId } from '..';
export const createLocalClient = async () => {
const peerId = await generatePeerId();
const client = new FluenceClient(peerId);
await client.local();
return client;
};
export const createConnectedClient = async (node: string) => {
const peerId = await generatePeerId();
const client = new FluenceClient(peerId);
await client.connect(node);
return client;
};

111
src/api.ts Normal file
View File

@ -0,0 +1,111 @@
import { FluenceClient } from './FluenceClient';
import { SecurityTetraplet } from './internal/commonTypes';
import { genUUID, Particle } from './internal/particle';
import Multiaddr from 'multiaddr';
import PeerId, { isPeerId } from 'peer-id';
import { generatePeerId, seedToPeerId } from './internal/peerIdUtils';
type Node = {
peerId: string;
multiaddr: string;
};
export const createClient = async (
connectTo?: string | Multiaddr | Node,
peerIdOrSeed?: PeerId | string,
): Promise<FluenceClient> => {
let peerId;
if (!peerIdOrSeed) {
peerId = await generatePeerId();
} else if (isPeerId(peerIdOrSeed)) {
// keep unchanged
peerId = peerIdOrSeed;
} else {
// peerId is string, therefore seed
peerId = await seedToPeerId(peerIdOrSeed);
}
const client = new FluenceClient(peerId);
if (connectTo) {
let theAddress: Multiaddr;
let fromNode = (connectTo as any).multiaddr;
if (fromNode) {
theAddress = new Multiaddr(fromNode);
} else {
theAddress = new Multiaddr(connectTo as string);
}
await client.connect(theAddress);
}
return client;
};
export const sendParticle = async (client: FluenceClient, particle: Particle): Promise<string> => {
return await client.sendScript(particle.script, particle.data, particle.ttl);
};
export const registerServiceFunction = (
client: FluenceClient,
serviceId: string,
fnName: string,
handler: (args: any[], tetraplets: SecurityTetraplet[][]) => object,
) => {
client.registerCallback(serviceId, fnName, handler);
};
// prettier-ignore
export const unregisterServiceFunction = (
client: FluenceClient,
serviceId: string,
fnName: string
) => {
client.unregisterCallback(serviceId, fnName);
};
export const subscribeToEvent = (
client: FluenceClient,
serviceId: string,
fnName: string,
handler: (args: any[], tetraplets: SecurityTetraplet[][]) => void,
): Function => {
const realHandler = (args: any[], tetraplets: SecurityTetraplet[][]) => {
// dont' block
setImmediate(() => {
handler(args, tetraplets);
});
return {};
};
registerServiceFunction(client, serviceId, fnName, realHandler);
return () => {
unregisterServiceFunction(client, serviceId, fnName);
};
};
export const sendParticleAsFetch = async <T>(
client: FluenceClient,
particle: Particle,
callbackFnName: string,
callbackServiceId: string = '_callback',
): Promise<T> => {
const serviceId = callbackServiceId;
const fnName = callbackFnName;
let promise: Promise<T> = new Promise(function (resolve, reject) {
const unsub = subscribeToEvent(client, serviceId, fnName, (args: any[], _) => {
unsub();
resolve(args as any);
});
setTimeout(() => {
unsub();
reject(new Error(`callback for ${callbackServiceId}/${callbackFnName} timed out after ${particle.ttl}`));
}, particle.ttl);
});
sendParticle(client, particle);
return promise;
};

View File

@ -1,123 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Experimental attempts to generate aqua code through typescript functions.
*/
export interface Value<T> {
name: string,
value: T
}
export function value<T>(name: string, v: T): Value<T> {
return { name, value: v }
}
function updateData(value: Value<any>, data: Map<string, any>): void {
if (!data.get(value.name)) {
data.set(value.name, value.value)
}
}
function isValue<T>(value: string | Value<T>): value is Value<T> {
return (value as Value<T>).name !== undefined;
}
/**
* Generate a script with a call. Data is modified.
* @param target
* @param service
* @param functionV
* @param args
* @param returnName
* @param data
*/
export function call(target: string | Value<string>, service: string | Value<string>,
functionV: string | Value<string>, args: (string | Value<any>)[],
returnName: string | undefined, data: Map<string, any>): string {
let targetName = target;
if (isValue(target)) {
updateData(target, data)
targetName = target.name
}
let serviceName = service;
if (isValue(service)) {
updateData(service, data)
serviceName = service.name;
}
let functionName = functionV;
if (isValue(functionV)) {
updateData(functionV, data)
functionName = functionV.name;
}
let argsStr: string[] = []
args.forEach((v) => {
if (isValue(v)) {
updateData(v, data)
argsStr.push(v.name)
} else {
argsStr.push(v)
}
})
if (!returnName) {
returnName = ""
}
return `(call ${targetName} ("${serviceName}" "${functionName}") [${argsStr.join(" ")}] ${returnName})`
}
function wrap(command: string, scripts: string[]): string {
if (scripts.length === 2) {
return `(${command}
${scripts[0]}
${scripts[1]}
)`
} else {
let first = scripts.shift()
return `(${command}
${first}
${wrap(command, scripts)}
)`
}
}
/**
* Wrap an array of scripts with multiple 'seq' commands
* @param scripts
*/
export function seq(scripts: string[]): string {
if (scripts.length < 2) {
throw new Error("For 'seq' there must be at least 2 scripts")
}
return wrap("seq", scripts)
}
/**
* Wrap a script with 'par' command
* @param script
*/
export function par(script: string): string {
return `par(
${script}
)
`
}

View File

@ -1,27 +0,0 @@
import { getCurrentParticleId, registerService } from './globalState';
import { ServiceMultiple } from './service';
import log from 'loglevel';
let storage: Map<string, Map<string, any>> = new Map();
export function addData(particleId: string, data: Map<string, any>, ttl: number) {
storage.set(particleId, data);
setTimeout(() => {
log.debug(`data for ${particleId} is deleted`);
storage.delete(particleId);
}, ttl);
}
export const storageService = new ServiceMultiple('');
storageService.registerFunction('load', (args: any[]) => {
let current = getCurrentParticleId();
let data = storage.get(current);
if (data) {
return data.get(args[0]);
} else {
return {};
}
});
registerService(storageService);

View File

@ -1,83 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as PeerId from 'peer-id';
import Multiaddr from 'multiaddr';
import { FluenceClient } from './fluenceClient';
import * as log from 'loglevel';
import { LogLevelDesc } from 'loglevel';
import { parseAstClosure } from './stepper';
log.setLevel('info');
export default class Fluence {
static setLogLevel(level: LogLevelDesc): void {
log.setLevel(level);
}
/**
* Generates new peer id with Ed25519 private key.
*/
static async generatePeerId(): Promise<PeerId> {
return await PeerId.create({ keyType: 'Ed25519' });
}
/**
* Create FluenceClient without connecting it to a relay
*
* @param peerId client's peer id. Must contain a private key. See `generatePeerId()`
*/
static async local(peerId?: PeerId): Promise<FluenceClient> {
if (!peerId) {
peerId = await Fluence.generatePeerId();
}
let client = new FluenceClient(peerId);
await client.instantiateInterpreter();
return client;
}
/**
* Connect to Fluence node.
*
* @param multiaddr must contain host peer id
* @param peerId your peer id. Should be with a private key. Could be generated by `generatePeerId()` function
*/
static async connect(multiaddr: string | Multiaddr, peerId?: PeerId): Promise<FluenceClient> {
let client = await Fluence.local(peerId);
await client.connect(multiaddr);
return client;
}
/// Parses script and returns AST in JSON format
/// NOTE & TODO: interpreter is instantiated every time, make it a lazy constant?
static async parseAIR(script: string): Promise<string> {
let closure = await parseAstClosure();
return closure(script);
}
}
declare global {
interface Window {
Fluence: Fluence;
}
}
if (typeof window !== 'undefined') {
window.Fluence = Fluence;
}

View File

@ -1,414 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { build, Particle } from './particle';
import { StepperOutcome } from './stepperOutcome';
import * as PeerId from 'peer-id';
import Multiaddr from 'multiaddr';
import { FluenceConnection } from './fluenceConnection';
import { Subscriptions } from './subscriptions';
import { enqueueParticle, getCurrentParticleId, popParticle, setCurrentParticleId } from './globalState';
import { instantiateInterpreter, InterpreterInvoke } from './stepper';
import log from 'loglevel';
import { waitService } from './helpers/waitService';
import { ModuleConfig } from './moduleConfig';
const bs58 = require('bs58');
const INFO_LOG_LEVEL = 2;
export class FluenceClient {
readonly selfPeerId: PeerId;
readonly selfPeerIdStr: string;
private nodePeerIdStr: string;
private subscriptions = new Subscriptions();
private interpreter: InterpreterInvoke = undefined;
connection: FluenceConnection;
constructor(selfPeerId: PeerId) {
this.selfPeerId = selfPeerId;
this.selfPeerIdStr = selfPeerId.toB58String();
}
/**
* Pass a particle to a interpreter and send a result to other services.
*/
private async handleParticle(particle: Particle): Promise<void> {
// if a current particle is processing, add new particle to the queue
if (getCurrentParticleId() !== undefined && getCurrentParticleId() !== particle.id) {
enqueueParticle(particle);
} else {
if (this.interpreter === undefined) {
throw new Error("Undefined. Interpreter is not initialized. Use 'Fluence.connect' to create a client.");
}
// start particle processing if queue is empty
try {
setCurrentParticleId(particle.id);
// check if a particle is relevant
let now = Date.now();
let actualTtl = particle.timestamp + particle.ttl - now;
if (actualTtl <= 0) {
log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`);
} else {
// if there is no subscription yet, previous data is empty
let prevData: Uint8Array = Buffer.from([]);
let prevParticle = this.subscriptions.get(particle.id);
if (prevParticle) {
prevData = prevParticle.data;
// update a particle in a subscription
this.subscriptions.update(particle);
} else {
// set a particle with actual ttl
this.subscriptions.subscribe(particle, actualTtl);
}
let stepperOutcomeStr = this.interpreter(
particle.init_peer_id,
particle.script,
prevData,
particle.data,
);
let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr);
if (log.getLevel() <= INFO_LOG_LEVEL) {
log.info('inner interpreter outcome:');
log.info(stepperOutcome)
}
// update data after aquamarine execution
let newParticle: Particle = { ...particle };
newParticle.data = stepperOutcome.data;
this.subscriptions.update(newParticle);
// do nothing if there is no `next_peer_pks` or if client isn't connected to the network
if (stepperOutcome.next_peer_pks.length > 0 && this.connection) {
await this.connection.sendParticle(newParticle).catch((reason) => {
console.error(`Error on sending particle with id ${particle.id}: ${reason}`);
});
}
}
} finally {
// get last particle from the queue
let nextParticle = popParticle();
// start the processing of a new particle if it exists
if (nextParticle) {
// update current particle
setCurrentParticleId(nextParticle.id);
await this.handleParticle(nextParticle);
} else {
// wait for a new call (do nothing) if there is no new particle in a queue
setCurrentParticleId(undefined);
}
}
}
}
/**
* Handle incoming particle from a relay.
*/
private handleExternalParticle(): (particle: Particle) => Promise<void> {
let _this = this;
return async (particle: Particle) => {
let data: any = particle.data;
let error: any = data['protocol!error'];
if (error !== undefined) {
log.error('error in external particle: ');
log.error(error);
} else {
log.info('handle external particle: ');
log.info(particle);
await _this.handleParticle(particle);
}
};
}
async disconnect(): Promise<void> {
return this.connection.disconnect();
}
/**
* Instantiate WebAssembly with AIR interpreter to execute AIR scripts
*/
async instantiateInterpreter() {
this.interpreter = await instantiateInterpreter(this.selfPeerId);
}
/**
* Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection.
*
* @param multiaddr
*/
async connect(multiaddr: string | Multiaddr) {
multiaddr = Multiaddr(multiaddr);
if (!this.interpreter) {
throw Error("you must call 'instantiateInterpreter' before 'connect'");
}
let nodePeerId = multiaddr.getPeerId();
this.nodePeerIdStr = nodePeerId;
if (!nodePeerId) {
throw Error("'multiaddr' did not contain a valid peer id");
}
let firstConnection: boolean = true;
if (this.connection) {
firstConnection = false;
await this.connection.disconnect();
}
let node = PeerId.createFromB58String(nodePeerId);
let connection = new FluenceConnection(multiaddr, node, this.selfPeerId, this.handleExternalParticle());
await connection.connect();
this.connection = connection;
}
async sendParticle(particle: Particle): Promise<string> {
await this.handleParticle(particle);
return particle.id;
}
async executeParticle(particle: Particle) {
await this.handleParticle(particle);
}
nodeIdentityCall(): string {
return `(call "${this.nodePeerIdStr}" ("op" "identity") [] void[])`;
}
async requestResponse<T>(
name: string,
call: (nodeId: string) => string,
returnValue: string,
data: Map<string, any>,
handleResponse: (args: any[]) => T,
nodeId?: string,
ttl?: number,
): Promise<T> {
if (!ttl) {
ttl = 10000;
}
if (!nodeId) {
nodeId = this.nodePeerIdStr;
}
let serviceCall = call(nodeId);
let namedPromise = waitService(name, handleResponse, ttl);
let script = `(seq
${this.nodeIdentityCall()}
(seq
(seq
${serviceCall}
${this.nodeIdentityCall()}
)
(call "${this.selfPeerIdStr}" ("${namedPromise.name}" "") [${returnValue}] void[])
)
)
`;
let particle = await build(this.selfPeerId, script, data, ttl);
await this.sendParticle(particle);
return namedPromise.promise;
}
/**
* Send a script to add module to a relay. Waiting for a response from a relay.
*/
async addModule(
name: string,
moduleBase64: string,
config?: ModuleConfig,
nodeId?: string,
ttl?: number,
): Promise<void> {
if (!config) {
config = {
name: name,
mem_pages_count: 100,
logger_enabled: true,
wasi: {
envs: {},
preopened_files: ['/tmp'],
mapped_dirs: {},
},
};
}
let data = new Map();
data.set('module_bytes', moduleBase64);
data.set('module_config', config);
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_module") [module_bytes module_config] void[])`;
return this.requestResponse('addModule', call, '', data, () => {}, nodeId, ttl);
}
/**
* Send a script to add module to a relay. Waiting for a response from a relay.
*/
async addBlueprint(
name: string,
dependencies: string[],
blueprintId?: string,
nodeId?: string,
ttl?: number,
): Promise<string> {
let returnValue = 'blueprint_id';
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_blueprint") [blueprint] ${returnValue})`;
let data = new Map();
data.set('blueprint', { name: name, dependencies: dependencies, id: blueprintId });
return this.requestResponse(
'addBlueprint',
call,
returnValue,
data,
(args: any[]) => args[0] as string,
nodeId,
ttl,
);
}
/**
* Send a script to create a service to a relay. Waiting for a response from a relay.
*/
async createService(blueprintId: string, nodeId?: string, ttl?: number): Promise<string> {
let returnValue = 'service_id';
let call = (nodeId: string) => `(call "${nodeId}" ("srv" "create") [blueprint_id] ${returnValue})`;
let data = new Map();
data.set('blueprint_id', blueprintId);
return this.requestResponse(
'createService',
call,
returnValue,
data,
(args: any[]) => args[0] as string,
nodeId,
ttl,
);
}
/**
* Get all available modules hosted on a connected relay.
*/
async getAvailableModules(nodeId?: string, ttl?: number): Promise<string[]> {
let returnValue = 'modules';
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_modules") [] ${returnValue})`;
return this.requestResponse(
'getAvailableModules',
call,
returnValue,
new Map(),
(args: any[]) => args[0] as string[],
nodeId,
ttl,
);
}
/**
* Get all available blueprints hosted on a connected relay.
*/
async getBlueprints(nodeId: string, ttl?: number): Promise<string[]> {
let returnValue = 'blueprints';
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_blueprints") [] ${returnValue})`;
return this.requestResponse(
'getBlueprints',
call,
returnValue,
new Map(),
(args: any[]) => args[0] as string[],
nodeId,
ttl,
);
}
/**
* Add a provider to DHT network to neighborhood around a key.
*/
async addProvider(
key: Buffer,
providerPeer: string,
providerServiceId?: string,
nodeId?: string,
ttl?: number,
): Promise<void> {
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "add_provider") [key provider] void[])`;
key = bs58.encode(key);
let provider = {
peer: providerPeer,
service_id: providerServiceId,
};
let data = new Map();
data.set('key', key);
data.set('provider', provider);
return this.requestResponse('addProvider', call, '', data, () => {}, nodeId, ttl);
}
/**
* Get a provider from DHT network from neighborhood around a key..
*/
async getProviders(key: Buffer, nodeId?: string, ttl?: number): Promise<any> {
key = bs58.encode(key);
let returnValue = 'providers';
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "get_providers") [key] providers[])`;
let data = new Map();
data.set('key', key);
return this.requestResponse('getProviders', call, returnValue, data, (args) => args[0], nodeId, ttl);
}
/**
* Get relays neighborhood
*/
async neighborhood(node: string, ttl?: number): Promise<string[]> {
let returnValue = 'neighborhood';
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "neighborhood") [node] ${returnValue})`;
let data = new Map();
data.set('node', node);
return this.requestResponse('neighborhood', call, returnValue, data, (args) => args[0] as string[], node, ttl);
}
/**
* Call relays 'identity' method. It should return passed 'fields'
*/
async relayIdentity(fields: string[], data: Map<string, any>, nodeId?: string, ttl?: number): Promise<any> {
let returnValue = 'id';
let call = (nodeId: string) => `(call "${nodeId}" ("op" "identity") [${fields.join(' ')}] ${returnValue})`;
return this.requestResponse('getIdentity', call, returnValue, data, (args: any[]) => args[0], nodeId, ttl);
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Service } from './service';
import { Particle } from './particle';
// TODO put state with wasm file in each created FluenceClient
let services: Map<string, Service> = new Map();
let particlesQueue: Particle[] = [];
let currentParticle: string | undefined = undefined;
export function getCurrentParticleId(): string | undefined {
return currentParticle;
}
export function setCurrentParticleId(particle: string | undefined) {
currentParticle = particle;
}
export function enqueueParticle(particle: Particle): void {
particlesQueue.push(particle);
}
export function popParticle(): Particle | undefined {
return particlesQueue.pop();
}
export function registerService(service: Service) {
services.set(service.serviceId, service);
}
export function deleteService(serviceId: string): boolean {
return services.delete(serviceId);
}
export function getService(serviceId: string): Service | undefined {
return services.get(serviceId);
}

View File

@ -1,46 +0,0 @@
/**
* Creates service that will wait for a response from external peers.
*/
import { genUUID } from '../particle';
import log from 'loglevel';
import { ServiceMultiple } from '../service';
import { deleteService, registerService } from '../globalState';
import { delay } from '../utils';
interface NamedPromise<T> {
promise: Promise<T>;
name: string;
}
/**
* Generates a service and a name of a service.
* Name should be used in a script.
* Promise will wait a result from a script or will be resolved after `ttl` milliseconds.
* @param ttl
*/
export function waitResult(ttl: number): NamedPromise<any[]> {
return waitService(genUUID(), (args: any[]) => args, ttl);
}
export function waitService<T>(functionName: string, func: (args: any[]) => T, ttl: number): NamedPromise<T> {
let serviceName = `${functionName}-${genUUID()}`;
log.info(`Create waiting service '${serviceName}'`);
let service = new ServiceMultiple(serviceName);
registerService(service);
let promise: Promise<T> = new Promise(function (resolve) {
service.registerFunction('', (args: any[]) => {
resolve(func(args));
return {};
});
});
let timeout = delay<T>(ttl, 'Timeout on waiting ' + serviceName);
return {
name: serviceName,
promise: Promise.race([promise, timeout]).finally(() => {
deleteService(serviceName);
}),
};
}

View File

@ -14,8 +14,8 @@
* limitations under the License.
*/
export interface StepperOutcome {
ret_code: number;
data: Uint8Array;
next_peer_pks: string[];
}
export { seedToPeerId, peerIdToSeed, generatePeerId } from './internal/peerIdUtils';
export { FluenceClient } from './FluenceClient';
export { SecurityTetraplet, PeerIdB58 } from './internal/commonTypes';
export * from './api';
export { Particle } from './internal/particle';

View File

@ -0,0 +1,96 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { build } from './particle';
import * as PeerId from 'peer-id';
import Multiaddr from 'multiaddr';
import { FluenceConnection } from './FluenceConnection';
import { ParticleProcessor } from './ParticleProcessor';
import { ParticleProcessorStrategy } from './ParticleProcessorStrategy';
import log from 'loglevel';
import { PeerIdB58 } from './commonTypes';
export abstract class FluenceClientBase {
readonly selfPeerIdFull: PeerId;
get relayPeerId(): PeerIdB58 {
return this.connection?.nodePeerId.toB58String();
}
get selfPeerId(): PeerIdB58 {
return this.selfPeerIdFull.toB58String();
}
get isConnected(): boolean {
return this.connection?.isConnected();
}
protected connection: FluenceConnection;
protected processor: ParticleProcessor;
protected abstract strategy: ParticleProcessorStrategy;
constructor(selfPeerIdFull: PeerId) {
this.selfPeerIdFull = selfPeerIdFull;
}
async disconnect(): Promise<void> {
await this.connection.disconnect();
await this.processor.destroy();
}
// HACK:: this is only needed to fix tests.
// Particle processor should be tested instead
async local(): Promise<void> {
await this.processor.init();
}
/**
* Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection.
*
* @param multiaddr
*/
async connect(multiaddr: string | Multiaddr): Promise<void> {
multiaddr = Multiaddr(multiaddr);
const nodePeerId = multiaddr.getPeerId();
if (!nodePeerId) {
throw Error("'multiaddr' did not contain a valid peer id");
}
if (this.connection) {
await this.connection.disconnect();
}
const node = PeerId.createFromB58String(nodePeerId);
const connection = new FluenceConnection(
multiaddr,
node,
this.selfPeerIdFull,
this.processor.executeExternalParticle.bind(this.processor),
);
await connection.connect();
await this.processor.init();
this.connection = connection;
}
async sendScript(script: string, data?: Map<string, any>, ttl?: number): Promise<string> {
const particle = await build(this.selfPeerIdFull, script, data, ttl);
this.processor.executeLocalParticle(particle);
return particle.id;
}
}

View File

@ -23,7 +23,7 @@ import pipe from 'it-pipe';
import Multiaddr from 'multiaddr';
import PeerId from 'peer-id';
import * as log from 'loglevel';
import { build, parseParticle, Particle, toAction } from './particle';
import { parseParticle, ParticleDto, toPayload } from './particle';
export const PROTOCOL_NAME = '/fluence/faas/1.0.0';
@ -39,9 +39,14 @@ export class FluenceConnection {
private readonly address: Multiaddr;
readonly nodePeerId: PeerId;
private readonly selfPeerIdStr: string;
private readonly handleParticle: (call: Particle) => void;
private readonly handleParticle: (call: ParticleDto) => void;
constructor(multiaddr: Multiaddr, hostPeerId: PeerId, selfPeerId: PeerId, handleParticle: (call: Particle) => void) {
constructor(
multiaddr: Multiaddr,
hostPeerId: PeerId,
selfPeerId: PeerId,
handleParticle: (call: ParticleDto) => void,
) {
this.selfPeerId = selfPeerId;
this.handleParticle = handleParticle;
this.selfPeerIdStr = selfPeerId.toB58String();
@ -114,14 +119,10 @@ export class FluenceConnection {
this.status = Status.Disconnected;
}
async buildParticle(script: string, data: Map<string, any>, ttl?: number): Promise<Particle> {
return build(this.selfPeerId, script, data, ttl);
}
async sendParticle(particle: Particle): Promise<void> {
async sendParticle(particle: ParticleDto): Promise<void> {
this.checkConnectedOrThrow();
let action = toAction(particle)
let action = toPayload(particle);
let particleStr = JSON.stringify(action);
log.debug('send particle: \n' + JSON.stringify(action, undefined, 2));

View File

@ -0,0 +1,239 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ParticleDto } from './particle';
import * as PeerId from 'peer-id';
import { instantiateInterpreter, InterpreterInvoke } from './stepper';
import { ParticleHandler, SecurityTetraplet, StepperOutcome } from './commonTypes';
import log from 'loglevel';
import { ParticleProcessorStrategy } from './ParticleProcessorStrategy';
// HACK:: make an api for aqua stepper to accept variables in an easy way!
let magicParticleStorage: Map<string, Map<string, any>> = new Map();
// HACK:: make an api for aqua stepper to accept variables in an easy way!
export function injectDataIntoParticle(particleId: string, data: Map<string, any>, ttl: number) {
log.debug(`setting data for ${particleId}`, data);
magicParticleStorage.set(particleId, data);
setTimeout(() => {
log.debug(`data for ${particleId} is deleted`);
magicParticleStorage.delete(particleId);
}, ttl);
}
// HACK:: make an api for aqua stepper to accept variables in an easy way!
const wrapWithDataInjectionHandling = (
handler: ParticleHandler,
getCurrentParticleId: () => string,
): ParticleHandler => {
return (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => {
if (serviceId === '__magic' && fnName === 'load') {
const current = getCurrentParticleId();
const data = magicParticleStorage.get(current);
const res = data ? data.get(args[0]) : {};
return {
ret_code: 0,
result: JSON.stringify(res),
};
}
return handler(serviceId, fnName, args, tetraplets);
};
};
export class ParticleProcessor {
private interpreter: InterpreterInvoke;
private subscriptions: Map<string, ParticleDto> = new Map();
private particlesQueue: ParticleDto[] = [];
private currentParticle?: string;
strategy: ParticleProcessorStrategy;
peerId: PeerId;
constructor(strategy: ParticleProcessorStrategy, peerId: PeerId) {
this.strategy = strategy;
this.peerId = peerId;
}
async init() {
await this.instantiateInterpreter();
}
async destroy() {
// TODO: destroy interpreter
}
async executeLocalParticle(particle: ParticleDto) {
this.strategy?.onLocalParticleRecieved(particle);
await this.handleParticle(particle).catch((err) => {
log.error('particle processing failed: ' + err);
});
}
async executeExternalParticle(particle: ParticleDto) {
this.strategy?.onExternalParticleRecieved(particle);
await this.handleExternalParticle(particle);
}
/*
* private
*/
private getCurrentParticleId(): string | undefined {
return this.currentParticle;
}
private setCurrentParticleId(particle: string | undefined) {
this.currentParticle = particle;
}
private enqueueParticle(particle: ParticleDto): void {
this.particlesQueue.push(particle);
}
private popParticle(): ParticleDto | undefined {
return this.particlesQueue.pop();
}
/**
* Subscriptions will be applied by outside message if id will be the same.
*
* @param particle
* @param ttl time to live, subscription will be deleted after this time
*/
subscribe(particle: ParticleDto, ttl: number) {
let self = this;
setTimeout(() => {
self.subscriptions.delete(particle.id);
self.strategy?.onParticleTimeout(particle, Date.now());
}, ttl);
this.subscriptions.set(particle.id, particle);
}
updateSubscription(particle: ParticleDto): boolean {
if (this.subscriptions.has(particle.id)) {
this.subscriptions.set(particle.id, particle);
return true;
} else {
return false;
}
}
getSubscription(id: string): ParticleDto | undefined {
return this.subscriptions.get(id);
}
hasSubscription(particle: ParticleDto): boolean {
return this.subscriptions.has(particle.id);
}
/**
* Pass a particle to a interpreter and send a result to other services.
*/
private async handleParticle(particle: ParticleDto): Promise<void> {
// if a current particle is processing, add new particle to the queue
if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) {
this.enqueueParticle(particle);
} else {
if (this.interpreter === undefined) {
throw new Error('Undefined. Interpreter is not initialized');
}
// start particle processing if queue is empty
try {
this.setCurrentParticleId(particle.id);
// check if a particle is relevant
let now = Date.now();
let actualTtl = particle.timestamp + particle.ttl - now;
if (actualTtl <= 0) {
this.strategy?.onParticleTimeout(particle, now);
} else {
// if there is no subscription yet, previous data is empty
let prevData: Uint8Array = Buffer.from([]);
let prevParticle = this.getSubscription(particle.id);
if (prevParticle) {
prevData = prevParticle.data;
// update a particle in a subscription
this.updateSubscription(particle);
} else {
// set a particle with actual ttl
this.subscribe(particle, actualTtl);
}
this.strategy.onStepperExecuting(particle);
let stepperOutcomeStr = this.interpreter(
particle.init_peer_id,
particle.script,
prevData,
particle.data,
);
let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr);
// update data after aquamarine execution
let newParticle: ParticleDto = { ...particle, data: stepperOutcome.data };
this.strategy.onStepperExecuted(stepperOutcome);
this.updateSubscription(newParticle);
// do nothing if there is no `next_peer_pks` or if client isn't connected to the network
if (stepperOutcome.next_peer_pks.length > 0) {
this.strategy.sendParticleFurther(newParticle);
}
}
} finally {
// get last particle from the queue
let nextParticle = this.popParticle();
// start the processing of a new particle if it exists
if (nextParticle) {
// update current particle
this.setCurrentParticleId(nextParticle.id);
await this.handleParticle(nextParticle);
} else {
// wait for a new call (do nothing) if there is no new particle in a queue
this.setCurrentParticleId(undefined);
}
}
}
}
/**
* Handle incoming particle from a relay.
*/
private async handleExternalParticle(particle: ParticleDto): Promise<void> {
let data: any = particle.data;
let error: any = data['protocol!error'];
if (error !== undefined) {
log.error('error in external particle: ');
log.error(error);
} else {
log.info('handle external particle: ');
log.info(particle);
await this.handleParticle(particle);
}
}
/**
* Instantiate WebAssembly with AIR interpreter to execute AIR scripts
*/
async instantiateInterpreter() {
this.interpreter = await instantiateInterpreter(
wrapWithDataInjectionHandling(
this.strategy.particleHandler.bind(this),
this.getCurrentParticleId.bind(this),
),
this.peerId,
);
}
}

View File

@ -14,16 +14,16 @@
* limitations under the License.
*/
export interface ModuleConfig {
name: string;
mem_pages_count?: number;
logger_enabled?: boolean;
wasi?: Wasi;
mounted_binaries?: object;
}
import { ParticleHandler, StepperOutcome } from './commonTypes';
import { ParticleDto } from './particle';
export interface Wasi {
envs?: object;
preopened_files?: string[];
mapped_dirs?: object;
export interface ParticleProcessorStrategy {
particleHandler: ParticleHandler;
sendParticleFurther: (particle: ParticleDto) => void;
onParticleTimeout?: (particle: ParticleDto, now: number) => void;
onLocalParticleRecieved?: (particle: ParticleDto) => void;
onExternalParticleRecieved?: (particle: ParticleDto) => void;
onStepperExecuting?: (particle: ParticleDto) => void;
onStepperExecuted?: (stepperOutcome: StepperOutcome) => void;
}

View File

@ -14,6 +14,24 @@
* limitations under the License.
*/
export interface CallServiceResult {
ret_code: number;
result: string;
}
export type ParticleHandler = (
serviceId: string,
fnName: string,
args: any[],
tetraplets: SecurityTetraplet[][],
) => CallServiceResult;
export interface StepperOutcome {
ret_code: number;
data: Uint8Array;
next_peer_pks: string[];
}
export interface ResolvedTriplet {
peer_pk: string;
service_id: string;
@ -23,3 +41,5 @@ export interface ResolvedTriplet {
export interface SecurityTetraplet extends ResolvedTriplet {
json_path: string;
}
export type PeerIdB58 = string;

View File

@ -15,14 +15,36 @@
*/
import { v4 as uuidv4 } from 'uuid';
import {fromByteArray, toByteArray} from 'base64-js';
import { fromByteArray, toByteArray } from 'base64-js';
import PeerId from 'peer-id';
import { encode } from 'bs58';
import { addData } from './dataStorage';
import { injectDataIntoParticle } from './ParticleProcessor';
const DEFAULT_TTL = 7000;
export interface Particle {
export class Particle {
script: string;
data: Map<string, any>;
ttl: number;
constructor(script: string, data?: Map<string, any> | Record<string, any>, ttl?: number) {
this.script = script;
if (data === undefined) {
this.data = new Map();
} else if (data instanceof Map) {
this.data = data;
} else {
this.data = new Map();
for (let k in data) {
this.data.set(k, data[k]);
}
}
this.ttl = ttl ?? DEFAULT_TTL;
}
}
export interface ParticleDto {
id: string;
init_peer_id: string;
timestamp: number;
@ -36,8 +58,8 @@ export interface Particle {
/**
* Represents particle action to send to a node
*/
interface ParticleAction {
action: 'Particle'
interface ParticlePayload {
action: 'Particle';
id: string;
init_peer_id: string;
timestamp: number;
@ -47,11 +69,11 @@ interface ParticleAction {
data: string;
}
function wrapScript(selfPeerId: string, script: string, fields: string[]): string {
function wrapWithVariableInjectionScript(script: string, fields: string[]): string {
fields.forEach((v) => {
script = `
(seq
(call %init_peer_id% ("" "load") ["${v}"] ${v})
(call %init_peer_id% ("__magic" "load") ["${v}"] ${v})
${script}
)
`;
@ -60,18 +82,28 @@ function wrapScript(selfPeerId: string, script: string, fields: string[]): strin
return script;
}
export async function build(peerId: PeerId, script: string, data: Map<string, any>, ttl?: number): Promise<Particle> {
let id = genUUID();
export async function build(
peerId: PeerId,
script: string,
data?: Map<string, any>,
ttl?: number,
customId?: string,
): Promise<ParticleDto> {
const id = customId ?? genUUID();
let currentTime = new Date().getTime();
if (data === undefined) {
data = new Map();
}
if (ttl === undefined) {
ttl = DEFAULT_TTL;
}
addData(id, data, ttl);
script = wrapScript(peerId.toB58String(), script, Array.from(data.keys()));
injectDataIntoParticle(id, data, ttl);
script = wrapWithVariableInjectionScript(script, Array.from(data.keys()));
let particle: Particle = {
let particle: ParticleDto = {
id: id,
init_peer_id: peerId.toB58String(),
timestamp: currentTime,
@ -89,9 +121,9 @@ export async function build(peerId: PeerId, script: string, data: Map<string, an
/**
* Creates an action to send to a node.
*/
export function toAction(particle: Particle): ParticleAction {
export function toPayload(particle: ParticleDto): ParticlePayload {
return {
action: "Particle",
action: 'Particle',
id: particle.id,
init_peer_id: particle.init_peer_id,
timestamp: particle.timestamp,
@ -99,11 +131,11 @@ export function toAction(particle: Particle): ParticleAction {
script: particle.script,
// TODO: copy signature from a particle after signatures will be implemented on nodes
signature: [],
data: fromByteArray(particle.data)
data: fromByteArray(particle.data),
};
}
export function parseParticle(str: string): Particle {
export function parseParticle(str: string): ParticleDto {
let json = JSON.parse(str);
return {
@ -117,7 +149,7 @@ export function parseParticle(str: string): Particle {
};
}
export function canonicalBytes(particle: Particle) {
export function canonicalBytes(particle: ParticleDto) {
let peerIdBuf = Buffer.from(particle.init_peer_id, 'utf8');
let idBuf = Buffer.from(particle.id, 'utf8');
@ -137,7 +169,7 @@ export function canonicalBytes(particle: Particle) {
/**
* Sign a particle with a private key from peerId.
*/
export async function signParticle(peerId: PeerId, particle: Particle): Promise<string> {
export async function signParticle(peerId: PeerId, particle: ParticleDto): Promise<string> {
let bufToSign = canonicalBytes(particle);
let signature = await peerId.privKey.sign(bufToSign);

View File

@ -32,3 +32,7 @@ export function peerIdToSeed(peerId: PeerId): string {
let seedBuf = peerId.privKey.marshal().subarray(0, 32);
return encode(seedBuf);
}
export async function generatePeerId(): Promise<PeerId> {
return await PeerId.create({ keyType: 'Ed25519' });
}

View File

@ -17,8 +17,8 @@
import { toByteArray } from 'base64-js';
import * as aqua from './aqua';
import { return_current_peer_id, return_call_service_result, getStringFromWasm0, free } from './aqua';
import { ParticleHandler, CallServiceResult, SecurityTetraplet } from './commonTypes';
import { service } from './service';
import PeerId from 'peer-id';
import log from 'loglevel';
import { wasmBs64 } from '@fluencelabs/aquamarine-stepper';
@ -26,7 +26,12 @@ import Instance = WebAssembly.Instance;
import Exports = WebAssembly.Exports;
import ExportValue = WebAssembly.ExportValue;
export type InterpreterInvoke = (init_user_id: string, script: string, prev_data: Uint8Array, data: Uint8Array) => string;
export type InterpreterInvoke = (
init_user_id: string,
script: string,
prev_data: Uint8Array,
data: Uint8Array,
) => string;
type ImportObject = {
'./aquamarine_client_bg.js': {
// fn call_service_impl(service_id: String, fn_name: String, args: String, security_tetraplets: String) -> String;
@ -117,8 +122,32 @@ function log_import(cfg: HostImportsConfig): LogImport {
};
}
const theParticleHandler = (
callback: ParticleHandler,
service_id: string,
fn_name: string,
args: string,
tetraplets: string,
): CallServiceResult => {
try {
let argsObject = JSON.parse(args);
if (!Array.isArray(argsObject)) {
throw new Error('args is not an array');
}
let tetrapletsObject: SecurityTetraplet[][] = JSON.parse(tetraplets);
return callback(service_id, fn_name, argsObject, tetrapletsObject);
} catch (err) {
console.error('Cannot parse arguments: ' + JSON.stringify(err));
return {
result: JSON.stringify('Cannot parse arguments: ' + JSON.stringify(err)),
ret_code: 1,
};
}
};
/// Returns import object that describes host functions called by AIR interpreter
function newImportObject(cfg: HostImportsConfig, peerId: PeerId): ImportObject {
function newImportObject(particleHandler: ParticleHandler, cfg: HostImportsConfig, peerId: PeerId): ImportObject {
return {
// __wbg_callserviceimpl_c0ca292e3c8c0c97 this is a function generated by bindgen. Could be changed.
// If so, an error with a new name will be occurred after wasm initialization.
@ -131,7 +160,14 @@ function newImportObject(cfg: HostImportsConfig, peerId: PeerId): ImportObject {
let fnName = getStringFromWasm0(wasm, arg3, arg4);
let args = getStringFromWasm0(wasm, arg5, arg6);
let tetraplets = getStringFromWasm0(wasm, arg7, arg8);
let serviceResult = service(serviceId, fnName, args, tetraplets);
/*
TODO:: parse and pack arguments into structure like the following
class Argument<T> {
value: T,
SecurityTetraplet: tetraplet
}
*/
let serviceResult = theParticleHandler(particleHandler, serviceId, fnName, args, tetraplets);
let resultStr = JSON.stringify(serviceResult);
return_call_service_result(wasm, resultStr, arg0);
} finally {
@ -167,9 +203,12 @@ function newLogImport(cfg: HostImportsConfig): ImportObject {
/// Instantiates AIR interpreter, and returns its `invoke` function as closure
/// NOTE: an interpreter is also called a stepper from time to time
export async function instantiateInterpreter(peerId: PeerId): Promise<InterpreterInvoke> {
export async function instantiateInterpreter(
particleHandler: ParticleHandler,
peerId: PeerId,
): Promise<InterpreterInvoke> {
let cfg = new HostImportsConfig((cfg) => {
return newImportObject(cfg, peerId);
return newImportObject(particleHandler, cfg, peerId);
});
let instance = await interpreterInstance(cfg);
@ -204,3 +243,10 @@ export async function parseAstClosure(): Promise<(script: string) => string> {
return aqua.ast(instance.exports, script);
};
}
/// Parses script and returns AST in JSON format
/// NOTE & TODO: interpreter is instantiated every time, make it a lazy constant?
export async function parseAIR(script: string): Promise<string> {
let closure = await parseAstClosure();
return closure(script);
}

View File

@ -14,18 +14,15 @@
* limitations under the License.
*/
import { FluenceClient } from '../fluenceClient';
import { Certificate, certificateFromString, certificateToString } from './certificate';
import * as log from 'loglevel';
// TODO update after 'aquamarine' implemented
// The client to interact with the Fluence trust graph API
export class TrustGraph {
client: FluenceClient;
//client: FluenceClient;
constructor(client: FluenceClient) {
this.client = client;
}
constructor() {}
// Publish certificate to Fluence network. It will be published in Kademlia neighbourhood by `peerId` key.
async publishCertificates(peerId: string, certs: Certificate[]) {

View File

@ -1,158 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { getService } from './globalState';
import { SecurityTetraplet } from './securityTetraplet';
export interface CallServiceResult {
ret_code: number;
result: string;
}
export abstract class Service {
serviceId: string;
/**
* Calls the function from local client
* @param fnName - name of the function to call
* @param args - arguments to be passed to the function
* @param tetraplets - array of arrays of tetraplets. First index corresponds to argument number.
* If the argument is not an array the second array will always contain exactly one element.
* If the argument is an array the second index will correspond to the index of element in argument's array
*/
abstract call(fnName: string, args: any[], tetraplets: SecurityTetraplet[][]): CallServiceResult;
}
/**
* Creates one function for all function names.
*/
export class ServiceOne implements Service {
serviceId: string;
fn: (fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => object;
constructor(serviceId: string, fn: (fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => object) {
this.serviceId = serviceId;
this.fn = fn;
}
/**
* Calls the function from local client
* @param fnName - name of the function to call
* @param args - arguments to be passed to the function
* @param tetraplets - array of arrays of tetraplets. First index corresponds to argument number.
* If the argument is not an array the second array will always contain exactly one element.
* If the argument is an array the second index will correspond to the index of element in argument's array
*/
call(fnName: string, args: any[], tetraplets: SecurityTetraplet[][]): CallServiceResult {
try {
let result = this.fn(fnName, args, tetraplets);
return {
ret_code: 0,
result: JSON.stringify(result),
};
} catch (err) {
return {
ret_code: 1,
result: JSON.stringify(err),
};
}
}
}
/**
* Creates function per function name. Returns an error when call a name without registered function.
*/
export class ServiceMultiple implements Service {
serviceId: string;
functions: Map<string, (args: any[], tetraplets: SecurityTetraplet[][]) => object> = new Map();
constructor(serviceId: string) {
this.serviceId = serviceId;
}
/**
* Registers a callback function into Aquamarine
* @param fnName - the function name to be registered
* @param fn - callback function which will be called from Aquamarine.
* The callback function has the following parameters:
* args - arguments to be passed to the function
* tetraplets - array of arrays of tetraplets. First index corresponds to argument number.
* If the argument is not an array the second array will always contain exactly one element.
* If the argument is an array the second index will correspond to the index of element in argument's array
*/
registerFunction(fnName: string, fn: (args: any[], tetraplets: SecurityTetraplet[][]) => object) {
this.functions.set(fnName, fn);
}
/**
* Calls the function from local client
* @param fnName - name of the function to call
* @param args - arguments to be passed to the function
* @param tetraplets - array of arrays of tetraplets. First index corresponds to argument number.
* If the argument is not an array the second array will always contain exactly one element.
* If the argument is an array the second index will correspond to the index of element in argument's array
*/
call(fnName: string, args: any[], tetraplets: SecurityTetraplet[][]): CallServiceResult {
let fn = this.functions.get(fnName);
if (fn) {
try {
let result = fn(args, tetraplets);
return {
ret_code: 0,
result: JSON.stringify(result),
};
} catch (err) {
return {
ret_code: 1,
result: JSON.stringify(err),
};
}
} else {
let errorMsg = `Error. There is no function ${fnName}`;
return {
ret_code: 1,
result: JSON.stringify(errorMsg),
};
}
}
}
export function service(service_id: string, fn_name: string, args: string, tetraplets: string): CallServiceResult {
try {
let argsObject = JSON.parse(args);
if (!Array.isArray(argsObject)) {
throw new Error('args is not an array');
}
let tetrapletsObject: SecurityTetraplet[][] = JSON.parse(tetraplets);
let service = getService(service_id);
if (service) {
return service.call(fn_name, argsObject, tetrapletsObject);
} else {
return {
result: JSON.stringify(`Error. There is no service: ${service_id}`),
ret_code: 0,
};
}
} catch (err) {
console.error('Cannot parse arguments: ' + JSON.stringify(err));
return {
result: JSON.stringify('Cannot parse arguments: ' + JSON.stringify(err)),
ret_code: 1,
};
}
}

View File

@ -1,56 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Particle } from './particle';
import log from 'loglevel';
export class Subscriptions {
private subscriptions: Map<string, Particle> = new Map();
constructor() {}
/**
* Subscriptions will be applied by outside message if id will be the same.
*
* @param particle
* @param ttl time to live, subscription will be deleted after this time
*/
subscribe(particle: Particle, ttl: number) {
let _this = this;
setTimeout(() => {
_this.subscriptions.delete(particle.id);
log.info(`Particle with id ${particle.id} deleted by timeout`);
}, ttl);
this.subscriptions.set(particle.id, particle);
}
update(particle: Particle): boolean {
if (this.subscriptions.has(particle.id)) {
this.subscriptions.set(particle.id, particle);
return true;
} else {
return false;
}
}
get(id: string): Particle | undefined {
return this.subscriptions.get(id);
}
hasSubscription(particle: Particle): boolean {
return this.subscriptions.has(particle.id);
}
}

View File

@ -1,153 +0,0 @@
import 'mocha';
import Fluence from '../fluence';
import { build } from '../particle';
import { ServiceMultiple } from '../service';
import { registerService } from '../globalState';
import { expect } from 'chai';
import { SecurityTetraplet } from '../securityTetraplet';
function registerPromiseService<T>(
serviceId: string,
fnName: string,
f: (args: any[]) => T,
): Promise<[T, SecurityTetraplet[][]]> {
let service = new ServiceMultiple(serviceId);
registerService(service);
return new Promise((resolve, reject) => {
service.registerFunction(fnName, (args: any[], tetraplets: SecurityTetraplet[][]) => {
resolve([f(args), tetraplets]);
return { result: f(args) };
});
});
}
describe('== AIR suite', () => {
it('check init_peer_id', async function () {
let serviceId = 'init_peer';
let fnName = 'id';
let checkPromise = registerPromiseService(serviceId, fnName, (args) => args[0]);
let client = await Fluence.local();
let script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`;
let particle = await build(client.selfPeerId, script, new Map());
await client.executeParticle(particle);
let args = (await checkPromise)[0];
expect(args).to.be.equal(client.selfPeerIdStr);
});
it('call local function', async function () {
let serviceId = 'console';
let fnName = 'log';
let checkPromise = registerPromiseService(serviceId, fnName, (args) => args[0]);
let client = await Fluence.local();
let arg = 'hello';
let script = `(call %init_peer_id% ("${serviceId}" "${fnName}") ["${arg}"])`;
// Wrap script into particle, so it can be executed by local WASM runtime
let particle = await build(client.selfPeerId, script, new Map());
await client.executeParticle(particle);
let [args, tetraplets] = await checkPromise;
expect(args).to.be.equal(arg);
});
it('check particle arguments', async function () {
let serviceId = 'check';
let fnName = 'args';
let checkPromise = registerPromiseService(serviceId, fnName, (args) => args[0]);
let client = await Fluence.local();
let arg = 'arg1';
let value = 'hello';
let script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [${arg}])`;
let data = new Map();
data.set('arg1', value);
let particle = await build(client.selfPeerId, script, data);
await client.executeParticle(particle);
let [args, tetraplets] = await checkPromise;
expect(args).to.be.equal(value);
});
it('check security tetraplet', async function () {
let makeDataPromise = registerPromiseService('make_data_service', 'make_data', (args) => {
field: 42;
});
let getDataPromise = registerPromiseService('get_data_service', 'get_data', (args) => args[0]);
let client = await Fluence.local();
let script = `
(seq
(call %init_peer_id% ("make_data_service" "make_data") [] result)
(call %init_peer_id% ("get_data_service" "get_data") [result.$.field])
)`;
let particle = await build(client.selfPeerId, script, new Map());
await client.executeParticle(particle);
await makeDataPromise;
let [args, tetraplets] = await getDataPromise;
let tetraplet = tetraplets[0][0];
expect(tetraplet).to.contain({
service_id: 'make_data_service',
function_name: 'make_data',
json_path: '$.field',
});
});
it('check chain of services work properly', async function () {
this.timeout(5000);
let serviceId1 = 'check1';
let fnName1 = 'fn1';
let checkPromise1 = registerPromiseService(serviceId1, fnName1, (args) => args[0]);
let serviceId2 = 'check2';
let fnName2 = 'fn2';
let checkPromise2 = registerPromiseService(serviceId2, fnName2, (args) => args[0]);
let serviceId3 = 'check3';
let fnName3 = 'fn3';
let checkPromise3 = registerPromiseService(serviceId3, fnName3, (args) => args);
let client = await Fluence.local();
let arg1 = 'arg1';
let arg2 = 'arg2';
// language=Clojure
let script = `(seq
(seq
(call %init_peer_id% ("${serviceId1}" "${fnName1}") ["${arg1}"] result1)
(call %init_peer_id% ("${serviceId2}" "${fnName2}") ["${arg2}"] result2))
(call %init_peer_id% ("${serviceId3}" "${fnName3}") [result1 result2]))
`;
let particle = await build(client.selfPeerId, script, new Map());
await client.executeParticle(particle);
let args1 = (await checkPromise1)[0];
expect(args1).to.be.equal(arg1);
let args2 = (await checkPromise2)[0];
expect(args2).to.be.equal(arg2);
let args3 = (await checkPromise3)[0];
expect(args3).to.be.deep.equal([{ result: arg1 }, { result: arg2 }]);
});
});

View File

@ -1,128 +0,0 @@
import { expect } from 'chai';
import 'mocha';
import { encode } from 'bs58';
import Fluence from '../fluence';
import { certificateFromString, certificateToString, issue } from '../trust/certificate';
import { TrustGraph } from '../trust/trust_graph';
import { nodeRootCert } from '../trust/misc';
import { peerIdToSeed, seedToPeerId } from '../seed';
import { build } from '../particle';
import { Service, ServiceOne } from '../service';
import { registerService } from '../globalState';
import { waitResult } from '../helpers/waitService';
describe('Typescript usage suite', () => {
it('should create private key from seed and back', async function () {
// prettier-ignore
let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201];
let seedStr = encode(seed);
console.log('SEED STR: ' + seedStr);
let pid = await seedToPeerId(seedStr);
expect(peerIdToSeed(pid)).to.be.equal(seedStr);
});
it('should serialize and deserialize certificate correctly', async function () {
let cert = `11
1111
5566Dn4ZXXbBK5LJdUsE7L3pG9qdAzdPY47adjzkhEx9
3HNXpW2cLdqXzf4jz5EhsGEBFkWzuVdBCyxzJUZu2WPVU7kpzPjatcqvdJMjTtcycVAdaV5qh2fCGphSmw8UMBkr
158981172690500
1589974723504
2EvoZAZaGjKWFVdr36F1jphQ5cW7eK3yM16mqEHwQyr7
4UAJQWzB3nTchBtwARHAhsn7wjdYtqUHojps9xV6JkuLENV8KRiWM3BhQByx5KijumkaNjr7MhHjouLawmiN1A4d
1590061123504
1589974723504`;
let deser = await certificateFromString(cert);
let ser = certificateToString(deser);
expect(ser).to.be.equal(cert);
});
// delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
it.skip('test certs', async function () {
this.timeout(15000);
await testCerts();
});
it.skip('', async function () {
let pid = await Fluence.generatePeerId();
let cl = await Fluence.connect(
'/ip4/138.197.177.2/tcp/9001/ws/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9',
pid,
);
let service = new ServiceOne('test', (fnName: string, args: any[]) => {
console.log('called: ' + args);
return {};
});
registerService(service);
let namedPromise = waitResult(30000);
let script = `
(seq (
(call ( "${pid.toB58String()}" ("test" "test") (a b c d) result))
(call ( "${pid.toB58String()}" ("${namedPromise.name}" "") (d c b a) void[]))
))
`;
let data: Map<string, any> = new Map();
data.set('a', 'some a');
data.set('b', 'some b');
data.set('c', 'some c');
data.set('d', 'some d');
let particle = await build(pid, script, data, 30000);
await cl.sendParticle(particle);
let res = await namedPromise.promise;
expect(res).to.be.equal(['some d', 'some c', 'some b', 'some a']);
});
});
const delay = (ms: number) => new Promise((res) => setTimeout(res, ms));
export async function testCerts() {
let key1 = await Fluence.generatePeerId();
let key2 = await Fluence.generatePeerId();
// connect to two different nodes
let cl1 = await Fluence.connect(
'/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb',
key1,
);
let cl2 = await Fluence.connect(
'/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er',
key2,
);
let trustGraph1 = new TrustGraph(cl1);
let trustGraph2 = new TrustGraph(cl2);
let issuedAt = new Date();
let expiresAt = new Date();
// certificate expires after one day
expiresAt.setDate(new Date().getDate() + 1);
// create root certificate for key1 and extend it with key2
let rootCert = await nodeRootCert(key1);
let extended = await issue(key1, key2, rootCert, expiresAt.getTime(), issuedAt.getTime());
// publish certificates to Fluence network
await trustGraph1.publishCertificates(key2.toB58String(), [extended]);
// get certificates from network
let certs = await trustGraph2.getCertificates(key2.toB58String());
// root certificate could be different because nodes save trusts with bigger `expiresAt` date and less `issuedAt` date
expect(certs[0].chain[1].issuedFor.toB58String()).to.be.equal(extended.chain[1].issuedFor.toB58String());
expect(certs[0].chain[1].signature).to.be.equal(extended.chain[1].signature);
expect(certs[0].chain[1].expiresAt).to.be.equal(extended.chain[1].expiresAt);
expect(certs[0].chain[1].issuedAt).to.be.equal(extended.chain[1].issuedAt);
await cl1.disconnect();
await cl2.disconnect();
}

View File

@ -1,21 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export function delay<T>(ms: number, error: string): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => reject(new Error(error)), ms);
});
}

View File

@ -13,14 +13,14 @@
"allowSyntheticDefaultImports": true,
"resolveJsonModule": true,
"pretty": true,
"target": "esnext",
"target": "ES5",
"module": "commonjs",
"moduleResolution": "node",
"declaration": true,
"esModuleInterop": true,
"declarationMap": true,
"strict": true,
"noImplicitAny": true,
"noImplicitAny": false,
"alwaysStrict": true,
"noImplicitThis": true,
"strictNullChecks": false
@ -29,7 +29,7 @@
"node_modules",
"dist",
"bundle",
"src/test"
"src/__test__"
],
"include": ["src/**/*"]
}

View File

@ -5,9 +5,7 @@ const HtmlWebpackPlugin = require('html-webpack-plugin');
const production = (process.env.NODE_ENV === 'production');
const config = {
entry: {
app: ['./src/fluence.ts']
},
entry: './src/index.ts',
module: {
rules: [
{