Particle lifecycle (#21)

Complete rethinking and refactoring of the codebase.

The codebase basically consists these 5 moving parts now: 

1. Fluence client (and the Particle processor which might be merged with the client) - This part is responsible for initiating Request flows, managing existing requests flows (it keeps the queue of received particles), pulling right strings on request flows to update their state etc
2. Fluence connection - This part is responsible for connecting to network, sending\receiving particles
3. RequestFlow - This is where the state of particle execution process is kept. It is basically a state storage with some control levers to update the state. Each request flow contains some particle lifecycle methods and the AquaCallHandler where all callback logic is kept
4. RequestFlowBuilder - This is where requests are prepared by the user (the developer of the client application) before they are ready to be sent into the network.
5. AquaCallHandler - This is how interpreter callbacks are handled. It is very similar to express.js app and is made of middlewares. Aqua handler is the unified api for both callbacks for our Request flows and non-ours (i.e services that are expected to be called be other peers). See `AquaHandler.ts` for details
This commit is contained in:
Pavel 2021-03-03 22:01:05 +03:00 committed by GitHub
parent 77ca5502b2
commit b0ed007399
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1828 additions and 1156 deletions

6
package-lock.json generated
View File

@ -1108,9 +1108,9 @@
}
},
"@fluencelabs/aquamarine-interpreter": {
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/@fluencelabs/aquamarine-interpreter/-/aquamarine-interpreter-0.7.0.tgz",
"integrity": "sha512-2GPsOXSakpRPJFiKAcylK6Q/UhYHrQgrs8a1GCgr/OlrQEYkC4PY4HxnrdErt8fzTUDBHH4veKHKGM+IByYhxA=="
"version": "0.7.2",
"resolved": "https://registry.npmjs.org/@fluencelabs/aquamarine-interpreter/-/aquamarine-interpreter-0.7.2.tgz",
"integrity": "sha512-4LrcpeG0ONb3/kTFgt1QNERn9e7aAJBJgqbqNnx81NqFFngTi2xypKIuyPOttcxSdZTH5mpbwwn3JKFimvOvNA=="
},
"@istanbuljs/load-nyc-config": {
"version": "1.1.0",

View File

@ -16,7 +16,7 @@
"author": "Fluence Labs",
"license": "Apache-2.0",
"dependencies": {
"@fluencelabs/aquamarine-interpreter": "^0.7.0",
"@fluencelabs/aquamarine-interpreter": "0.7.2",
"async": "3.2.0",
"base64-js": "1.3.1",
"bs58": "4.0.1",

View File

@ -1,35 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { PeerIdB58 } from './internal/commonTypes';
import Multiaddr from 'multiaddr';
export interface FluenceClient {
readonly relayPeerId: PeerIdB58;
readonly selfPeerId: PeerIdB58;
readonly isConnected: boolean;
disconnect(): Promise<void>;
/**
* Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection.
*
* @param multiaddr
*/
connect(multiaddr: string | Multiaddr): Promise<void>;
sendScript(script: string, data?: Map<string, any>, ttl?: number): Promise<string>;
}

View File

@ -1,7 +1,3 @@
import { generatePeerId } from '..';
import { createClient } from '../api';
import { FluenceClientImpl } from '../internal/FluenceClientImpl';
// Uncomment to test on dev nodes
// export const nodes = [
// {
@ -14,22 +10,15 @@ import { FluenceClientImpl } from '../internal/FluenceClientImpl';
// },
// ];
// start docker container to run integration tests locally
// > docker run --rm -e RUST_LOG="info" -p 1210:1210 -p 4310:4310 fluencelabs/fluence:freeze -t 1210 -w 4310 -k gKdiCSUr1TFGFEgu2t8Ch1XEUsrN5A2UfBLjSZvfci9SPR3NvZpACfcpPGC3eY4zma1pk7UvYv5zb1VjvPHwCjj
/*
* start docker container to run integration tests locally:
docker run --rm -e RUST_LOG="info" -p 1210:1210 -p 4310:4310 fluencelabs/fluence -t 1210 -w 4310 -k gKdiCSUr1TFGFEgu2t8Ch1XEUsrN5A2UfBLjSZvfci9SPR3NvZpACfcpPGC3eY4zma1pk7UvYv5zb1VjvPHwCjj
*/
export const nodes = [
{
multiaddr: '/ip4/127.0.0.1/tcp/4310/ws/p2p/12D3KooWKEprYXUXqoV5xSBeyqrWLpQLLH4PXfvVkDJtmcqmh5V3',
peerId: '12D3KooWKEprYXUXqoV5xSBeyqrWLpQLLH4PXfvVkDJtmcqmh5V3',
},
];
export const createLocalClient = async () => {
const peerId = await generatePeerId();
const client = new FluenceClientImpl(peerId);
await client.local();
return client;
};
export const createConnectedClient = async (node: string) => {
return (await createClient(node)) as FluenceClientImpl;
};

View File

@ -9,16 +9,22 @@ import {
uploadModule,
} from '../../internal/builtins';
import { ModuleConfig } from '../../internal/moduleConfig';
import { checkConnection } from '../../api';
import { generatePeerId } from '../..';
import { FluenceClientImpl } from '../../internal/FluenceClientImpl';
import { createConnectedClient, nodes } from '../connection';
import { createClient, FluenceClient } from '../../api.unstable';
import { nodes } from '../connection';
let client: FluenceClient;
describe('Builtins usage suite', () => {
afterEach(async () => {
if (client) {
await client.disconnect();
}
});
jest.setTimeout(10000);
it('get_modules', async function () {
const client = await createConnectedClient(nodes[0].multiaddr);
client = await createClient(nodes[0].multiaddr);
let modulesList = await getModules(client);
@ -26,7 +32,7 @@ describe('Builtins usage suite', () => {
});
it('get_interfaces', async function () {
const client = await createConnectedClient(nodes[0].multiaddr);
client = await createClient(nodes[0].multiaddr);
let interfaces = await getInterfaces(client);
@ -34,28 +40,15 @@ describe('Builtins usage suite', () => {
});
it('get_blueprints', async function () {
const client = await createConnectedClient(nodes[0].multiaddr);
client = await createClient(nodes[0].multiaddr);
let bpList = await getBlueprints(client);
expect(bpList).not.toBeUndefined;
});
it('check_connection', async function () {
const peerId = await generatePeerId();
const client = new FluenceClientImpl(peerId);
await client.local();
await client.connect(nodes[0].multiaddr);
let isConnected = await checkConnection(client);
expect(isConnected).toEqual(true);
});
it('upload_modules', async function () {
const client = await createConnectedClient(nodes[0].multiaddr);
console.log('peerid: ' + client.selfPeerId);
client = await createClient(nodes[0].multiaddr);
let config: ModuleConfig = {
name: 'test_broken_module',
@ -75,7 +68,7 @@ describe('Builtins usage suite', () => {
});
it('add_blueprint', async function () {
const client = await createConnectedClient(nodes[0].multiaddr);
client = await createClient(nodes[0].multiaddr);
let bpId = 'some';
@ -84,20 +77,19 @@ describe('Builtins usage suite', () => {
expect(bpIdReturned).toEqual(bpId);
});
// FIXME:: there is no error on broken blueprint from a node
it.skip('create_service', async function () {
const client = await createConnectedClient(nodes[0].multiaddr);
it('create broken blueprint', async function () {
client = await createClient(nodes[0].multiaddr);
let serviceId = await createService(client, 'test_broken_blueprint');
let promise = createService(client, 'test_broken_blueprint');
// TODO there is no error on broken blueprint from a node
expect(serviceId).not.toBeUndefined;
await expect(promise).rejects.toMatchObject({
error: expect.stringContaining("Blueprint wasn't found at"),
instruction: expect.stringContaining('blueprint_id'),
});
});
it('add and remove script', async function () {
const client = await createConnectedClient(nodes[0].multiaddr);
console.log('peerid: ' + client.selfPeerId);
client = await createClient(nodes[0].multiaddr);
let script = `
(seq
@ -107,7 +99,7 @@ describe('Builtins usage suite', () => {
`;
let resMakingPromise = new Promise((resolve) => {
client.registerCallback('test', 'test1', (args, _) => {
client.aquaCallHandler.on('test', 'test1', (args, _) => {
resolve([...args]);
return {};
});
@ -117,7 +109,6 @@ describe('Builtins usage suite', () => {
await resMakingPromise
.then((args) => {
console.log('final!');
expect(args as string[]).toEqual(['1', '2', '3']);
})
.finally(() => {

View File

@ -1,201 +1,54 @@
import { encode } from 'bs58';
import { generatePeerId, peerIdToSeed, seedToPeerId } from '../../internal/peerIdUtils';
import { FluenceClientImpl } from '../../internal/FluenceClientImpl';
import log from 'loglevel';
import { createClient, subscribeForErrors } from '../../api';
import { checkConnection, createClient, FluenceClient } from '../../api.unstable';
import Multiaddr from 'multiaddr';
import { createConnectedClient, createLocalClient, nodes } from '../connection';
import { nodes } from '../connection';
import { RequestFlowBuilder } from '../../internal/RequestFlowBuilder';
let client: FluenceClient;
describe('Typescript usage suite', () => {
it('should create private key from seed and back', async function () {
// prettier-ignore
let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201];
let seedStr = encode(seed);
log.trace('SEED STR: ' + seedStr);
let pid = await seedToPeerId(seedStr);
expect(peerIdToSeed(pid)).toEqual(seedStr);
afterEach(async () => {
if (client) {
await client.disconnect();
}
});
describe('should make connection to network', function () {
const testProcedure = async (client: FluenceClientImpl) => {
let resMakingPromise = new Promise((resolve) => {
client.registerCallback('test', 'test', (args, _) => {
resolve(args);
return {};
});
});
let script = `
(seq
(call "${client.relayPeerId}" ("op" "identity") [])
(call "${client.selfPeerId}" ("test" "test") [hello])
)
`;
let data: Map<string, any> = new Map();
data.set('hello', 'world');
await client.sendScript(script, data);
return await resMakingPromise;
};
it('address as string', async function () {
// arrange
const addr = nodes[0].multiaddr;
// act
const client = (await createClient(addr)) as FluenceClientImpl;
// assert
const res = await testProcedure(client);
expect(res).toEqual(['world']);
});
it('address as multiaddr', async function () {
// arrange
const addr = new Multiaddr(nodes[0].multiaddr);
// act
const client = (await createClient(addr)) as FluenceClientImpl;
// assert
const res = await testProcedure(client);
expect(res).toEqual(['world']);
});
it('address as node', async function () {
// arrange
const addr = nodes[0];
// act
const client = (await createClient(addr)) as FluenceClientImpl;
// assert
const res = await testProcedure(client);
expect(res).toEqual(['world']);
});
it('peerid as peer id', async function () {
// arrange
const addr = nodes[0].multiaddr;
const pid = await generatePeerId();
// act
const client = (await createClient(addr, pid)) as FluenceClientImpl;
// assert
const res = await testProcedure(client);
expect(res).toEqual(['world']);
});
it('peerid as seed', async function () {
// arrange
const addr = nodes[0].multiaddr;
const pid = peerIdToSeed(await generatePeerId());
// act
const client = (await createClient(addr, pid)) as FluenceClientImpl;
// assert
const res = await testProcedure(client);
expect(res).toEqual(['world']);
});
});
it('should make a call through the network', async function () {
it('should make a call through network', async () => {
// arrange
const client = await createConnectedClient(nodes[0].multiaddr);
client.registerCallback('test', 'test', (args, _) => {
log.trace('should make a call through the network, called "test" "test" with args', args);
return {};
});
let resMakingPromise = new Promise((resolve) => {
client.registerCallback('test', 'reverse_args', (args, _) => {
resolve([...args].reverse());
return {};
});
});
client = await createClient();
await client.connect(nodes[0].multiaddr);
// act
let script = `
(seq
(call "${client.relayPeerId}" ("op" "identity") [])
(seq
(call "${client.selfPeerId}" ("test" "test") [a b c d] result)
(call "${client.selfPeerId}" ("test" "reverse_args") [a b c d])
)
const [request, promise] = new RequestFlowBuilder()
.withRawScript(
`(seq
(call init_relay ("op" "identity") ["hello world!"] result)
(call %init_peer_id% ("callback" "callback") [result])
)`,
)
`;
let data: Map<string, any> = new Map();
data.set('a', 'some a');
data.set('b', 'some b');
data.set('c', 'some c');
data.set('d', 'some d');
await client.sendScript(script, data);
.buildAsFetch<[[string]]>('callback', 'callback');
await client.initiateFlow(request);
// assert
const res = await resMakingPromise;
expect(res).toEqual(['some d', 'some c', 'some b', 'some a']);
const [[result]] = await promise;
expect(result).toBe('hello world!');
});
it('fireAndForget should work', async function () {
it('check connection should work', async function () {
client = await createClient();
await client.connect(nodes[0].multiaddr);
let isConnected = await checkConnection(client);
expect(isConnected).toEqual(true);
});
it('two clients should work inside the same time browser', async () => {
// arrange
const client = await createConnectedClient(nodes[0].multiaddr);
const client1 = await createClient(nodes[0].multiaddr);
const client2 = await createClient(nodes[0].multiaddr);
let resMakingPromise = new Promise((resolve) => {
client.registerCallback('test', 'reverse_args', (args, _) => {
resolve([...args].reverse());
return {};
});
});
// act
let script = `
(call "${client.selfPeerId}" ("test" "reverse_args") [a b c d])
`;
let data: Map<string, any> = new Map();
data.set('a', 'some a');
data.set('b', 'some b');
data.set('c', 'some c');
data.set('d', 'some d');
await client.fireAndForget(script, data);
// assert
const res = await resMakingPromise;
expect(res).toEqual(['some d', 'some c', 'some b', 'some a']);
});
it('fetch should work', async function () {
// arrange
const client = await createConnectedClient(nodes[0].multiaddr);
// act
let script = `
(call "${client.relayPeerId}" ("peer" "identify") [] result)
`;
const data = new Map();
data.set('__relay', client.relayPeerId);
const [res] = await client.fetch(script, ['result'], data);
// assert
expect(res.external_addresses).not.toBeUndefined;
});
it('two clients should work inside the same time browser', async function () {
// arrange
const client1 = await createConnectedClient(nodes[0].multiaddr);
const client2 = await createConnectedClient(nodes[0].multiaddr);
let resMakingPromise = new Promise((resolve) => {
client2.registerCallback('test', 'test', (args, _) => {
client2.aquaCallHandler.onEvent('test', 'test', (args, _) => {
resolve([...args]);
return {};
});
@ -214,29 +67,93 @@ describe('Typescript usage suite', () => {
data.set('c', 'some c');
data.set('d', 'some d');
await client1.sendScript(script, data);
await client1.initiateFlow(new RequestFlowBuilder().withRawScript(script).withVariables(data).build());
let res = await resMakingPromise;
expect(res).toEqual(['some a', 'some b', 'some c', 'some d']);
await client1.disconnect();
await client2.disconnect();
});
describe('should make connection to network', () => {
it('address as string', async () => {
// arrange
const addr = nodes[0].multiaddr;
// act
client = await createClient(addr);
const isConnected = await checkConnection(client);
// assert
expect(isConnected).toBeTruthy;
});
it('address as multiaddr', async () => {
// arrange
const addr = new Multiaddr(nodes[0].multiaddr);
// act
client = await createClient(addr);
const isConnected = await checkConnection(client);
// assert
expect(isConnected).toBeTruthy;
});
it('address as node', async () => {
// arrange
const addr = nodes[0];
// act
client = await createClient(addr);
const isConnected = await checkConnection(client);
// assert
expect(isConnected).toBeTruthy;
});
it('peerid as peer id', async () => {
// arrange
const addr = nodes[0].multiaddr;
// act
client = await createClient(addr);
const isConnected = await checkConnection(client);
// assert
expect(isConnected).toBeTruthy;
});
it('peerid as seed', async () => {
// arrange
const addr = nodes[0].multiaddr;
// act
client = await createClient(addr);
const isConnected = await checkConnection(client);
// assert
expect(isConnected).toBeTruthy;
});
});
it('xor handling should work with connected client', async function () {
// arrange
const client = await createConnectedClient(nodes[0].multiaddr);
log.setLevel('info');
const [request, promise] = new RequestFlowBuilder()
.withRawScript(
`
(seq
(call init_relay ("op" "identity") [])
(call init_relay ("incorrect" "service") ["incorrect_arg"])
)
`,
)
.buildWithErrorHandling();
// act
let script = `
(seq
(call relay ("op" "identity") [])
(call relay ("incorrect" "service") ["incorrect_arg"])
)
`;
const data = new Map();
data.set('relay', client.relayPeerId);
const promise = subscribeForErrors(client, 7000);
await client.sendScript(script, data);
client = await createClient(nodes[0].multiaddr);
await client.initiateFlow(request);
// assert
await expect(promise).rejects.toMatchObject({
@ -247,18 +164,25 @@ describe('Typescript usage suite', () => {
it('xor handling should work with local client', async function () {
// arrange
const client = await createLocalClient();
const [request, promise] = new RequestFlowBuilder()
.withRawScript(
`
(call %init_peer_id% ("service" "fails") [])
`,
)
.configHandler((h) => {
h.use((req, res, _) => {
res.retCode = 1;
res.result = 'service failed internally';
});
})
.buildWithErrorHandling();
// act
let script = `(call %init_peer_id% ("incorrect" "service") ["incorrect_arg"])`;
const promise = subscribeForErrors(client, 7000);
await client.sendScript(script);
client = await createClient();
await client.initiateFlow(request);
// assert
await expect(promise).rejects.toMatchObject({
error: expect.stringContaining('There is no service: incorrect'),
instruction: expect.stringContaining('incorrect'),
});
await expect(promise).rejects.toMatch('service failed internally');
});
});

View File

@ -0,0 +1,127 @@
import {
createClient,
Particle,
FluenceClient,
sendParticle,
registerServiceFunction,
subscribeToEvent,
sendParticleAsFetch,
} from '../../api';
import { nodes } from '../connection';
let client: FluenceClient;
describe('Legacy api suite', () => {
it('sendParticle', async () => {
client = await createClient(nodes[0]);
const result = new Promise((resolve) => {
subscribeToEvent(client, 'callback', 'callback', (args) => {
resolve(args[0]);
});
});
const script = `(seq
(call init_relay ("op" "identity") [])
(call %init_peer_id% ("callback" "callback") [arg])
)`;
const data = {
arg: 'hello world!',
};
await sendParticle(client, new Particle(script, data, 7000));
expect(await result).toBe('hello world!');
});
it('sendParticle Error', async () => {
client = await createClient(nodes[0]);
const script = `
(call init_relay ("incorrect" "service") [])
`;
const promise = new Promise((resolve, reject) => {
sendParticle(client, new Particle(script), reject);
});
await expect(promise).rejects.toMatchObject({
error: expect.stringContaining("Service with id 'incorrect' not found"),
instruction: expect.stringContaining('incorrect'),
});
});
it('sendParticleAsFetch', async () => {
client = await createClient(nodes[0]);
const script = `(seq
(call init_relay ("op" "identity") [])
(call %init_peer_id% ("service" "fn") [arg])
)`;
const data = {
arg: 'hello world!',
};
const [result] = await sendParticleAsFetch<[string]>(client, new Particle(script, data, 7000), 'fn', 'service');
expect(result).toBe('hello world!');
});
it('sendParticleAsFetch Error', async () => {
client = await createClient(nodes[0]);
const script = `
(call init_relay ("incorrect" "service") [])
`;
const promise = sendParticleAsFetch<[string]>(client, new Particle(script), 'fn', 'service');
await expect(promise).rejects.toMatchObject({
error: expect.stringContaining("Service with id 'incorrect' not found"),
instruction: expect.stringContaining('incorrect'),
});
});
it('registerServiceFunction', async () => {
client = await createClient(nodes[0]);
registerServiceFunction(client, 'service', 'fn', (args) => {
return { res: args[0] + ' world!' };
});
const script = `(seq
(call %init_peer_id% ("service" "fn") ["hello"] result)
(call %init_peer_id% ("callback" "callback") [result])
)`;
const [result] = await sendParticleAsFetch<[string]>(
client,
new Particle(script, {}, 7000),
'callback',
'callback',
);
expect(result).toEqual({ res: 'hello world!' });
});
it('subscribeToEvent', async () => {
client = await createClient(nodes[0]);
const promise = new Promise((resolve) => {
subscribeToEvent(client, 'service', 'fn', (args) => {
resolve(args[0] + ' world!');
});
});
const script = `
(call %init_peer_id% ("service" "fn") ["hello"])
`;
await sendParticle(client, new Particle(script, {}, 7000));
const result = await promise;
expect(result).toBe('hello world!');
});
});

View File

@ -0,0 +1,323 @@
import { AquaCallHandler, errorHandler } from '../../internal/AquaHandler';
import { ResultCodes } from '../../internal/commonTypes';
const req = () => ({
serviceId: 'service',
fnName: 'fn name',
args: [],
tetraplets: [],
particleContext: {
particleId: 'id',
},
});
const res = () => ({
res,
});
describe('Aqua handler tests', () => {
it('Should work without middlewares', () => {
// arrange
const handler = new AquaCallHandler();
// act
const res = handler.execute(req());
// assert
expect(res).not.toBeUndefined();
});
it('Should work with no-op middleware', () => {
// arrange
const handler = new AquaCallHandler();
handler.use((req, res, next) => {
next();
});
// act
const res = handler.execute(req());
// assert
expect(res).not.toBeUndefined();
});
it('Should work with two overlapping middlewares', () => {
// arrange
const handler = new AquaCallHandler();
handler
.use((req, res, next) => {
res.result = { hello: 'world' };
})
.use((req, res, next) => {
res.result = { hello: 'incorect' };
next();
});
// act
const res = handler.execute(req());
// assert
expect(res).toMatchObject({
result: { hello: 'world' },
});
});
it('Should work with two NON-overlapping middlewares', () => {
// arrange
const handler = new AquaCallHandler();
handler
.use((req, res, next) => {
res.result = {};
next();
})
.use((req, res, next) => {
res.result.name = 'john';
next();
})
.use((req, res, next) => {
res.result.color = 'red';
next();
});
// act
const res = handler.execute(req());
// assert
expect(res).toMatchObject({
result: { name: 'john', color: 'red' },
});
});
it('Should work with provided error handling middleware', () => {
// arrange
const handler = new AquaCallHandler();
handler.use(errorHandler);
handler.use((req, res, next) => {
throw new Error('some error');
});
// act
const res = handler.execute(req());
// assert
expect(res).toMatchObject({
retCode: ResultCodes.exceptionInHandler,
result: 'Error: some error',
});
});
describe('Service handler tests', () => {
it('Should register service function', () => {
// arrange
const handler = new AquaCallHandler();
handler.on('service', 'function', (args) => {
return { called: args };
});
// act
const res = handler.execute({
...req(),
serviceId: 'service',
fnName: 'function',
args: ['hello', 'world'],
});
// assert
expect(res).toMatchObject({
retCode: ResultCodes.success,
result: { called: ['hello', 'world'] },
});
});
it('Should UNregister service function', () => {
// arrange
const handler = new AquaCallHandler();
const unreg = handler.on('service', 'function', (args) => {
return { called: args };
});
unreg();
// act
const res = handler.execute({
...req(),
serviceId: 'service',
fnName: 'function',
args: ['hello', 'world'],
});
// assert
expect(res).toMatchObject({
retCode: ResultCodes.unkownError,
});
});
it('Should register event', async () => {
// arrange
const handler = new AquaCallHandler();
const returnPromise = new Promise((resolve) => {
handler.onEvent('service', 'function', (args) => {
resolve({ called: args });
});
});
handler.onEvent('service', 'function', (args) => {
return { called: args };
});
// act
const res = handler.execute({
...req(),
serviceId: 'service',
fnName: 'function',
args: ['hello', 'world'],
});
// assert
await expect(returnPromise).resolves.toMatchObject({ called: ['hello', 'world'] });
});
it('Should UNregister event', () => {
// arrange
const handler = new AquaCallHandler();
const unreg = handler.onEvent('service', 'function', (args) => {
// don't care
});
unreg();
// act
const res = handler.execute({
...req(),
serviceId: 'service',
fnName: 'function',
args: ['hello', 'world'],
});
// assert
expect(res).toMatchObject({
retCode: ResultCodes.unkownError,
});
});
it('Should register multiple service functions', () => {
// arrange
const handler = new AquaCallHandler();
handler.on('service', 'function1', (args) => {
return 'called function1';
});
handler.on('service', 'function2', (args) => {
return 'called function2';
});
// act
const res1 = handler.execute({
...req(),
serviceId: 'service',
fnName: 'function1',
});
const res2 = handler.execute({
...req(),
serviceId: 'service',
fnName: 'function2',
});
// assert
expect(res1).toMatchObject({
retCode: ResultCodes.success,
result: 'called function1',
});
expect(res2).toMatchObject({
retCode: ResultCodes.success,
result: 'called function2',
});
});
it('Should override previous function registration', () => {
// arrange
const handler = new AquaCallHandler();
handler.on('service', 'function', (args) => {
return { called: args };
});
handler.on('service', 'function', (args) => {
return 'overridden';
});
// act
const res = handler.execute({
...req(),
serviceId: 'service',
fnName: 'function',
});
// assert
expect(res).toMatchObject({
retCode: ResultCodes.success,
result: 'overridden',
});
});
});
describe('Middleware combination tests', () => {
it('Should work with NON overlapping function registration', () => {
// arrange
const base = new AquaCallHandler();
base.on('service', 'function1', (args) => {
return 'called function1';
});
const another = new AquaCallHandler();
base.on('service', 'function2', (args) => {
return 'called function2';
});
base.combineWith(another);
// act
const res1 = base.execute({
...req(),
serviceId: 'service',
fnName: 'function1',
});
const res2 = base.execute({
...req(),
serviceId: 'service',
fnName: 'function2',
});
// assert
expect(res1).toMatchObject({
retCode: ResultCodes.success,
result: 'called function1',
});
expect(res2).toMatchObject({
retCode: ResultCodes.success,
result: 'called function2',
});
});
it('Should work with overlapping function registration', () => {
// arrange
const base = new AquaCallHandler();
base.on('service', 'function', (args) => {
return { called: args };
});
const another = new AquaCallHandler();
another.on('service', 'function', (args) => {
return 'overridden';
});
base.combineWith(another);
// act
const res = base.execute({
...req(),
serviceId: 'service',
fnName: 'function',
});
// assert
expect(res).toMatchObject({
retCode: ResultCodes.success,
result: 'overridden',
});
});
});
});

View File

@ -0,0 +1,31 @@
import PeerId from 'peer-id';
import { genUUID } from '../../internal/particle';
import { seedToPeerId } from '../../internal/peerIdUtils';
import { RequestFlow } from '../../internal/RequestFlow';
describe('Request flow tests', () => {
it('particle initiation should work', async () => {
// arrange
jest.useFakeTimers();
const seed = '4vzv3mg6cnjpEK24TXXLA3Ye7QrvKWPKqfbDvAKAyLK6';
const mockDate = new Date(Date.UTC(2021, 2, 14)).valueOf();
Date.now = jest.fn(() => mockDate);
const request = RequestFlow.createLocal('(null)', 10000);
const peerId = await seedToPeerId(seed);
// act
await request.initState(peerId);
// assert
const particle = request.getParticle();
expect(particle).toMatchObject({
init_peer_id: peerId.toB58String(),
script: '(null)',
signature: '',
timestamp: mockDate,
ttl: 10000,
});
expect(setTimeout).toHaveBeenCalledTimes(1);
});
});

View File

@ -1,26 +1,34 @@
import { createLocalClient } from '../connection';
import {subscribeForErrors} from "../../api";
import { createClient, FluenceClient } from '../../api.unstable';
import { RequestFlow } from '../../internal/RequestFlow';
import { RequestFlowBuilder } from '../../internal/RequestFlowBuilder';
let client: FluenceClient;
describe('== AIR suite', () => {
afterEach(async () => {
if (client) {
await client.disconnect();
}
});
it('check init_peer_id', async function () {
// arrange
const serviceId = 'test_service';
const fnName = 'return_first_arg';
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`;
const client = await createLocalClient();
let res;
client.registerCallback(serviceId, fnName, (args, _) => {
res = args[0];
return res;
});
// prettier-ignore
const [request, promise] = new RequestFlowBuilder()
.withRawScript(script)
.buildAsFetch<string[]>(serviceId, fnName);
// act
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`;
await client.sendScript(script);
client = await createClient();
await client.initiateFlow(request);
const [result] = await promise;
// assert
expect(res).toEqual(client.selfPeerId);
expect(result).toBe(client.selfPeerId);
});
it('call local function', async function () {
@ -28,10 +36,10 @@ describe('== AIR suite', () => {
const serviceId = 'test_service';
const fnName = 'return_first_arg';
const client = await createLocalClient();
client = await createClient();
let res;
client.registerCallback(serviceId, fnName, (args, _) => {
client.aquaCallHandler.on(serviceId, fnName, (args, _) => {
res = args[0];
return res;
});
@ -39,69 +47,66 @@ describe('== AIR suite', () => {
// act
const arg = 'hello';
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") ["${arg}"])`;
await client.sendScript(script);
await client.initiateFlow(RequestFlow.createLocal(script));
// assert
expect(res).toEqual(arg);
});
it('call broken script', async function () {
// arrange
const client = await createLocalClient();
const script = `(incorrect)`;
describe('error handling', () => {
it('call broken script', async function () {
// arrange
const script = `(incorrect)`;
// prettier-ignore
const [request, error] = new RequestFlowBuilder()
.withRawScript(script)
.buildWithErrorHandling();
// act
const promise = client.sendScript(script);
// act
client = await createClient();
await client.initiateFlow(request);
// assert
await expect(promise).rejects.toContain("aqua script can't be parsed");
});
// assert
await expect(error).rejects.toContain("aqua script can't be parsed");
});
it('call script without ttl', async function () {
// arrange
const client = await createLocalClient();
const script = `(call %init_peer_id% ("op" "identity") [""])`;
it('call script without ttl', async function () {
// arrange
const script = `(null)`;
// prettier-ignore
const [request, promise] = new RequestFlowBuilder()
.withTTL(0)
.withRawScript(script)
.buildAsFetch();
// act
const promise = client.sendScript(script, undefined, 0);
// act
client = await createClient();
await client.initiateFlow(request);
// assert
await expect(promise).rejects.toContain('Particle expired');
});
it.skip('call broken script by fetch', async function () {
// arrange
const client = await createLocalClient();
const script = `(incorrect)`;
// act
const promise = client.fetch(script, ['result']);
// assert
await expect(promise).rejects.toContain("aqua script can't be parsed");
// assert
await expect(promise).rejects.toContain('Timed out after');
});
});
it('check particle arguments', async function () {
// arrange
const serviceId = 'test_service';
const fnName = 'return_first_arg';
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [arg1])`;
const client = await createLocalClient();
let res;
client.registerCallback(serviceId, fnName, (args, _) => {
res = args[0];
return res;
});
// prettier-ignore
const [request, promise] = new RequestFlowBuilder()
.withRawScript(script)
.withVariable('arg1', 'hello')
.buildAsFetch<string[]>(serviceId, fnName);
// act
const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [arg1])`;
const data = new Map();
data.set('arg1', 'hello');
await client.sendScript(script, data);
client = await createClient();
await client.initiateFlow(request);
const [result] = await promise;
// assert
expect(res).toEqual('hello');
expect(result).toEqual('hello');
});
it('check security tetraplet', async function () {
@ -111,15 +116,15 @@ describe('== AIR suite', () => {
const getDataServiceId = 'get_data_service';
const getDataFnName = 'get_data';
const client = await createLocalClient();
client = await createClient();
client.registerCallback(makeDataServiceId, makeDataFnName, (args, _) => {
client.aquaCallHandler.on(makeDataServiceId, makeDataFnName, (args, _) => {
return {
field: 42,
};
});
let res;
client.registerCallback(getDataServiceId, getDataFnName, (args, tetraplets) => {
client.aquaCallHandler.on(getDataServiceId, getDataFnName, (args, tetraplets) => {
res = {
args: args,
tetraplets: tetraplets,
@ -133,7 +138,7 @@ describe('== AIR suite', () => {
(call %init_peer_id% ("${makeDataServiceId}" "${makeDataFnName}") [] result)
(call %init_peer_id% ("${getDataServiceId}" "${getDataFnName}") [result.$.field])
)`;
await client.sendScript(script);
await client.initiateFlow(new RequestFlowBuilder().withRawScript(script).build());
// assert
const tetraplet = res.tetraplets[0][0];
@ -146,12 +151,12 @@ describe('== AIR suite', () => {
it('check chain of services work properly', async function () {
// arrange
const client = await createLocalClient();
client = await createClient();
const serviceId1 = 'check1';
const fnName1 = 'fn1';
let res1;
client.registerCallback(serviceId1, fnName1, (args, _) => {
client.aquaCallHandler.on(serviceId1, fnName1, (args, _) => {
res1 = args[0];
return res1;
});
@ -159,7 +164,7 @@ describe('== AIR suite', () => {
const serviceId2 = 'check2';
const fnName2 = 'fn2';
let res2;
client.registerCallback(serviceId2, fnName2, (args, _) => {
client.aquaCallHandler.on(serviceId2, fnName2, (args, _) => {
res2 = args[0];
return res2;
});
@ -167,7 +172,7 @@ describe('== AIR suite', () => {
const serviceId3 = 'check3';
const fnName3 = 'fn3';
let res3;
client.registerCallback(serviceId3, fnName3, (args, _) => {
client.aquaCallHandler.on(serviceId3, fnName3, (args, _) => {
res3 = args;
return res3;
});
@ -182,7 +187,7 @@ describe('== AIR suite', () => {
(call %init_peer_id% ("${serviceId2}" "${fnName2}") ["${arg2}"] result2))
(call %init_peer_id% ("${serviceId3}" "${fnName3}") [result1 result2]))
`;
await client.sendScript(script);
await client.initiateFlow(new RequestFlowBuilder().withRawScript(script).build());
// assert
expect(res1).toEqual(arg1);

View File

@ -1,8 +1,9 @@
import { parseAIR } from '../../internal/stepper';
import { AquamarineInterpreter } from '../../internal/aqua/interpreter';
describe('== AST parsing suite', () => {
it('parse simple script and return ast', async function () {
let ast = await parseAIR(`
const interpreter = await AquamarineInterpreter.create({} as any);
let ast = interpreter.parseAir(`
(call node ("service" "function") [1 2 3 arg] output)
`);

View File

@ -0,0 +1,13 @@
import { encode } from 'bs58';
import { peerIdToSeed, seedToPeerId } from '../..';
describe('Peer Id utils', () => {
it('should create private key from seed and back', async function () {
// prettier-ignore
let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201];
let seedStr = encode(seed);
let pid = await seedToPeerId(seedStr);
expect(peerIdToSeed(pid)).toEqual(seedStr);
});
});

View File

@ -1,11 +1,42 @@
import { FluenceClient } from './FluenceClient';
import { SecurityTetraplet } from './internal/commonTypes';
import { Particle } from './internal/particle';
import Multiaddr from 'multiaddr';
import PeerId, { isPeerId } from 'peer-id';
import { generatePeerId, seedToPeerId } from './internal/peerIdUtils';
import { FluenceClientImpl } from './internal/FluenceClientImpl';
import log from 'loglevel';
import PeerId from 'peer-id';
import { PeerIdB58, SecurityTetraplet } from './internal/commonTypes';
import * as unstable from './api.unstable';
import { ClientImpl } from './internal/ClientImpl';
import { RequestFlowBuilder } from './internal/RequestFlowBuilder';
import { RequestFlow } from './internal/RequestFlow';
/**
* The class represents interface to Fluence Platform. To create a client @see {@link createClient} function.
*/
export interface FluenceClient {
/**
* { string } Gets the base58 representation of the current peer id. Read only
*/
readonly relayPeerId: PeerIdB58;
/**
* { string } Gets the base58 representation of the connected relay's peer id. Read only
*/
readonly selfPeerId: PeerIdB58;
/**
* { string } True if the client is connected to network. False otherwise. Read only
*/
readonly isConnected: boolean;
/**
* Disconnects the client from the network
*/
disconnect(): Promise<void>;
/**
* Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection.
*
* @param {string | Multiaddr} [multiaddr] - Address of the node in Fluence network.
*/
connect(multiaddr: string | Multiaddr): Promise<void>;
}
type Node = {
peerId: string;
@ -22,44 +53,77 @@ export const createClient = async (
connectTo?: string | Multiaddr | Node,
peerIdOrSeed?: PeerId | string,
): Promise<FluenceClient> => {
let peerId;
if (!peerIdOrSeed) {
peerId = await generatePeerId();
} else if (isPeerId(peerIdOrSeed)) {
// keep unchanged
peerId = peerIdOrSeed;
} else {
// peerId is string, therefore seed
peerId = await seedToPeerId(peerIdOrSeed);
}
const client = new FluenceClientImpl(peerId);
if (connectTo) {
let theAddress: Multiaddr;
let fromNode = (connectTo as any).multiaddr;
if (fromNode) {
theAddress = new Multiaddr(fromNode);
} else {
theAddress = new Multiaddr(connectTo as string);
}
await client.connect(theAddress);
if (!(await checkConnection(client))) {
throw new Error('Connection check failed. Check if the node is working or try to connect to another node');
}
}
return client;
const res = await unstable.createClient(connectTo, peerIdOrSeed);
return res as any;
};
export const checkConnection = async (client: FluenceClient): Promise<boolean> => {
return unstable.checkConnection(client as any);
};
/**
* The class representing Particle - a data structure used to perform operations on Fluence Network. It originates on some peer in the network, travels the network through a predefined path, triggering function execution along its way.
*/
export class Particle {
script: string;
data: Map<string, any>;
ttl: number;
/**
* Creates a particle with specified parameters.
* @param { String }script - Air script which defines the execution of a particle its path, functions it triggers on peers, and so on.
* @param { Map<string, any> | Record<string, any> } data - Variables passed to the particle in the form of either JS Map or JS object with keys representing variable names and values representing values correspondingly
* @param { [Number]=7000 } ttl - Time to live, a timout after which the particle execution is stopped by Aquamarine.
*/
constructor(script: string, data?: Map<string, any> | Record<string, any>, ttl?: number) {
this.script = script;
if (data === undefined) {
this.data = new Map();
} else if (data instanceof Map) {
this.data = data;
} else {
this.data = new Map();
for (let k in data) {
this.data.set(k, data[k]);
}
}
this.ttl = ttl ?? 7000;
}
}
/**
* Send a particle to Fluence Network using the specified Fluence Client.
* @param { FluenceClient } client - The Fluence Client instance.
* @param { Particle } particle - The particle to send.
*/
export const sendParticle = async (client: FluenceClient, particle: Particle): Promise<string> => {
return await client.sendScript(particle.script, particle.data, particle.ttl);
export const sendParticle = async (
client: FluenceClient,
particle: Particle,
onError?: (err) => void,
): Promise<string> => {
const c = client as ClientImpl;
const [req, errorPromise] = new RequestFlowBuilder()
.withRawScript(particle.script)
.withVariables(particle.data)
.withTTL(particle.ttl)
.buildWithErrorHandling();
errorPromise.catch(onError);
await c.initiateFlow(req);
return req.id;
};
/*
This map stores functions which unregister callbacks registered by registerServiceFunction
The key sould be created with makeKey. The value is the unresitration function
This is only needed to support legacy api
*/
const handlersUnregistratorsMap = new Map();
const makeKey = (client: FluenceClient, serviceId: string, fnName: string) => {
const pid = client.selfPeerId || '';
return `${pid}/${serviceId}/${fnName}`;
};
/**
@ -75,7 +139,8 @@ export const registerServiceFunction = (
fnName: string,
handler: (args: any[], tetraplets: SecurityTetraplet[][]) => object,
) => {
(client as FluenceClientImpl).registerCallback(serviceId, fnName, handler);
const unregister = (client as ClientImpl).aquaCallHandler.on(serviceId, fnName, handler);
handlersUnregistratorsMap.set(makeKey(client, serviceId, fnName), unregister);
};
// prettier-ignore
@ -90,7 +155,12 @@ export const unregisterServiceFunction = (
serviceId: string,
fnName: string
) => {
(client as FluenceClientImpl).unregisterCallback(serviceId, fnName);
const key = makeKey(client, serviceId, fnName);
const unuse = handlersUnregistratorsMap.get(key);
if(unuse) {
unuse();
}
handlersUnregistratorsMap.delete(key);
};
/**
@ -136,79 +206,13 @@ export const sendParticleAsFetch = async <T>(
callbackFnName: string,
callbackServiceId: string = '_callback',
): Promise<T> => {
const serviceId = callbackServiceId;
const fnName = callbackFnName;
const [request, promise] = new RequestFlowBuilder()
.withRawScript(particle.script)
.withVariables(particle.data)
.withTTL(particle.ttl)
.buildAsFetch<T>(callbackServiceId, callbackFnName);
let promise: Promise<T> = new Promise(function (resolve, reject) {
const unsub = subscribeToEvent(client, serviceId, fnName, (args: any[], _) => {
unsub();
resolve(args as any);
});
setTimeout(() => {
unsub();
reject(new Error(`callback for ${callbackServiceId}/${callbackFnName} timed out after ${particle.ttl}`));
}, particle.ttl);
});
await sendParticle(client, particle);
await (client as ClientImpl).initiateFlow(request);
return promise;
};
export const checkConnection = async (client: FluenceClient): Promise<boolean> => {
let msg = Math.random().toString(36).substring(7);
let callbackFn = 'checkConnection';
let callbackService = '_callback';
const particle = new Particle(
`
(seq
(call __relay ("op" "identity") [msg] result)
(call myPeerId ("${callbackService}" "${callbackFn}") [result])
)
`,
{
__relay: client.relayPeerId,
myPeerId: client.selfPeerId,
msg,
},
);
if (!client.isConnected) {
return false;
}
try {
let result = await sendParticleAsFetch<string[][]>(client, particle, callbackFn, callbackService);
if (result[0][0] != msg) {
log.warn("unexpected behavior. 'identity' must return arguments the passed arguments.");
}
return true;
} catch (e) {
log.error('Error on establishing connection: ', e);
return false;
}
};
export const subscribeForErrors = (client: FluenceClient, ttl: number): Promise<void> => {
return new Promise((resolve, reject) => {
registerServiceFunction(client, '__magic', 'handle_xor', (args, _) => {
setTimeout(() => {
try {
reject(JSON.parse(args[0]));
} catch {
reject(args);
}
}, 0);
unregisterServiceFunction(client, '__magic', 'handle_xor');
return {};
});
setTimeout(() => {
unregisterServiceFunction(client, '__magic', 'handle_xor');
resolve();
}, ttl);
});
};

143
src/api.unstable.ts Normal file
View File

@ -0,0 +1,143 @@
import Multiaddr from 'multiaddr';
import PeerId, { isPeerId } from 'peer-id';
import { generatePeerId, seedToPeerId } from './internal/peerIdUtils';
import { ClientImpl } from './internal/ClientImpl';
import log from 'loglevel';
import { RequestFlowBuilder } from './internal/RequestFlowBuilder';
import { PeerIdB58 } from './internal/commonTypes';
import { AquaCallHandler } from './internal/AquaHandler';
import { RequestFlow } from './internal/RequestFlow';
export { RequestFlowBuilder } from './internal/RequestFlowBuilder';
/**
* The class represents interface to Fluence Platform. To create a client use @see {@link createClient} function.
*/
export interface FluenceClient {
/**
* { string } Gets the base58 representation of the current peer id. Read only
*/
readonly relayPeerId: PeerIdB58;
/**
* { string } Gets the base58 representation of the connected relay's peer id. Read only
*/
readonly selfPeerId: PeerIdB58;
/**
* { string } True if the client is connected to network. False otherwise. Read only
*/
readonly isConnected: boolean;
/**
* The base handler which is used by every RequestFlow executed by this FluenceClient.
* Please note, that the handler is combined with the handler from RequestFlow before the execution occures.
* After this combination, middlewares from RequestFlow are executed before client handler's middlewares.
*/
readonly aquaCallHandler: AquaCallHandler;
/**
* Disconnects the client from the network
*/
disconnect(): Promise<void>;
/**
* Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection.
*
* @param multiaddr
*/
connect(multiaddr: string | Multiaddr): Promise<void>;
/**
* Initiates RequestFlow execution @see { @link RequestFlow }
* @param { RequestFlow } [ request ] - RequestFlow to start the execution of
*/
initiateFlow(request: RequestFlow): Promise<void>;
}
type Node = {
peerId: string;
multiaddr: string;
};
/**
* Creates a Fluence client. If the `connectTo` is specified connects the client to the network
* @param { string | Multiaddr | Node } [connectTo] - Node in Fluence network to connect to. If not specified client will not be connected to the n
* @param { PeerId | string } [peerIdOrSeed] - The Peer Id of the created client. Specified either as PeerId structure or as seed string. Will be generated randomly if not specified
* @returns { Promise<FluenceClient> } Promise which will be resolved with the created FluenceClient
*/
export const createClient = async (
connectTo?: string | Multiaddr | Node,
peerIdOrSeed?: PeerId | string,
): Promise<FluenceClient> => {
let peerId;
if (!peerIdOrSeed) {
peerId = await generatePeerId();
} else if (isPeerId(peerIdOrSeed)) {
// keep unchanged
peerId = peerIdOrSeed;
} else {
// peerId is string, therefore seed
peerId = await seedToPeerId(peerIdOrSeed);
}
const client = new ClientImpl(peerId);
if (connectTo) {
let theAddress: Multiaddr;
let fromNode = (connectTo as any).multiaddr;
if (fromNode) {
theAddress = new Multiaddr(fromNode);
} else {
theAddress = new Multiaddr(connectTo as string);
}
await client.connect(theAddress);
if (!(await checkConnection(client))) {
throw new Error('Connection check failed. Check if the node is working or try to connect to another node');
}
} else {
await client.local();
}
return client;
};
/**
* Checks the network connection by sending a ping-like request to relat node
* @param { FluenceClient } client - The Fluence Client instance.
*/
export const checkConnection = async (client: FluenceClient): Promise<boolean> => {
if (!client.isConnected) {
return false;
}
const msg = Math.random().toString(36).substring(7);
const callbackFn = 'checkConnection';
const callbackService = '_callback';
const [request, promise] = new RequestFlowBuilder()
.withRawScript(
`(seq
(call init_relay ("op" "identity") [msg] result)
(call %init_peer_id% ("${callbackService}" "${callbackFn}") [result])
)`,
)
.withVariables({
msg,
})
.buildAsFetch<[[string]]>(callbackService, callbackFn);
await client.initiateFlow(request);
try {
const [[result]] = await promise;
if (result != msg) {
log.warn("unexpected behavior. 'identity' must return arguments the passed arguments.");
}
return true;
} catch (e) {
log.error('Error on establishing connection: ', e);
return false;
}
};

View File

@ -15,10 +15,8 @@
*/
export { seedToPeerId, peerIdToSeed, generatePeerId } from './internal/peerIdUtils';
export { FluenceClient } from './FluenceClient';
export { SecurityTetraplet, PeerIdB58 } from './internal/commonTypes';
export * from './api';
export { Particle } from './internal/particle';
export * from './internal/builtins';
import log, { LogLevelDesc } from 'loglevel';

227
src/internal/AquaHandler.ts Normal file
View File

@ -0,0 +1,227 @@
import { ResultCodes, SecurityTetraplet } from './commonTypes';
/**
* Particle context. Contains additional information about particle which triggered `call` air instruction from Aquamarine interpreter
*/
interface ParticleContext {
/**
* The particle ID
*/
particleId: string;
[x: string]: any;
}
/**
* Represents the information passed from Aquamarine interpreter when a `call` air instruction is executed on the local peer
*/
interface AquaCall {
/**
* Service ID as specified in `call` air instruction
*/
serviceId: string;
/**
* Function name as specified in `call` air instruction
*/
fnName: string;
/**
* Arguments as specified in `call` air instruction
*/
args: any[];
/**
* Security Tetraplets recieved from Aquamarine interpreter
*/
tetraplets: SecurityTetraplet[][];
/**
* Particle context, @see {@link ParticleContext}
*/
particleContext: ParticleContext;
[x: string]: any;
}
/**
* Represents the result of the `call` air instruction to be returned into Aquamarine interpreter
*/
interface AquaCallResult {
/**
* Return code to be returned to Aquamarine interpreter
*/
retCode: ResultCodes;
/**
* Resul object to be returned to Aquamarine interpreter
*/
result?: any;
[x: string]: any;
}
/**
* Type for the middleware used in AquaCallHandler middleware chain.
* In a nutshell middelware is a function of request, response and function to trigger the next middleware in chain.
* Each middleware is free to write additional properties to either request or response object.
* When the chain finishes the response is passed back to Aquamarine interpreter
* @param { AquaCall } req - information about the air `call` instruction
* @param { AquaCallResult } resp - response to be passed to Aquamarine interpreter
* @param { Function } next - function which invokes next middleware in chain
*/
export type Middleware = (req: AquaCall, resp: AquaCallResult, next: Function) => void;
/**
* Convenience middleware factory. Registeres a handler for a pair of 'serviceId/fnName'.
* The return value of the handler is passed back to Aquamarine
* @param { string } serviceId - The identifier of service which would be used to make calls from Aquamarine
* @param { string } fnName - The identifier of function which would be used to make calls from Aquamarine
* @param { (args: any[], tetraplets: SecurityTetraplet[][]) => object } handler - The handler which should handle the call. The result is any object passed back to Aquamarine
*/
export const fnHandler = (
serviceId: string,
fnName: string,
handler: (args: any[], tetraplets: SecurityTetraplet[][]) => any,
) => {
return (req: AquaCall, resp: AquaCallResult, next: Function): void => {
if (req.fnName === fnName && req.serviceId === serviceId) {
const res = handler(req.args, req.tetraplets);
resp.retCode = ResultCodes.success;
resp.result = res;
}
next();
};
};
/**
* Convenience middleware factory. Registeres a handler for a pair of 'serviceId/fnName'.
* Similar to @see { @link fnHandler } but instead returns and empty object immediately runs the handler asynchronously
* @param { string } serviceId - The identifier of service which would be used to make calls from Aquamarine
* @param { string } fnName - The identifier of function which would be used to make calls from Aquamarine
* @param { (args: any[], tetraplets: SecurityTetraplet[][]) => void } handler - The handler which should handle the call.
*/
export const fnAsEventHandler = (
serviceId: string,
fnName: string,
handler: (args: any[], tetraplets: SecurityTetraplet[][]) => void,
) => {
return (req: AquaCall, resp: AquaCallResult, next: Function): void => {
if (req.fnName === fnName && req.serviceId === serviceId) {
setTimeout(() => {
handler(req.args, req.tetraplets);
}, 0);
resp.retCode = ResultCodes.success;
resp.result = {};
}
next();
};
};
/**
* Error catching middleware
*/
export const errorHandler: Middleware = (req: AquaCall, resp: AquaCallResult, next: Function): void => {
try {
next();
} catch (e) {
resp.retCode = ResultCodes.exceptionInHandler;
resp.result = e.toString();
}
};
type AquaCallFunction = (req: AquaCall, resp: AquaCallResult) => void;
/**
* Class defines the handling of a `call` air intruction executed by aquamarine on the local peer.
* All the execution process is defined by the chain of middlewares - architecture popular among backend web frameworks.
* Each middleware has the form of `(req: AquaCall, resp: AquaCallResult, next: Function) => void;`
* A handler starts with an empty middleware chain and does nothing.
* To execute the handler use @see { @link execute } function
*/
export class AquaCallHandler {
private middlewares: Middleware[] = [];
/**
* Appends middleware to the chain of middlewares
* @param { Middleware } middleware
*/
use(middleware: Middleware): AquaCallHandler {
this.middlewares.push(middleware);
return this;
}
/**
* Removes the middleware from the chain of middlewares
* @param { Middleware } middleware
*/
unUse(middleware: Middleware): AquaCallHandler {
const index = this.middlewares.indexOf(middleware);
if (index !== -1) {
this.middlewares.splice(index, 1);
}
return this;
}
/**
* Combine handler with another one. Combintaion is done by copying middleware chain from the argument's handler into current one.
* Please note, that current handler's middlewares take precedence over the ones from handler to be combined with
* @param { AquaCallHandler } other - AquaCallHandler to be combined with
*/
combineWith(other: AquaCallHandler): AquaCallHandler {
this.middlewares = [...this.middlewares, ...other.middlewares];
return this;
}
/**
* Convinience method for registring @see { @link fnHandler } middleware
*/
on(serviceId: string, fnName: string, handler: (args: any[], tetraplets: SecurityTetraplet[][]) => any): Function {
const mw = fnHandler(serviceId, fnName, handler);
this.use(mw);
return () => {
this.unUse(mw);
};
}
/**
* Convinience method for registring @see { @link fnAsEventHandler } middleware
*/
onEvent(
serviceId: string,
fnName: string,
handler: (args: any[], tetraplets: SecurityTetraplet[][]) => void,
): Function {
const mw = fnAsEventHandler(serviceId, fnName, handler);
this.use(mw);
return () => {
this.unUse(mw);
};
}
/**
* Collapses middleware chain into a single function.
*/
buildFunction(): AquaCallFunction {
const result = this.middlewares.reduceRight<AquaCallFunction>(
(agg, cur) => {
return (req, resp) => {
cur(req, resp, () => agg(req, resp));
};
},
(req, res) => {},
);
return result;
}
/**
* Executes the handler with the specified AquaCall request. Return the result response
*/
execute(req: AquaCall): AquaCallResult {
const res: AquaCallResult = {
retCode: ResultCodes.unkownError,
};
this.buildFunction()(req, res);
return res;
}
}

View File

@ -14,16 +14,25 @@
* limitations under the License.
*/
import { build } from './particle';
import * as PeerId from 'peer-id';
import Multiaddr from 'multiaddr';
import { FluenceConnection } from './FluenceConnection';
import { ParticleProcessor } from './ParticleProcessor';
import { ParticleProcessorStrategy } from './ParticleProcessorStrategy';
import { PeerIdB58 } from './commonTypes';
import { PeerIdB58, SecurityTetraplet } from './commonTypes';
import { FluenceClient } from 'src';
import { RequestFlow } from './RequestFlow';
import { AquaCallHandler, errorHandler, fnHandler } from './AquaHandler';
import { loadRelayFn, loadVariablesService } from './RequestFlowBuilder';
export abstract class FluenceClientBase {
const makeDefaultClientHandler = (): AquaCallHandler => {
const res = new AquaCallHandler();
res.use(errorHandler);
res.use(fnHandler('op', 'identity', (args, _) => args));
return res;
};
export class ClientImpl implements FluenceClient {
readonly selfPeerIdFull: PeerId;
get relayPeerId(): PeerIdB58 | undefined {
@ -38,16 +47,21 @@ export abstract class FluenceClientBase {
return this.connection?.isConnected();
}
protected connection: FluenceConnection;
private connection: FluenceConnection;
protected processor: ParticleProcessor;
protected abstract strategy: ParticleProcessorStrategy;
constructor(selfPeerIdFull: PeerId) {
this.selfPeerIdFull = selfPeerIdFull;
this.aquaCallHandler = makeDefaultClientHandler();
this.processor = new ParticleProcessor(selfPeerIdFull, this.aquaCallHandler);
}
aquaCallHandler: AquaCallHandler;
async disconnect(): Promise<void> {
await this.connection.disconnect();
if (this.connection) {
await this.connection.disconnect();
}
await this.processor.destroy();
}
@ -79,17 +93,19 @@ export abstract class FluenceClientBase {
multiaddr,
node,
this.selfPeerIdFull,
this.processor.executeExternalParticle.bind(this.processor),
this.processor.executeIncomingParticle.bind(this.processor),
);
await connection.connect();
await this.processor.init();
this.connection = connection;
await this.processor.init(connection);
}
async sendScript(script: string, data?: Map<string, any>, ttl?: number): Promise<string> {
const particle = await build(this.selfPeerIdFull, this.relayPeerId, script, data, ttl);
await this.processor.executeLocalParticle(particle);
return particle.id;
async initiateFlow(request: RequestFlow): Promise<void> {
// setting `relayVariableName` here. If the client is not connected (i.e it is created as local) then there is no relay
request.handler.on(loadVariablesService, loadRelayFn, () => {
return this.relayPeerId || '';
});
await request.initState(this.selfPeerIdFull);
this.processor.executeLocalParticle(request);
}
}

View File

@ -1,259 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import log from 'loglevel';
import PeerId from 'peer-id';
import { SecurityTetraplet, StepperOutcome } from './commonTypes';
import { FluenceClientBase } from './FluenceClientBase';
import { FluenceClient } from '../FluenceClient';
import { build, genUUID, ParticleDto } from './particle';
import { ParticleProcessor } from './ParticleProcessor';
import { ParticleProcessorStrategy } from './ParticleProcessorStrategy';
const fetchCallbackServiceName = '__callback';
const selfRelayVarName = '__relay';
const wrapRelayBasedCall = (script: string) => {
return `
(seq
(call ${selfRelayVarName} ("op" "identity") [])
${script}
)
`;
};
const wrapFetchCall = (script: string, particleId: string, resultArgNames: string[]) => {
// TODO: sanitize
const resultTogether = resultArgNames.join(' ');
let res = `
(seq
${script}
(seq
(call ${selfRelayVarName} ("op" "identity") [])
(call %init_peer_id% ("${fetchCallbackServiceName}" "${particleId}") [${resultTogether}])
)
)`;
return wrapRelayBasedCall(res);
};
export interface FluenceEvent {
type: string;
args: any[];
}
export type FluenceEventHandler = (event: FluenceEvent) => void;
export class FluenceClientImpl extends FluenceClientBase implements FluenceClient {
private eventSubscribers: Map<string, FluenceEventHandler[]> = new Map();
private eventValidators: Map<string, Function> = new Map();
private callbacks: Map<string, Function> = new Map();
private fetchParticles: Map<string, { resolve: Function; reject: Function }> = new Map();
constructor(selfPeerId: PeerId) {
super(selfPeerId);
this.processor = new ParticleProcessor(this.strategy, selfPeerId);
}
async fetch<T>(script: string, resultArgNames: string[], data?: Map<string, any>, ttl?: number): Promise<T> {
data = this.addRelayToArgs(data);
const callBackId = genUUID();
script = wrapFetchCall(script, callBackId, resultArgNames);
const particle = await build(this.selfPeerIdFull, this.relayPeerId, script, data, ttl, callBackId);
const prFetch = new Promise<T>(async (resolve, reject) => {
this.fetchParticles.set(callBackId, { resolve, reject });
});
const prExec = this.processor.executeLocalParticle(particle);
return prExec.then(() => prFetch);
}
// TODO:: better naming probably?
async fireAndForget(script: string, data?: Map<string, any>, ttl?: number) {
data = this.addRelayToArgs(data);
script = wrapRelayBasedCall(script);
await this.sendScript(script, data, ttl);
}
registerEvent(
channel: string,
eventName: string,
validate?: (channel: string, eventName: string, args: any[], tetraplets: any[][]) => boolean,
) {
if (!validate) {
validate = (c, e, a, t) => true;
}
this.eventValidators.set(`${channel}/${eventName}`, validate);
}
unregisterEvent(channel: string, eventName: string) {
this.eventValidators.delete(`${channel}/${eventName}`);
}
registerCallback(
serviceId: string,
fnName: string,
callback: (args: any[], tetraplets: SecurityTetraplet[][]) => object,
) {
this.callbacks.set(`${serviceId}/${fnName}`, callback);
}
unregisterCallback(channel: string, eventName: string) {
this.eventValidators.delete(`${channel}/${eventName}`);
}
subscribe(channel: string, handler: FluenceEventHandler) {
if (!this.eventSubscribers.get(channel)) {
this.eventSubscribers.set(channel, []);
}
this.eventSubscribers.get(channel).push(handler);
}
protected strategy: ParticleProcessorStrategy = {
particleHandler: (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => {
// missing built-in op
if (serviceId === 'op' && fnName === 'identity') {
return {
ret_code: 0,
result: JSON.stringify(args),
};
}
// async fetch model handling
if (serviceId === fetchCallbackServiceName) {
const executingParticlePromiseFns = this.fetchParticles.get(fnName);
if (executingParticlePromiseFns) {
// don't block
setTimeout(() => {
this.fetchParticles.delete(fnName);
executingParticlePromiseFns.resolve(args);
}, 0);
}
return {
ret_code: 0,
result: JSON.stringify({}),
};
}
// event model handling
const eventPair = `${serviceId}/${fnName}`;
const eventValidator = this.eventValidators.get(eventPair);
if (eventValidator) {
try {
if (!eventValidator(serviceId, fnName, args, tetraplets)) {
return {
ret_code: 1, // TODO:: error codes
result: 'validation failed',
};
}
} catch (e) {
log.error('error running validation function: ' + e);
return {
ret_code: 1, // TODO:: error codes
result: 'validation failed',
};
}
// don't block
setTimeout(() => {
this.pushEvent(serviceId, {
type: fnName,
args: args,
});
}, 0);
return {
ret_code: 0,
result: JSON.stringify({}),
};
}
// callback model handling
const callback = this.callbacks.get(eventPair);
if (callback) {
try {
const res = callback(args, tetraplets);
return {
ret_code: 0,
result: JSON.stringify(res),
};
} catch (e) {
return {
ret_code: 1, // TODO:: error codes
result: JSON.stringify(e),
};
}
}
return {
ret_code: 1,
result: `Error. There is no service: ${serviceId}`,
};
},
sendParticleFurther: async (particle: ParticleDto) => {
try {
await this.connection.sendParticle(particle);
} catch (reason) {
log.error(`Error on sending particle with id ${particle.id}: ${reason}`);
}
},
onParticleTimeout: (particle: ParticleDto, now: number) => {
log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`);
const executingParticle = this.fetchParticles.get(particle.id);
if (executingParticle) {
executingParticle.reject(new Error(`particle ${particle.id} timed out`));
}
},
onLocalParticleRecieved: (particle: ParticleDto) => {
log.debug('local particle received', particle);
},
onExternalParticleRecieved: (particle: ParticleDto) => {
log.debug('external particle received', particle);
},
onStepperExecuting: (particle: ParticleDto) => {
log.debug('stepper executing particle', particle);
},
onStepperExecuted: (stepperOutcome: StepperOutcome) => {
log.debug('inner interpreter outcome:', stepperOutcome);
},
};
private pushEvent(channel: string, event: FluenceEvent) {
const subs = this.eventSubscribers.get(channel);
if (subs) {
for (let sub of subs) {
sub(event);
}
}
}
private addRelayToArgs(data: Map<string, any>) {
if (data === undefined) {
data = new Map();
}
if (!data.has(selfRelayVarName)) {
data.set(selfRelayVarName, this.relayPeerId);
}
return data;
}
}

View File

@ -23,7 +23,7 @@ import pipe from 'it-pipe';
import Multiaddr from 'multiaddr';
import PeerId from 'peer-id';
import * as log from 'loglevel';
import { parseParticle, ParticleDto, toPayload } from './particle';
import { parseParticle, Particle, toPayload } from './particle';
export const PROTOCOL_NAME = '/fluence/faas/1.0.0';
@ -39,13 +39,13 @@ export class FluenceConnection {
private readonly address: Multiaddr;
readonly nodePeerId: PeerId;
private readonly selfPeerIdStr: string;
private readonly handleParticle: (call: ParticleDto) => void;
private readonly handleParticle: (call: Particle) => void;
constructor(
multiaddr: Multiaddr,
hostPeerId: PeerId,
selfPeerId: PeerId,
handleParticle: (call: ParticleDto) => void,
handleParticle: (call: Particle) => void,
) {
this.selfPeerId = selfPeerId;
this.handleParticle = handleParticle;
@ -118,7 +118,7 @@ export class FluenceConnection {
this.status = Status.Disconnected;
}
async sendParticle(particle: ParticleDto): Promise<void> {
async sendParticle(particle: Particle): Promise<void> {
this.checkConnectedOrThrow();
let action = toPayload(particle);

View File

@ -14,249 +14,165 @@
* limitations under the License.
*/
import { ParticleDto } from './particle';
import { logParticle, Particle } from './particle';
import * as PeerId from 'peer-id';
import { instantiateInterpreter, InterpreterInvoke } from './stepper';
import { ParticleHandler, SecurityTetraplet, StepperOutcome } from './commonTypes';
import { ParticleHandler, SecurityTetraplet, CallServiceResult } from './commonTypes';
import log from 'loglevel';
import { ParticleProcessorStrategy } from './ParticleProcessorStrategy';
// HACK:: make an api for aqua stepper to accept variables in an easy way!
let magicParticleStorage: Map<string, Map<string, any>> = new Map();
// HACK:: make an api for aqua stepper to accept variables in an easy way!
export function injectDataIntoParticle(particleId: string, data: Map<string, any>, ttl: number) {
log.trace(`setting data for ${particleId}`, data);
magicParticleStorage.set(particleId, data);
setTimeout(() => {
log.trace(`data for ${particleId} is deleted`);
magicParticleStorage.delete(particleId);
}, ttl);
}
// HACK:: make an api for aqua stepper to accept variables in an easy way!
const wrapWithDataInjectionHandling = (
handler: ParticleHandler,
getCurrentParticleId: () => string,
): ParticleHandler => {
return (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => {
if (serviceId === '__magic' && fnName === 'load') {
const current = getCurrentParticleId();
const data = magicParticleStorage.get(current);
const res = data ? data.get(args[0]) : {};
return {
ret_code: 0,
result: JSON.stringify(res),
};
}
return handler(serviceId, fnName, args, tetraplets);
};
};
import { RequestFlow } from './RequestFlow';
import { AquaCallHandler } from './AquaHandler';
import { FluenceConnection } from './FluenceConnection';
import { AquamarineInterpreter } from './aqua/interpreter';
export class ParticleProcessor {
private interpreter: InterpreterInvoke;
private subscriptions: Map<string, ParticleDto> = new Map();
private particlesQueue: ParticleDto[] = [];
private currentParticle?: string;
private readonly peerId: PeerId;
private readonly clientHandler: AquaCallHandler;
strategy: ParticleProcessorStrategy;
peerId: PeerId;
private connection: FluenceConnection;
private interpreter: AquamarineInterpreter;
private requests: Map<string, RequestFlow> = new Map();
private queue: RequestFlow[] = [];
private currentRequestId: string | null = null;
private watchDog;
constructor(strategy: ParticleProcessorStrategy, peerId: PeerId) {
this.strategy = strategy;
constructor(peerId: PeerId, clientHandler: AquaCallHandler) {
this.peerId = peerId;
this.clientHandler = clientHandler;
}
async init() {
await this.instantiateInterpreter();
/**
* Instantiate WebAssembly with AIR interpreter to execute AIR scripts
*/
async init(connection?: FluenceConnection) {
this.connection = connection;
this.interpreter = await AquamarineInterpreter.create({
particleHandler: this.hanlder.bind(this),
peerId: this.peerId,
});
this.watchDog = setInterval(() => {
for (let key in this.requests.keys) {
if (this.requests.get(key).hasExpired()) {
this.requests.delete(key);
}
}
}, 5000);
}
async destroy() {
// TODO: destroy interpreter
clearInterval(this.watchDog);
}
async executeLocalParticle(particle: ParticleDto): Promise<void> {
this.strategy?.onLocalParticleRecieved(particle);
return new Promise<void>((resolve, reject) => {
// we check by callbacks that the script passed through the interpreter without errors
this.handleParticle(particle, resolve, reject);
});
}
async executeLocalParticle(request: RequestFlow) {
request.handler.combineWith(this.clientHandler);
this.requests.set(request.id, request);
async executeExternalParticle(particle: ParticleDto): Promise<void> {
this.strategy?.onExternalParticleRecieved(particle);
return await this.handleExternalParticle(particle);
}
logParticle(log.debug, 'external particle received', request.getParticle());
/*
* private
*/
private getCurrentParticleId(): string | undefined {
return this.currentParticle;
}
private setCurrentParticleId(particle: string | undefined) {
this.currentParticle = particle;
}
private enqueueParticle(particle: ParticleDto): void {
this.particlesQueue.push(particle);
}
private popParticle(): ParticleDto | undefined {
return this.particlesQueue.pop();
}
/**
* Subscriptions will be applied by outside message if id will be the same.
*
* @param particle
* @param ttl time to live, subscription will be deleted after this time
*/
subscribe(particle: ParticleDto, ttl: number) {
let self = this;
setTimeout(() => {
self.subscriptions.delete(particle.id);
self.strategy?.onParticleTimeout(particle, Date.now());
}, ttl);
this.subscriptions.set(particle.id, particle);
}
updateSubscription(particle: ParticleDto): boolean {
if (this.subscriptions.has(particle.id)) {
this.subscriptions.set(particle.id, particle);
return true;
} else {
return false;
}
}
getSubscription(id: string): ParticleDto | undefined {
return this.subscriptions.get(id);
}
hasSubscription(particle: ParticleDto): boolean {
return this.subscriptions.has(particle.id);
}
/**
* Pass a particle to a interpreter and send a result to other services.
* `resolve` will be completed if ret_code equals 0
* `reject` will be completed if ret_code not equals 0
*/
private async handleParticle(particle: ParticleDto, resolve?: () => void, reject?: (r: any) => any): Promise<void> {
// if a current particle is processing, add new particle to the queue
if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) {
this.enqueueParticle(particle);
} else {
if (this.interpreter === undefined) {
throw new Error('Undefined. Interpreter is not initialized');
}
// start particle processing if queue is empty
try {
this.setCurrentParticleId(particle.id);
// check if a particle is relevant
let now = Date.now();
let actualTtl = particle.timestamp + particle.ttl - now;
if (actualTtl <= 0) {
this.strategy?.onParticleTimeout(particle, now);
if (reject) {
reject(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`);
}
} else {
// if there is no subscription yet, previous data is empty
let prevData: Uint8Array = Buffer.from([]);
let prevParticle = this.getSubscription(particle.id);
if (prevParticle) {
prevData = prevParticle.data;
// update a particle in a subscription
this.updateSubscription(particle);
} else {
// set a particle with actual ttl
this.subscribe(particle, actualTtl);
}
this.strategy.onStepperExecuting(particle);
let stepperOutcomeStr = this.interpreter(
particle.init_peer_id,
particle.script,
prevData,
particle.data,
);
let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr);
// update data after aquamarine execution
let newParticle: ParticleDto = { ...particle, data: stepperOutcome.data };
this.strategy.onStepperExecuted(stepperOutcome);
this.updateSubscription(newParticle);
// do nothing if there is no `next_peer_pks` or if client isn't connected to the network
if (stepperOutcome.next_peer_pks.length > 0) {
this.strategy.sendParticleFurther(newParticle);
}
if (stepperOutcome.ret_code == 0) {
if (resolve) {
resolve();
}
} else {
const error = stepperOutcome.error_message;
if (reject) {
reject(error);
} else {
log.error('Unhandled error: ', error);
}
}
}
} catch (e) {
if (reject) {
reject(e);
} else {
log.error('Unhandled error: ', e);
throw e;
}
} finally {
// get last particle from the queue
let nextParticle = this.popParticle();
// start the processing of a new particle if it exists
if (nextParticle) {
// update current particle
this.setCurrentParticleId(nextParticle.id);
return await this.handleParticle(nextParticle);
} else {
// wait for a new call (do nothing) if there is no new particle in a queue
this.setCurrentParticleId(undefined);
}
}
try {
this.processRequest(request);
} catch (err) {
log.error('particle processing failed: ' + err);
}
}
/**
* Handle incoming particle from a relay.
*/
private async handleExternalParticle(particle: ParticleDto): Promise<void> {
let data: any = particle.data;
let error: any = data['protocol!error'];
if (error !== undefined) {
log.error('error in external particle: ', error);
async executeIncomingParticle(particle: Particle) {
logParticle(log.debug, 'external particle received', particle);
let request = this.requests.get(particle.id);
if (request) {
request.receiveUpdate(particle);
} else {
return await this.handleParticle(particle);
request = RequestFlow.createExternal(particle);
request.handler.combineWith(this.clientHandler);
}
this.requests.set(request.id, request);
await this.processRequest(request);
}
private async processRequest(request: RequestFlow): Promise<void> {
// enque the request if it's not the currently processed one
if (this.currentRequestId !== null && this.currentRequestId !== request.id) {
this.queue.push(request);
return;
}
if (this.interpreter === undefined) {
throw new Error('Undefined. Interpreter is not initialized');
}
// start request processing if queue is empty
try {
this.currentRequestId = request.id;
if (request.hasExpired()) {
return;
}
logParticle(log.debug, 'interpreter executing particle', request.getParticle());
const interpreterOutcome = request.runInterpreter(this.interpreter);
log.debug('inner interpreter outcome:', {
ret_code: interpreterOutcome.ret_code,
error_message: interpreterOutcome.error_message,
next_peer_pks: interpreterOutcome.next_peer_pks,
});
if (interpreterOutcome.ret_code !== 0) {
request.raiseError(
`Interpreter failed with code=${interpreterOutcome.ret_code} message=${interpreterOutcome.error_message}`,
);
}
// do nothing if there is no `next_peer_pks` or if client isn't connected to the network
if (interpreterOutcome.next_peer_pks.length > 0) {
if (!this.connection) {
log.error('Cannot send particle: non connected');
}
request.sendIntoConnection(this.connection);
}
} finally {
// get last request from the queue
let nextRequest = this.queue.pop();
// start the processing of the new request if it exists
if (nextRequest) {
// update current particle
this.currentRequestId = nextRequest.id;
await this.processRequest(nextRequest);
} else {
// wait for a new call (do nothing) if there is no new particle in a queue
this.currentRequestId = null;
}
}
}
/**
* Instantiate WebAssembly with AIR interpreter to execute AIR scripts
*/
async instantiateInterpreter() {
this.interpreter = await instantiateInterpreter(
wrapWithDataInjectionHandling(
this.strategy.particleHandler.bind(this),
this.getCurrentParticleId.bind(this),
),
this.peerId,
);
}
private hanlder: ParticleHandler = (
serviceId: string,
fnName: string,
args: any[],
tetraplets: SecurityTetraplet[][],
): CallServiceResult => {
if (this.currentRequestId === null) {
throw Error('current request can`t be null here');
}
const request = this.requests.get(this.currentRequestId);
const res = request.handler.execute({
serviceId,
fnName,
args,
tetraplets,
particleContext: {
particleId: request.id,
},
});
return {
ret_code: res.retCode,
result: JSON.stringify(res.result || {}),
};
};
}

View File

@ -1,29 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ParticleHandler, StepperOutcome } from './commonTypes';
import { ParticleDto } from './particle';
export interface ParticleProcessorStrategy {
particleHandler: ParticleHandler;
sendParticleFurther: (particle: ParticleDto) => void;
onParticleTimeout?: (particle: ParticleDto, now: number) => void;
onLocalParticleRecieved?: (particle: ParticleDto) => void;
onExternalParticleRecieved?: (particle: ParticleDto) => void;
onStepperExecuting?: (particle: ParticleDto) => void;
onStepperExecuted?: (stepperOutcome: StepperOutcome) => void;
}

129
src/internal/RequestFlow.ts Normal file
View File

@ -0,0 +1,129 @@
import log, { trace } from 'loglevel';
import PeerId from 'peer-id';
import { AquamarineInterpreter } from './aqua/interpreter';
import { AquaCallHandler } from './AquaHandler';
import { InterpreterOutcome } from './commonTypes';
import { FluenceConnection } from './FluenceConnection';
import { Particle, genUUID } from './particle';
export const DEFAULT_TTL = 7000;
/**
* The class represents the current view (and state) of distributed the particle execution process from client's point of view.
* It stores the intermediate particles state during the process. RequestFlow is identified by the id of the particle that is executed during the flow.
* Each RequestFlow contains a separate (unique to the current flow) AquaCallHandler where the handling of `call` AIR instruction takes place
* Please note, that RequestFlow's is handler is combined with the handler from client before the execution occures.
* After the combination middlewares from RequestFlow are executed before client handler's middlewares.
*/
export class RequestFlow {
private state: Particle;
private prevData: Uint8Array = Buffer.from([]);
private onTimeoutHandlers = [];
private onErrorHandlers = [];
readonly id: string;
readonly isExternal: boolean;
readonly script: string;
readonly handler = new AquaCallHandler();
ttl: number = DEFAULT_TTL;
static createExternal(particle: Particle): RequestFlow {
const res = new RequestFlow(true, particle.id, particle.script);
res.ttl = particle.ttl;
res.state = particle;
setTimeout(res.raiseTimeout.bind(res), particle.ttl);
return res;
}
static createLocal(script: string, ttl?: number): RequestFlow {
const res = new RequestFlow(false, genUUID(), script);
res.ttl = ttl ?? DEFAULT_TTL;
return res;
}
constructor(isExternal: boolean, id: string, script: string) {
this.isExternal = isExternal;
this.id = id;
this.script = script;
}
onTimeout(handler: () => void) {
this.onTimeoutHandlers.push(handler);
}
onError(handler: (error) => void) {
this.onErrorHandlers.push(handler);
}
async initState(peerId: PeerId): Promise<void> {
const id = this.id;
let currentTime = Date.now();
const particle: Particle = {
id: id,
init_peer_id: peerId.toB58String(),
timestamp: currentTime,
ttl: this.ttl,
script: this.script,
signature: '',
data: Buffer.from([]),
};
this.state = particle;
setTimeout(this.raiseTimeout.bind(this), particle.ttl);
}
receiveUpdate(particle: Particle) {
// TODO:: keep the history of particle data mb?
this.prevData = this.state.data;
this.state.data = particle.data;
}
async sendIntoConnection(connection: FluenceConnection): Promise<void> {
const particle = this.state;
try {
await connection.sendParticle(particle);
} catch (err) {
log.error(`Error on sending particle with id ${particle.id}: ${err}`);
}
}
runInterpreter(interpreter: AquamarineInterpreter) {
const interpreterOutcomeStr = interpreter.invoke(
this.state.init_peer_id,
this.state.script,
this.prevData,
this.state.data,
);
const interpreterOutcome: InterpreterOutcome = JSON.parse(interpreterOutcomeStr);
// TODO:: keep the history of particle data mb?
this.state.data = interpreterOutcome.data;
return interpreterOutcome;
}
getParticle = () => this.state;
hasExpired(): boolean {
let now = Date.now();
const particle = this.getParticle();
let actualTtl = particle.timestamp + particle.ttl - now;
return actualTtl <= 0;
}
raiseError(error) {
for (const h of this.onErrorHandlers) {
h(error);
}
}
private raiseTimeout() {
const now = Date.now();
const particle = this.state;
log.info(`Particle expired. Now: ${now}, ttl: ${particle?.ttl}, ts: ${particle?.timestamp}`);
for (const h of this.onTimeoutHandlers) {
h();
}
}
}

View File

@ -0,0 +1,197 @@
import { of } from 'ipfs-only-hash';
import log from 'loglevel';
import { AquaCallHandler } from './AquaHandler';
import { DEFAULT_TTL, RequestFlow } from './RequestFlow';
export const loadVariablesService = 'load';
const loadVariablesFn = 'load_variable';
export const loadRelayFn = 'load_relay';
const xorHandleService = '__magic';
const xorHandleFn = 'handle_xor';
export const relayVariableName = 'init_relay';
const wrapWithXor = (script: string): string => {
return `
(xor
${script}
(xor
(match ${relayVariableName} ""
(call %init_peer_id% ("${xorHandleService}" "${xorHandleFn}") [%last_error%])
)
(seq
(call ${relayVariableName} ("op" "identity") [])
(call %init_peer_id% ("${xorHandleService}" "${xorHandleFn}") [%last_error%])
)
)
)`;
};
class ScriptBuilder {
private script: string;
raw(script: string): ScriptBuilder {
this.script = script;
return this;
}
build(): string {
return this.script;
}
}
const wrapWithVariableInjectionScript = (script: string, fields: string[]): string => {
fields.forEach((v) => {
script = `
(seq
(call %init_peer_id% ("${loadVariablesService}" "${loadVariablesFn}") ["${v}"] ${v})
${script}
)`;
});
return script;
};
const wrapWithInjectRelayScript = (script: string): string => {
return `
(seq
(seq
(call %init_peer_id% ("${loadVariablesService}" "${loadRelayFn}") [] ${relayVariableName})
(call %init_peer_id% ("op" "identity") [%init_peer_id%] init_peer_id)
)
${script}
)`;
};
export class RequestFlowBuilder {
private ttl: number = DEFAULT_TTL;
private variables = new Map<string, any>();
private handlerConfigs: Array<(handler: AquaCallHandler) => void> = [];
private buildScript: (sb: ScriptBuilder) => void;
private onTimeout: () => void;
private onError: (error: any) => void;
build() {
if (!this.buildScript) {
throw new Error();
}
const b = new ScriptBuilder();
this.buildScript(b);
let script = b.build();
script = wrapWithVariableInjectionScript(script, Array.from(this.variables.keys()));
script = wrapWithXor(script);
script = wrapWithInjectRelayScript(script);
const res = RequestFlow.createLocal(script, this.ttl);
res.handler.on(loadVariablesService, loadVariablesFn, (args, _) => {
return this.variables.get(args[0]) || {};
});
res.handler.onEvent(xorHandleService, xorHandleFn, (args) => {
let msg;
try {
msg = JSON.parse(args[0]);
} catch (e) {
msg = e;
}
try {
res.raiseError(msg);
} catch (e) {
log.error('Error handling script executed with error', e);
}
});
for (let h of this.handlerConfigs) {
h(res.handler);
}
if (this.onTimeout) {
res.onTimeout(this.onTimeout);
}
if (this.onError) {
res.onError(this.onError);
}
return res;
}
withScript(action: (sb: ScriptBuilder) => void): RequestFlowBuilder {
this.buildScript = action;
return this;
}
withRawScript(script: string): RequestFlowBuilder {
this.buildScript = (sb) => {
sb.raw(script);
};
return this;
}
withTTL(ttl: number): RequestFlowBuilder {
this.ttl = ttl;
return this;
}
configHandler(config: (handler: AquaCallHandler) => void): RequestFlowBuilder {
this.handlerConfigs.push(config);
return this;
}
handleTimeout(handler: () => void): RequestFlowBuilder {
this.onTimeout = handler;
return this;
}
handleScriptError(handler: (error) => void): RequestFlowBuilder {
this.onError = handler;
return this;
}
withVariable(name: string, value: any): RequestFlowBuilder {
this.variables.set(name, value);
return this;
}
withVariables(data: Map<string, any> | Record<string, any>): RequestFlowBuilder {
if (data instanceof Map) {
this.variables = new Map([...Array.from(this.variables.entries()), ...Array.from(data.entries())]);
} else {
for (let k in data) {
this.variables.set(k, data[k]);
}
}
return this;
}
buildAsFetch<T>(
callbackServiceId: string = 'callback',
callbackFnName: string = 'callback',
): [RequestFlow, Promise<T>] {
const fetchPromise = new Promise<T>((resolve, reject) => {
this.handlerConfigs.push((h) => {
h.onEvent(callbackServiceId, callbackFnName, (args, _) => {
resolve(args as any);
});
});
this.handleTimeout(() => {
reject(`Timed out after ${this.ttl}ms`);
});
this.handleScriptError((e) => {
reject(e);
});
});
return [this.build(), fetchPromise];
}
buildWithErrorHandling(): [RequestFlow, Promise<void>] {
const promise = new Promise<void>((resolve, reject) => {
this.handleScriptError(reject);
});
return [this.build(), promise];
}
}

View File

@ -102,16 +102,16 @@ function passArray8ToWasm0(wasm, arg, malloc) {
/**
* @param {any} wasm
* @param {string} init_user_id
* @param {string} init_peer_id
* @param {string} aqua
* @param {string} prev_data
* @param {string} data
* @param {string} log_level
* @returns {string}
*/
export function invoke(wasm, init_user_id, aqua, prev_data, data, log_level) {
export function invoke(wasm, init_peer_id, aqua, prev_data, data, log_level) {
try {
var ptr0 = passStringToWasm0(wasm, init_user_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
var ptr0 = passStringToWasm0(wasm, init_peer_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
var len0 = WASM_VECTOR_LEN;
var ptr1 = passStringToWasm0(wasm, aqua, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
var len1 = WASM_VECTOR_LEN;

View File

@ -15,20 +15,22 @@
*/
import { toByteArray } from 'base64-js';
import * as aqua from './aqua';
import { return_current_peer_id, return_call_service_result, getStringFromWasm0, free } from './aqua';
import { ParticleHandler, CallServiceResult, SecurityTetraplet } from './commonTypes';
import * as aqua from '.';
import { return_current_peer_id, return_call_service_result, getStringFromWasm0, free } from '.';
import { ParticleHandler, CallServiceResult, SecurityTetraplet } from '../commonTypes';
import PeerId from 'peer-id';
import log from 'loglevel';
import wasmBs64 from '@fluencelabs/aquamarine-interpreter';
export type InterpreterInvoke = (
init_user_id: string,
// prettier-ignore
type InterpreterInvoke = (
init_peer_id: string,
script: string,
prev_data: Uint8Array,
data: Uint8Array,
) => string;
type ImportObject = {
'./aquamarine_client_bg.js': {
// fn call_service_impl(service_id: String, fn_name: String, args: String, security_tetraplets: String) -> String;
@ -140,7 +142,7 @@ const theParticleHandler = (
tetrapletsObject = JSON.parse(tetraplets);
} catch (err) {
console.error('Cannot parse arguments: ' + JSON.stringify(err));
log.error('Cannot parse arguments: ' + JSON.stringify(err));
return {
result: JSON.stringify('Cannot parse arguments: ' + JSON.stringify(err)),
ret_code: 1,
@ -187,7 +189,7 @@ function newImportObject(particleHandler: ParticleHandler, cfg: HostImportsConfi
return_current_peer_id(wasm, peerIdStr, arg0);
},
__wbindgen_throw: (arg: any) => {
console.log(`wbindgen throw: ${JSON.stringify(arg)}`);
log.error(`wbindgen throw: ${JSON.stringify(arg)}`);
},
},
host: log_import(cfg),
@ -206,17 +208,13 @@ function newLogImport(cfg: HostImportsConfig): ImportObject {
}
/// Instantiates AIR interpreter, and returns its `invoke` function as closure
/// NOTE: an interpreter is also called a stepper from time to time
export async function instantiateInterpreter(
particleHandler: ParticleHandler,
peerId: PeerId,
): Promise<InterpreterInvoke> {
async function instantiateInterpreter(particleHandler: ParticleHandler, peerId: PeerId): Promise<InterpreterInvoke> {
let cfg = new HostImportsConfig((cfg) => {
return newImportObject(particleHandler, cfg, peerId);
});
let instance = await interpreterInstance(cfg);
return (init_user_id: string, script: string, prev_data: Uint8Array, data: Uint8Array) => {
return (init_peer_id: string, script: string, prev_data: Uint8Array, data: Uint8Array) => {
let logLevel = log.getLevel();
let logLevelStr = 'info';
if (logLevel === 0) {
@ -233,13 +231,13 @@ export async function instantiateInterpreter(
logLevelStr = 'off';
}
return aqua.invoke(instance.exports, init_user_id, script, prev_data, data, logLevelStr);
return aqua.invoke(instance.exports, init_peer_id, script, prev_data, data, logLevelStr);
};
}
/// Instantiate AIR interpreter with host imports containing only logger, but not call_service
/// peerId isn't actually required for AST parsing, but host imports require it, and I don't see any workaround
export async function parseAstClosure(): Promise<(script: string) => string> {
async function parseAstClosure(): Promise<(script: string) => string> {
let cfg = new HostImportsConfig((cfg) => newLogImport(cfg));
let instance = await interpreterInstance(cfg);
@ -250,7 +248,49 @@ export async function parseAstClosure(): Promise<(script: string) => string> {
/// Parses script and returns AST in JSON format
/// NOTE & TODO: interpreter is instantiated every time, make it a lazy constant?
export async function parseAIR(script: string): Promise<string> {
async function parseAIR(script: string): Promise<string> {
let closure = await parseAstClosure();
return closure(script);
}
export class AquamarineInterpreter {
private wasmWrapper;
constructor(wasmWrapper) {
this.wasmWrapper = wasmWrapper;
}
static async create(config: { particleHandler: ParticleHandler; peerId: PeerId }) {
const cfg = new HostImportsConfig((cfg) => {
return newImportObject(config.particleHandler, cfg, config.peerId);
});
const instance = await interpreterInstance(cfg);
const res = new AquamarineInterpreter(instance);
return res;
}
invoke(init_peer_id: string, script: string, prev_data: Uint8Array, data: Uint8Array): string {
let logLevel = log.getLevel();
let logLevelStr = 'info';
if (logLevel === 0) {
logLevelStr = 'trace';
} else if (logLevel === 1) {
logLevelStr = 'debug';
} else if (logLevel === 2) {
logLevelStr = 'info';
} else if (logLevel === 3) {
logLevelStr = 'warn';
} else if (logLevel === 4) {
logLevelStr = 'error';
} else if (logLevel === 5) {
logLevelStr = 'off';
}
return aqua.invoke(this.wasmWrapper.exports, init_peer_id, script, prev_data, data, logLevelStr);
}
parseAir(script: string): string {
return aqua.ast(this.wasmWrapper.exports, script);
}
}

View File

@ -14,11 +14,10 @@
* limitations under the License.
*/
import bs58 from 'bs58';
import { sendParticleAsFetch } from '../api';
import { Particle } from './particle';
import { FluenceClient } from '../FluenceClient';
import { RequestFlow } from './RequestFlow';
import { ModuleConfig } from './moduleConfig';
import { RequestFlowBuilder } from './RequestFlowBuilder';
import { FluenceClient } from 'src/api.unstable';
const nodeIdentityCall = (client: FluenceClient): string => {
return `(call "${client.relayPeerId}" ("op" "identity") [])`;
@ -56,7 +55,13 @@ const requestResponse = async <T>(
)
`;
const res = await sendParticleAsFetch<any[]>(client, new Particle(script, data, ttl), name);
const [request, promise] = new RequestFlowBuilder()
.withRawScript(script)
.withVariables(data)
.withTTL(ttl)
.buildAsFetch<any[]>('_callback', name);
await client.initiateFlow(request);
const res = await promise;
return handleResponse(res);
};
@ -67,21 +72,25 @@ const requestResponse = async <T>(
*/
export const getModules = async (client: FluenceClient, ttl?: number): Promise<string[]> => {
let callbackFn = 'getModules';
const particle = new Particle(
`
const [req, promise] = new RequestFlowBuilder()
.withRawScript(
`
(seq
(call __relay ("dist" "list_modules") [] result)
(call myPeerId ("_callback" "${callbackFn}") [result])
)
`,
{
)
.withVariables({
__relay: client.relayPeerId,
myPeerId: client.selfPeerId,
},
ttl,
);
})
.withTTL(ttl)
.buildAsFetch<[string[]]>('_callback', callbackFn);
client.initiateFlow(req);
return sendParticleAsFetch(client, particle, callbackFn);
const [res] = await promise;
return res;
};
/**
@ -91,8 +100,9 @@ export const getModules = async (client: FluenceClient, ttl?: number): Promise<s
*/
export const getInterfaces = async (client: FluenceClient, ttl?: number): Promise<string[]> => {
let callbackFn = 'getInterfaces';
const particle = new Particle(
`
const [req, promise] = new RequestFlowBuilder()
.withRawScript(
`
(seq
(seq
(seq
@ -109,14 +119,17 @@ export const getInterfaces = async (client: FluenceClient, ttl?: number): Promis
(call myPeerId ("_callback" "${callbackFn}") [interfaces])
)
`,
{
)
.withVariables({
relay: client.relayPeerId,
myPeerId: client.selfPeerId,
},
ttl,
);
})
.withTTL(ttl)
.buildAsFetch<[string[]]>('_callback', callbackFn);
const [res] = await sendParticleAsFetch<[string[]]>(client, particle, callbackFn);
client.initiateFlow(req);
const [res] = await promise;
return res;
};
@ -153,15 +166,22 @@ export const uploadModule = async (
data.set('__relay', client.relayPeerId);
data.set('myPeerId', client.selfPeerId);
let script = `
const [req, promise] = new RequestFlowBuilder()
.withRawScript(
`
(seq
(call __relay ("dist" "add_module") [module_bytes module_config] result)
(call myPeerId ("_callback" "getModules") [result])
)
`;
`,
)
.withVariables(data)
.withTTL(ttl)
.buildAsFetch<[string[]]>('_callback', 'getModules');
return sendParticleAsFetch(client, new Particle(script, data, ttl), 'getModules', '_callback');
await client.initiateFlow(req);
await promise;
};
/**

View File

@ -26,7 +26,7 @@ export type ParticleHandler = (
tetraplets: SecurityTetraplet[][],
) => CallServiceResult;
export interface StepperOutcome {
export interface InterpreterOutcome {
ret_code: number;
data: Uint8Array;
next_peer_pks: string[];
@ -44,3 +44,10 @@ export interface SecurityTetraplet extends ResolvedTriplet {
}
export type PeerIdB58 = string;
export enum ResultCodes {
success = 0,
noServiceFound = 1,
exceptionInHandler = 2,
unkownError = 1024,
}

View File

@ -18,43 +18,9 @@ import { v4 as uuidv4 } from 'uuid';
import { fromByteArray, toByteArray } from 'base64-js';
import PeerId from 'peer-id';
import { encode } from 'bs58';
import { injectDataIntoParticle } from './ParticleProcessor';
import { PeerIdB58 } from './commonTypes';
import log, { LogLevel } from 'loglevel';
const DEFAULT_TTL = 7000;
/**
* The class representing Particle - a data structure used to perform operations on Fluence Network. It originates on some peer in the network, travels the network through a predefined path, triggering function execution along its way.
*/
export class Particle {
script: string;
data: Map<string, any>;
ttl: number;
/**
* Creates a particle with specified parameters.
* @param { String }script - Air script which defines the execution of a particle its path, functions it triggers on peers, and so on.
* @param { Map<string, any> | Record<string, any> } data - Variables passed to the particle in the form of either JS Map or JS object with keys representing variable names and values representing values correspondingly
* @param { [Number]=7000 } ttl - Time to live, a timout after which the particle execution is stopped by Aquamarine.
*/
constructor(script: string, data?: Map<string, any> | Record<string, any>, ttl?: number) {
this.script = script;
if (data === undefined) {
this.data = new Map();
} else if (data instanceof Map) {
this.data = data;
} else {
this.data = new Map();
for (let k in data) {
this.data.set(k, data[k]);
}
}
this.ttl = ttl ?? DEFAULT_TTL;
}
}
export interface ParticleDto {
export interface Particle {
id: string;
init_peer_id: string;
timestamp: number;
@ -65,6 +31,10 @@ export interface ParticleDto {
data: Uint8Array;
}
export const logParticle = (fn: Function, message: string, particle: Particle) => {
fn(message, particle);
};
/**
* Represents particle action to send to a node
*/
@ -79,86 +49,10 @@ interface ParticlePayload {
data: string;
}
function wrapWithVariableInjectionScript(script: string, fields: string[]): string {
fields.forEach((v) => {
script = `
(seq
(call %init_peer_id% ("__magic" "load") ["${v}"] ${v})
${script}
)
`;
});
return script;
}
function wrapWithXor(script: string): string {
return `
(xor
${script}
(seq
(call __magic_relay ("op" "identity") [])
(call %init_peer_id% ("__magic" "handle_xor") [%last_error%])
)
)`;
}
function wrapWithXorLocal(script: string): string {
return `
(xor
${script}
(call %init_peer_id% ("__magic" "handle_xor") [%last_error%])
)`;
}
export async function build(
peerId: PeerId,
relay: PeerIdB58 | undefined,
script: string,
data?: Map<string, any>,
ttl?: number,
customId?: string,
): Promise<ParticleDto> {
const id = customId ?? genUUID();
let currentTime = new Date().getTime();
if (data === undefined) {
data = new Map();
}
if (ttl === undefined) {
ttl = DEFAULT_TTL;
}
if (relay) {
data.set('__magic_relay', relay);
}
injectDataIntoParticle(id, data, ttl);
script = wrapWithVariableInjectionScript(script, Array.from(data.keys()));
if (relay) {
script = wrapWithXor(script);
} else {
script = wrapWithXorLocal(script);
}
let particle: ParticleDto = {
id: id,
init_peer_id: peerId.toB58String(),
timestamp: currentTime,
ttl: ttl,
script: script,
// TODO: sign particle
signature: '',
data: Buffer.from([]),
};
return particle;
}
/**
* Creates an action to send to a node.
*/
export function toPayload(particle: ParticleDto): ParticlePayload {
export function toPayload(particle: Particle): ParticlePayload {
return {
action: 'Particle',
id: particle.id,
@ -172,7 +66,7 @@ export function toPayload(particle: ParticleDto): ParticlePayload {
};
}
export function parseParticle(str: string): ParticleDto {
export function parseParticle(str: string): Particle {
let json = JSON.parse(str);
return {