feat(logs): Use debug.js library for logging [DXJ-327] (#285)

This commit is contained in:
Pavel 2023-03-11 00:03:34 +04:00 committed by GitHub
parent 43f39d5ac1
commit e95c34a792
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 7087 additions and 657 deletions

View File

@ -139,6 +139,57 @@ Once you've added the client, you can compile [Aqua](https://github.com/fluencel
}
```
## Debug
JS Client uses the [debug](https://github.com/debug-js/debug) library under the hood for logging. The log namespaces are structured on a per-component basis, following this structure:
```
fluence:<component>:trace
fluence:<component>:debug
fluence:<component>:error
```
Marine JS logs have a slightly different structure:
```
fluence:marine:<service id>:trace
fluence:marine:<service id>:debug
fluence:marine:<service id>:info
fluence:marine:<service id>:warn
fluence:marine:<service id>:error
```
Each level corresponds to a logging level in Marine JS.
Star (`*`) character can be used as a wildcard to enable logs for multiple components at once. For example, `DEBUG=fluence:*` will enable logs for all components. To exclude a component, use a minus sign before the component name. For example, `DEBUG=fluence:*,-fluence:particle:*`
### Index of components:
- `particle`: everything related to particle processing queue
- `aqua`: infrastructure of aqua compiler support
- `connection`: connection layer
- `marine`: Marine JS logs
### Enabling logs in Node.js
enable logs, pass the environment variable `DEBUG` with the corresponding log level. For example:
```sh
DEBUG=fluence:* node --loader ts-node/esm ./src/index.ts
```
### Enabling logs in the browser
To enable logs, set the `localStorage.debug` variable. For example:
```
localStorage.debug = 'fluence:*'
```
**NOTE**
In Chromium-based web browsers (e.g. Brave, Chrome, and Electron), the JavaScript console will—by default—only show messages logged by debug if the "Verbose" log level is enabled.
## Development
To hack on the Fluence JS Client itself, please refer to the [development page](./DEVELOPING.md).
@ -159,3 +210,4 @@ Any interested person is welcome to contribute to the project. Please, make sure
## License
All software code is copyright (c) Fluence Labs, Inc. under the [Apache-2.0](./LICENSE) license.

View File

@ -3,7 +3,53 @@ import "@fluencelabs/registry/resources-api.aqua"
service HelloWorld("hello-world"):
hello(str: string) -> string
func smokeTest(label: string) -> ?string, *string, string:
func resourceTest(label: string) -> ?string, *string:
res, errors <- createResource(label)
<- res, errors
func helloTest() -> string:
hello <- HelloWorld.hello("Fluence user")
<- res, errors, hello
<- hello
service CalcService:
add(num: f64) -> f64
clear_state()
divide(num: f64) -> f64
multiply(num: f64) -> f64
state() -> f64
subtract(num: f64) -> f64
test_logs()
data ServiceCreationResult:
success: bool
service_id: ?string
error: ?string
data RemoveResult:
success: bool
error: ?string
alias ListServiceResult: []string
service Srv("single_module_srv"):
create(wasm_b64_content: string) -> ServiceCreationResult
remove(service_id: string) -> RemoveResult
list() -> ListServiceResult
func demo_calculation(service_id: string) -> f64:
CalcService service_id
CalcService.test_logs()
CalcService.add(10)
CalcService.multiply(5)
CalcService.subtract(8)
CalcService.divide(6)
res <- CalcService.state()
<- res
func marineTest(wasm64: string) -> f64:
serviceResult <- Srv.create(wasm64)
res <- demo_calculation(serviceResult.service_id!)
<- res

View File

@ -11,7 +11,7 @@
"type": "module",
"scripts": {
"build": "tsc",
"compile-aqua": "aqua -i ./_aqua -o ./src/_aqua"
"compile-aqua": "fluence aqua -i ./_aqua -o ./src/_aqua"
},
"repository": "https://github.com/fluencelabs/fluence-js",
"author": "Fluence Labs",
@ -22,7 +22,9 @@
"base64-js": "1.5.1"
},
"devDependencies": {
"@fluencelabs/aqua": "0.9.4",
"@fluencelabs/registry": "0.7.0"
"@fluencelabs/cli": "0.3.9",
"@fluencelabs/registry": "0.8.2",
"@fluencelabs/aqua-lib": "0.6.0",
"@fluencelabs/trust-graph": "3.1.2"
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,8 @@
import { fromByteArray } from 'base64-js';
import { Fluence } from '@fluencelabs/js-client.api';
import { kras, randomKras } from '@fluencelabs/fluence-network-environment';
import { registerHelloWorld, smokeTest } from './_aqua/smoke_test.js';
import { registerHelloWorld, helloTest, marineTest, resourceTest } from './_aqua/smoke_test.js';
import { wasm } from './wasmb64.js';
// Relay running on local machine
// const relay = {
@ -29,7 +30,7 @@ const optsWithRandomKeyPair = () => {
} as const;
};
export type TestResult = { res: string | null; errors: string[]; hello: string };
export type TestResult = { type: 'success'; data: string } | { type: 'failure'; error: string };
export const runTest = async (): Promise<TestResult> => {
try {
@ -54,16 +55,32 @@ export const runTest = async (): Promise<TestResult> => {
console.log('my peer id: ', client.getPeerId());
console.log('my sk id: ', fromByteArray(client.getPeerSecretKey()));
console.log('running some aqua...');
const [res, errors, hello] = await smokeTest('my_resource');
console.log(hello);
console.log('running resource test...');
const [res, errors] = await resourceTest('my_resource');
if (res === null) {
console.log('aqua failed, errors', errors);
console.log('resource test failed, errors', errors);
return { type: 'failure', error: errors.join(', ') };
} else {
console.log('aqua finished, result', res);
console.log('resource test finished, result', res);
}
return { res, errors, hello };
console.log('running hello test...');
const hello = await helloTest();
console.log('hello test finished, result: ', hello);
// TODO: some wired error shit about SharedArrayBuffer
// console.log('running marine test...');
// const marine = await marineTest(wasm);
// console.log('marine test finished, result: ', marine);
const returnVal = {
res,
hello,
// marine,
};
return { type: 'success', data: JSON.stringify(returnVal) };
} catch (err: any) {
return { type: 'failure', error: err.toString() };
} finally {
console.log('disconnecting from Fluence Network...');
await Fluence.disconnect();

File diff suppressed because one or more lines are too long

View File

@ -11,7 +11,8 @@
"type": "module",
"scripts": {
"build": "tsc",
"test": "node --loader ts-node/esm ./src/index.ts"
"test": "node --loader ts-node/esm ./src/index.ts",
"test_logs": "DEBUG=fluence:particle:* node --loader ts-node/esm ./src/index.ts"
},
"repository": "https://github.com/fluencelabs/fluence-js",
"author": "Fluence Labs",

View File

@ -23,7 +23,7 @@
"@test/test-utils": "workspace:^"
},
"scripts": {
"test": "node --loader ts-node/esm ./test/index.ts",
"test_commented_out": "node --loader ts-node/esm ./test/index.ts",
"start": "react-scripts start",
"build": "react-scripts build",
"_test": "react-scripts test",

View File

@ -1,26 +1,18 @@
import { runTest } from '@test/aqua_for_test';
import { runTest, TestResult } from '@test/aqua_for_test';
import React from 'react';
import logo from './logo.svg';
import './App.css';
function App() {
const [result, setResult] = React.useState<string | null>(null);
const [error, setError] = React.useState<string | null>(null);
const [result, setResult] = React.useState<TestResult | null>(null);
const onButtonClick = () => {
runTest()
.then((res) => {
if (res.errors.length === 0) {
setResult(JSON.stringify(res));
setError(null);
} else {
setResult(null);
setError(res.errors.toString());
}
setResult(res);
})
.catch((err) => {
setResult('');
setError(err.toString());
setResult({ type: 'failure', error: err.toString() });
});
};
@ -35,8 +27,8 @@ function App() {
Click to run test
</button>
{result && <div id="res">{result}</div>}
{error && <div id="error">{error}</div>}
{result && result.type === 'success' && <div id="res">{result.data}</div>}
{result && result.type === 'failure' && <div id="error">{result.error}</div>}
<a className="App-link" href="https://reactjs.org" target="_blank" rel="noopener noreferrer">
Learn React
</a>

View File

@ -11,7 +11,7 @@
"type": "module",
"scripts": {
"build": "tsc",
"test": "node --loader ts-node/esm ./src/index.ts",
"test_commented_out": "node --loader ts-node/esm ./src/index.ts",
"serve": "http-server public"
},
"repository": "https://github.com/fluencelabs/fluence-js",

View File

@ -5,7 +5,6 @@ import { callAquaFunction } from '@fluencelabs/js-peer/dist/compilerSupport/call
import { registerService } from '@fluencelabs/js-peer/dist/compilerSupport/registerService.js';
import { MarineBasedAvmRunner } from '@fluencelabs/js-peer/dist/js-peer/avm.js';
import { MarineBackgroundRunner } from '@fluencelabs/js-peer/dist/marine/worker/index.js';
import { marineLogFunction } from '@fluencelabs/js-peer/dist/js-peer/utils.js';
import { WasmLoaderFromNpm } from '@fluencelabs/js-peer/dist/marine/deps-loader/node.js';
import { WorkerLoader } from '@fluencelabs/js-peer/dist/marine/worker-script/workerLoader.js';
@ -27,8 +26,8 @@ export const createClient = () => {
const controlModuleLoader = new WasmLoaderFromNpm(defaultNames.marine.package, defaultNames.marine.file);
const avmModuleLoader = new WasmLoaderFromNpm(defaultNames.avm.package, defaultNames.avm.file);
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader, marineLogFunction);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader, undefined);
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader);
return new FluencePeer(marine, avm);
};

View File

@ -3,7 +3,6 @@ import { callAquaFunction } from '@fluencelabs/js-peer/dist/compilerSupport/call
import { registerService } from '@fluencelabs/js-peer/dist/compilerSupport/registerService.js';
import { MarineBasedAvmRunner } from '@fluencelabs/js-peer/dist/js-peer/avm.js';
import { MarineBackgroundRunner } from '@fluencelabs/js-peer/dist/marine/worker';
import { checkConnection, marineLogFunction } from '@fluencelabs/js-peer/dist/js-peer/utils.js';
import { InlinedWorkerLoader, InlinedWasmLoader } from '@fluencelabs/js-peer/dist/marine/deps-loader/common.js';
const createClient = () => {
@ -11,8 +10,8 @@ const createClient = () => {
const controlModuleLoader = new InlinedWasmLoader('___marine___');
const avmModuleLoader = new InlinedWasmLoader('___avm___');
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader, marineLogFunction);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader, undefined);
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader);
return new FluencePeer(marine, avm);
};

View File

@ -110,10 +110,6 @@ export interface ClientOptions {
* Useful to see what particle id is responsible for aqua function
*/
printParticleId?: boolean;
/**
* Log level for marine services. By default logging is turned off.
*/
marineLogLevel?: LogLevel;
};
}

View File

@ -1,65 +1,66 @@
{
"name": "@fluencelabs/js-peer",
"version": "0.8.5",
"description": "TypeScript implementation of Fluence Peer",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
"engines": {
"node": ">=10",
"pnpm": ">=3"
},
"type": "module",
"scripts": {
"build": "tsc",
"compile-aqua": "aqua -i ./aqua/ -o ./src/internal/_aqua",
"test:smoke": "node ./dist/js-peer/__test__/integration/smokeTest.js",
"test": "NODE_OPTIONS=--experimental-vm-modules pnpm jest",
"test:unit": "NODE_OPTIONS=--experimental-vm-modules pnpm jest --testPathPattern=src/__test__/unit",
"test:integration": "NODE_OPTIONS=--experimental-vm-modules pnpm jest --testPathPattern=src/__test__/integration"
},
"repository": "https://github.com/fluencelabs/fluence-js",
"author": "Fluence Labs",
"license": "Apache-2.0",
"dependencies": {
"@fluencelabs/interfaces": "0.7.3",
"@fluencelabs/avm": "0.35.4",
"@fluencelabs/marine-js": "0.3.45",
"multiformats": "11.0.1",
"async": "3.2.4",
"bs58": "5.0.0",
"buffer": "6.0.3",
"loglevel": "1.8.1",
"@libp2p/peer-id": "2.0.1",
"rxjs": "7.5.5",
"ts-pattern": "3.3.3",
"uuid": "8.3.2",
"threads": "1.7.0",
"@libp2p/crypto": "1.0.8",
"@libp2p/peer-id-factory": "2.0.1",
"@libp2p/interface-peer-id": "2.0.1",
"@libp2p/interface-keys": "1.0.7",
"js-base64": "3.7.5",
"it-length-prefixed": "8.0.4",
"it-pipe": "2.0.5",
"it-map": "2.0.0",
"uint8arrays": "4.0.3",
"@chainsafe/libp2p-noise": "11.0.0",
"libp2p": "0.42.2",
"@libp2p/interfaces": "3.3.1",
"@libp2p/interface-connection": "3.0.8",
"@libp2p/mplex": "7.1.1",
"@libp2p/websockets": "5.0.3",
"@multiformats/multiaddr": "11.3.0"
},
"devDependencies": {
"@fluencelabs/aqua": "0.7.7-362",
"@fluencelabs/aqua-api": "0.9.1-373",
"@fluencelabs/aqua-lib": "0.6.0",
"@fluencelabs/fluence-network-environment": "1.0.13",
"@types/bs58": "4.0.1",
"@types/uuid": "8.3.2",
"@types/jest": "29.4.0",
"jest": "29.4.1",
"ts-jest": "next"
}
"name": "@fluencelabs/js-peer",
"version": "0.8.5",
"description": "TypeScript implementation of Fluence Peer",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
"engines": {
"node": ">=10",
"pnpm": ">=3"
},
"type": "module",
"scripts": {
"build": "tsc",
"compile-aqua": "aqua -i ./aqua/ -o ./src/internal/_aqua",
"test:smoke": "node ./dist/js-peer/__test__/integration/smokeTest.js",
"test": "NODE_OPTIONS=--experimental-vm-modules pnpm jest",
"test:unit": "NODE_OPTIONS=--experimental-vm-modules pnpm jest --testPathPattern=src/__test__/unit",
"test:integration": "NODE_OPTIONS=--experimental-vm-modules pnpm jest --testPathPattern=src/__test__/integration"
},
"repository": "https://github.com/fluencelabs/fluence-js",
"author": "Fluence Labs",
"license": "Apache-2.0",
"dependencies": {
"@fluencelabs/interfaces": "0.7.3",
"@fluencelabs/avm": "0.35.4",
"@fluencelabs/marine-js": "0.3.45",
"multiformats": "11.0.1",
"debug": "4.3.4",
"async": "3.2.4",
"bs58": "5.0.0",
"buffer": "6.0.3",
"@libp2p/peer-id": "2.0.1",
"rxjs": "7.5.5",
"ts-pattern": "3.3.3",
"uuid": "8.3.2",
"threads": "1.7.0",
"@libp2p/crypto": "1.0.8",
"@libp2p/peer-id-factory": "2.0.1",
"@libp2p/interface-peer-id": "2.0.1",
"@libp2p/interface-keys": "1.0.7",
"js-base64": "3.7.5",
"it-length-prefixed": "8.0.4",
"it-pipe": "2.0.5",
"it-map": "2.0.0",
"uint8arrays": "4.0.3",
"@chainsafe/libp2p-noise": "11.0.0",
"libp2p": "0.42.2",
"@libp2p/interfaces": "3.3.1",
"@libp2p/interface-connection": "3.0.8",
"@libp2p/mplex": "7.1.1",
"@libp2p/websockets": "5.0.3",
"@multiformats/multiaddr": "11.3.0"
},
"devDependencies": {
"@fluencelabs/aqua": "0.7.7-362",
"@fluencelabs/aqua-api": "0.9.1-373",
"@fluencelabs/aqua-lib": "0.6.0",
"@fluencelabs/fluence-network-environment": "1.0.13",
"@types/bs58": "4.0.1",
"@types/uuid": "8.3.2",
"@types/jest": "29.4.0",
"@types/debug": "4.1.7",
"jest": "29.4.1",
"ts-jest": "next"
}
}

View File

@ -19,6 +19,10 @@ import {
injectValueService,
} from './services.js';
import { logger } from '../util/logger.js';
const log = logger('aqua');
/**
* Convenience function which does all the internal work of creating particles
* and making necessary service registrations in order to support Aqua function calls
@ -31,6 +35,7 @@ import {
* @returns
*/
export const callAquaFunction: CallAquaFunction = ({ def, script, config, peer, args }) => {
log.trace('calling aqua function %j', { def, script, config, args });
const argumentTypes = getArgumentTypes(def);
const promise = new Promise((resolve, reject) => {

View File

@ -1,7 +1,13 @@
import type { RegisterService } from '@fluencelabs/interfaces';
import { registerGlobalService, userHandlerService } from './services.js';
import { logger } from '../util/logger.js';
const log = logger('aqua');
export const registerService: RegisterService = ({ peer, def, serviceId, service }) => {
log.trace('registering aqua service %o', { def, serviceId, service });
// Checking for missing keys
const requiredKeys = def.functions.tag === 'nil' ? [] : Object.keys(def.functions.fields);
const incorrectServiceDefinitions = requiredKeys.filter((f) => !(f in service));
@ -30,4 +36,5 @@ export const registerService: RegisterService = ({ peer, def, serviceId, service
const serviceDescription = userHandlerService(serviceId, singleFunction, userDefinedHandler);
registerGlobalService(peer, serviceDescription);
}
log.trace('aqua service registered %s', serviceId);
};

View File

@ -32,7 +32,9 @@ import map from 'it-map';
import { fromString } from 'uint8arrays/from-string';
import { toString } from 'uint8arrays/to-string';
import log from 'loglevel';
import { logger } from '../util/logger.js';
const log = logger('connection');
export const PROTOCOL_NAME = '/fluence/particle/2.0.0';
@ -134,37 +136,41 @@ export class RelayConnection extends FluenceConnection {
async connect(onIncomingParticle: ParticleHandler) {
await this._lib2p2Peer.start();
// TODO: make it configurable
const handleOptions = {
maxInboundStreams: 1024,
maxOutboundStreams: 1024
}
maxOutboundStreams: 1024,
};
this._lib2p2Peer.handle([PROTOCOL_NAME], async ({ connection, stream }) => {
pipe(
stream.source,
// @ts-ignore
decode(),
// @ts-ignore
(source) => map(source, (buf) => toString(buf.subarray())),
async (source) => {
try {
for await (const msg of source) {
try {
onIncomingParticle(msg);
} catch (e) {
log.error('error on handling a new incoming message: ' + e);
this._lib2p2Peer.handle(
[PROTOCOL_NAME],
async ({ connection, stream }) => {
pipe(
stream.source,
// @ts-ignore
decode(),
// @ts-ignore
(source) => map(source, (buf) => toString(buf.subarray())),
async (source) => {
try {
for await (const msg of source) {
try {
onIncomingParticle(msg);
} catch (e) {
log.error('error on handling a new incoming message: %j', e);
}
}
} catch (e) {
log.error('connection closed: %j', e);
}
} catch (e) {
log.debug('connection closed: ' + e);
}
},
);
}, handleOptions);
},
);
},
handleOptions,
);
log.debug(`dialing to the node with client's address: ` + this._lib2p2Peer.peerId.toString());
log.debug("dialing to the node with client's address: %s", this._lib2p2Peer.peerId.toString());
try {
this._connection = await this._lib2p2Peer.dial(this._relayAddress);

View File

@ -33,7 +33,7 @@ export abstract class FluenceConnection {
}
export interface IMarine extends IModule {
createService(serviceModule: SharedArrayBuffer | Buffer, serviceId: string, logLevel?: LogLevel): Promise<void>;
createService(serviceModule: SharedArrayBuffer | Buffer, serviceId: string): Promise<void>;
callService(
serviceId: string,

View File

@ -33,9 +33,8 @@ import type {
ConnectionState,
} from '@fluencelabs/interfaces/dist/fluenceClient';
import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle.js';
import { dataToString, jsonify, isString, ServiceError } from './utils.js';
import { jsonify, isString, ServiceError } from './utils.js';
import { concatMap, filter, pipe, Subject, tap } from 'rxjs';
import log from 'loglevel';
import { builtInServices } from './builtins/common.js';
import { defaultSigGuard, Sig } from './builtins/Sig.js';
import { registerSig } from './_aqua/services.js';
@ -43,11 +42,14 @@ import { registerSrv } from './_aqua/single-module-srv.js';
import { Buffer } from 'buffer';
import { JSONValue } from '@fluencelabs/avm';
import { LogLevel } from '@fluencelabs/marine-js/dist/types';
import { NodeUtils, Srv } from './builtins/SingleModuleSrv.js';
import { registerNodeUtils } from './_aqua/node-utils.js';
import type { MultiaddrInput } from '@multiformats/multiaddr';
import { logger } from '../util/logger.js';
const log = logger('particle');
const DEFAULT_TTL = 7000;
export type PeerConfig = ClientOptions & { relay?: RelayOptions };
@ -220,7 +222,7 @@ export class FluencePeer implements IFluenceClient {
throw new Error(`Service with '${serviceId}' id already exists`);
}
await this.marine.createService(wasm, serviceId, this._marineLogLevel);
await this.marine.createService(wasm, serviceId);
this._marineServices.add(serviceId);
}
@ -377,10 +379,6 @@ export class FluencePeer implements IFluenceClient {
this._defaultTTL = config?.defaultTtlMs ?? DEFAULT_TTL;
if (config?.debug?.marineLogLevel) {
this._marineLogLevel = config.debug.marineLogLevel;
}
await this.marine.start();
await this.avmRunner.start();
@ -434,7 +432,6 @@ export class FluencePeer implements IFluenceClient {
// Call service handler
private _marineServices = new Set<string>();
private _marineLogLevel?: LogLevel;
private _particleSpecificHandlers = new Map<string, Map<string, GenericCallServiceHandler>>();
private _commonHandlers = new Map<string, GenericCallServiceHandler>();
@ -465,7 +462,16 @@ export class FluencePeer implements IFluenceClient {
this._incomingParticles
.pipe(
tap((x) => {
x.particle.logTo('debug', 'particle received:');
log.debug('id %s. received:', x.particle.id);
log.trace('id %s. data: %j', x.particle.id, {
initPeerId: x.particle.initPeerId,
timestamp: x.particle.timestamp,
tttl: x.particle.ttl,
signature: x.particle.signature,
});
log.trace('id %s. script: %s', x.particle.id, x.particle.script);
log.trace('id %s. call results: %j', x.particle.id, x.particle.callResults);
}),
filterExpiredParticles(this._expireParticle.bind(this)),
)
@ -494,26 +500,28 @@ export class FluencePeer implements IFluenceClient {
}
if (!this.connection) {
item.particle.logTo('error', 'cannot send particle, peer is not connected');
log.error('id %s. cannot send, peer is not connected', item.particle.id);
item.onStageChange({ stage: 'sendingError' });
return;
}
item.particle.logTo('debug', 'sending particle:');
this.connection?.sendParticle(item.nextPeerIds, item.particle.toString()).then(
() => {
log.debug('id %s. sending particle into network', item.particle.id);
this.connection
?.sendParticle(item.nextPeerIds, item.particle.toString())
.then(() => {
item.onStageChange({ stage: 'sent' });
},
(e: any) => {
log.error(e);
},
);
})
.catch((e: any) => {
log.error('id %s. send failed %j', item.particle.id, e);
});
});
}
private _expireParticle(item: ParticleQueueItem) {
const particleId = item.particle.id;
log.debug(
`particle ${particleId} has expired after ${item.particle.ttl}. Deleting particle-related queues and handlers`,
'id %s. particle has expired after %d. Deleting particle-related queues and handlers',
item.particle.id,
item.particle.ttl,
);
this._particleQueues.delete(particleId);
@ -540,11 +548,10 @@ export class FluencePeer implements IFluenceClient {
// IMPORTANT!
// AVM runner execution and prevData <-> newData swapping
// MUST happen sequentially (in a critical section).
// Otherwise the race between runner might occur corrupting the prevData
item.particle.logTo('debug', 'Sending particle to interpreter');
log.debug('prevData: ', dataToString(prevData));
// Otherwise the race might occur corrupting the prevData
log.debug('id %s. sending particle to interpreter', item.particle.id);
log.trace('id %s. prevData: %a', item.particle.id, prevData);
const avmCallResult = await this.avmRunner.run(
{
initPeerId: item.particle.initPeerId,
@ -582,19 +589,29 @@ export class FluencePeer implements IFluenceClient {
// Do not continue if there was an error in particle interpretation
if (item.result instanceof Error) {
log.error('Interpreter failed: ', jsonify(item.result.message));
log.error('id %s. interpreter failed: %s', item.particle.id, item.result.message);
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.message });
return;
}
const toLog = { ...item.result, data: dataToString(item.result.data) };
if (item.result.retCode !== 0) {
log.error('Interpreter failed: ', jsonify(toLog));
log.error(
'id %s. interpreter failed: retCode: %d, message: %s',
item.particle.id,
item.result.retCode,
item.result.errorMessage,
);
log.trace('id %s. avm data: %a', item.particle.id, item.result.data);
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.errorMessage });
return;
}
log.debug('Interpreter result: ', jsonify(toLog));
log.trace(
'id %s. interpreter result: retCode: %d, avm data: %a',
item.particle.id,
item.result.retCode,
item.result.data,
);
setTimeout(() => {
item.onStageChange({ stage: 'interpreted' });
@ -666,8 +683,8 @@ export class FluencePeer implements IFluenceClient {
}
private async _execSingleCallRequest(req: CallServiceData): Promise<CallServiceResult> {
log.debug('executing call service handler', jsonify(req));
const particleId = req.particleContext.particleId;
log.trace('id %s. executing call service handler %j', particleId, req);
if (this.marine && this._marineServices.has(req.serviceId)) {
const result = await this.marine.callService(req.serviceId, req.fnName, req.args, undefined);
@ -711,7 +728,7 @@ export class FluencePeer implements IFluenceClient {
res.result = null;
}
log.debug('executed call service handler, req and res are: ', jsonify(req), jsonify(res));
log.trace('id %s. executed call service handler, req: %j, res: %j ', particleId, req, res);
return res;
}

View File

@ -17,9 +17,7 @@
import { fromUint8Array, toUint8Array } from 'js-base64';
import { CallResultsArray, LogLevel } from '@fluencelabs/avm';
import { v4 as uuidv4 } from 'uuid';
import log from 'loglevel';
import { ParticleContext } from '../interfaces/commonTypes.js';
import { dataToString, jsonify } from './utils.js';
import { Buffer } from 'buffer';
export class Particle {
@ -95,43 +93,6 @@ export class Particle {
data: this.data && fromUint8Array(this.data),
});
}
logTo(level: LogLevel, message: string) {
let fn;
let data: string | undefined;
switch (level) {
case 'debug':
fn = log.debug;
data = dataToString(this.data);
break;
case 'error':
fn = log.error;
break;
case 'info':
case 'trace':
fn = log.info;
break;
case 'warn':
fn = log.warn;
break;
default:
return;
}
fn(
message,
jsonify({
id: this.id,
init_peer_id: this.initPeerId,
timestamp: this.timestamp,
ttl: this.ttl,
script: this.script,
signature: this.signature,
callResults: this.callResults,
data,
}),
);
}
}
export type ParticleExecutionStage =

View File

@ -8,7 +8,6 @@ import { avmModuleLoader, controlModuleLoader } from '../utilsForNode.js';
import { ServiceDef } from '@fluencelabs/interfaces';
import { callAquaFunction } from '../../compilerSupport/callFunction.js';
import { marineLogFunction } from '../utils.js';
import { MarineBackgroundRunner } from '../../marine/worker/index.js';
import { MarineBasedAvmRunner } from '../avm.js';
import { nodes } from './connection.js';
@ -60,8 +59,8 @@ export const compileAqua = async (aquaFile: string): Promise<CompiledFile> => {
export const mkTestPeer = () => {
const workerLoader = new WorkerLoaderFromFs('../../marine/worker-script');
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader, marineLogFunction);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader, undefined);
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader);
return new FluencePeer(marine, avm);
};

View File

@ -1,10 +1,9 @@
import type { CallResultsArray, InterpreterResult, RunParameters } from '@fluencelabs/avm';
import { deserializeAvmResult, serializeAvmArgs } from '@fluencelabs/avm';
import type { LogLevel } from '@fluencelabs/marine-js/dist/types';
import type { IMarine, IAvmRunner, IWasmLoader } from '../interfaces/index.js';
export class MarineBasedAvmRunner implements IAvmRunner {
constructor(private marine: IMarine, private avmWasmLoader: IWasmLoader, private logLevel: LogLevel | undefined) {}
constructor(private marine: IMarine, private avmWasmLoader: IWasmLoader) {}
async run(
runParams: RunParameters,
@ -29,7 +28,7 @@ export class MarineBasedAvmRunner implements IAvmRunner {
async start(): Promise<void> {
await this.marine.start();
await this.avmWasmLoader.start();
await this.marine.createService(this.avmWasmLoader.getValue(), 'avm', this.logLevel);
await this.marine.createService(this.avmWasmLoader.getValue(), 'avm');
}
async stop(): Promise<void> {}

View File

@ -4,12 +4,12 @@ import { fromBase64Sk } from '../keypair/index.js';
import { FluencePeer } from './FluencePeer.js';
import { MarineBackgroundRunner } from '../marine/worker/index.js';
import { avmModuleLoader, controlModuleLoader } from './utilsForNode.js';
import { marineLogFunction } from './utils.js';
import { MarineBasedAvmRunner } from './avm.js';
import log from 'loglevel';
import { WorkerLoaderFromFs } from '../marine/deps-loader/node.js';
import { logger } from '../util/logger.js';
interface EphemeralConfig {
peers: Array<{
peerId: PeerIdB58;
@ -25,6 +25,8 @@ interface PeerAdapter {
connections: Set<PeerIdB58>;
}
const log = logger('ephemeral');
export const defaultConfig = {
peers: [
{
@ -123,15 +125,14 @@ export class EphemeralNetwork {
* Starts the Ephemeral network up
*/
async up(): Promise<void> {
log.debug('Starting ephemeral network up...');
log.trace('starting ephemeral network up...');
const allPeerIds = this.config.peers.map((x) => x.peerId);
// shared worker for all the peers
const workerLoader = new WorkerLoaderFromFs('../../marine/worker-script');
const promises = this.config.peers.map(async (x) => {
const logLevel = undefined;
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader, marineLogFunction);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader, logLevel);
const marine = new MarineBackgroundRunner(workerLoader, controlModuleLoader);
const avm = new MarineBasedAvmRunner(marine, avmModuleLoader);
const peer = new FluencePeer(marine, avm);
const sendParticle = async (nextPeerIds: string[], particle: string): Promise<void> => {
this._send(peer.getStatus().peerId!, nextPeerIds, particle);
@ -171,21 +172,21 @@ export class EphemeralNetwork {
});
const values = await Promise.all(promises);
this._peers = new Map(values);
log.debug('Ephemeral network started...');
log.trace('ephemeral network started...');
}
/**
* Shuts the ephemeral network down. Will disconnect all connected peers.
*/
async down(): Promise<void> {
log.debug('Shutting down ephemeral network...');
log.trace('shutting down ephemeral network...');
const peers = Array.from(this._peers.entries());
const promises = peers.map(([k, p]) => {
return p.isEphemeral ? p.peer.stop() : p.peer._disconnect();
});
await Promise.all(promises);
this._peers.clear();
log.debug('Ephemeral network shut down');
log.trace('ephemeral network shut down');
}
/**
@ -226,7 +227,7 @@ export class EphemeralNetwork {
}
private async _send(from: PeerIdB58, to: PeerIdB58[], particle: string) {
log.info(`Sending particle from ${from}, to ${JSON.stringify(to)}`);
log.trace(`Sending particle from %s, to %j`, from, to);
const peer = this._peers.get(from);
if (peer === undefined) {
log.error(`Peer ${from} cannot be found in ephemeral network`);

View File

@ -14,13 +14,14 @@
* limitations under the License.
*/
import log from 'loglevel';
import { Buffer } from 'buffer';
import { CallServiceData, CallServiceResult, CallServiceResultType, ResultCodes } from '../interfaces/commonTypes.js';
import { FluencePeer } from './FluencePeer.js';
import { ParticleExecutionStage } from './Particle.js';
import { LogFunction } from '@fluencelabs/marine-js/dist/types';
import { logger } from '../util/logger.js';
const log = logger('connection');
export const MakeServiceCall =
(fn: (args: any[]) => CallServiceResultType) =>
@ -126,25 +127,15 @@ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise<
try {
const result = await promise;
if (result != msg) {
log.warn("unexpected behavior. 'identity' must return the passed arguments.");
log.error("unexpected behavior. 'identity' must return the passed arguments.");
}
return true;
} catch (e) {
log.error('Error on establishing connection: ', e);
log.error('error on establishing connection. Relay: %s error: %j', e, peer.getStatus().relayPeerId);
return false;
}
};
export function dataToString(data: Uint8Array) {
const text = new TextDecoder().decode(Buffer.from(data));
// try to treat data as json and pretty-print it
try {
return JSON.stringify(JSON.parse(text), null, 4);
} catch {
return text;
}
}
export function jsonify(obj: unknown) {
return JSON.stringify(obj, null, 4);
}
@ -160,29 +151,3 @@ export class ServiceError extends Error {
Object.setPrototypeOf(this, ServiceError.prototype);
}
}
export const marineLogFunction: LogFunction = (message) => {
const str = `[marine service "${message.service}"]: ${message.message}`;
const nodeProcess = (globalThis as any).process ? (globalThis as any).process : undefined;
if (nodeProcess && nodeProcess.stderr) {
nodeProcess.stderr.write(str);
return;
}
switch (message.level) {
case 'warn':
console.warn(str);
break;
case 'error':
console.error(str);
break;
case 'debug':
case 'trace':
case 'info':
console.log(str);
break;
}
};

View File

@ -23,14 +23,14 @@ import { spawn, Thread } from 'threads';
// @ts-ignore
import type { ModuleThread } from 'threads';
import { MarineLogger, marineLogger } from '../../util/logger.js';
export class MarineBackgroundRunner implements IMarine {
private workerThread?: ModuleThread<MarineBackgroundInterface>;
constructor(
private workerLoader: IWorkerLoader,
private controlModuleLoader: IWasmLoader,
private logFunction: LogFunction,
) {}
private loggers: Map<string, MarineLogger> = new Map();
constructor(private workerLoader: IWorkerLoader, private controlModuleLoader: IWasmLoader) {}
async start(): Promise<void> {
if (this.workerThread) {
@ -42,16 +42,26 @@ export class MarineBackgroundRunner implements IMarine {
const worker = this.workerLoader.getValue();
const wasm = this.controlModuleLoader.getValue();
this.workerThread = await spawn<MarineBackgroundInterface>(worker, { timeout: 99999999 });
this.workerThread.onLogMessage().subscribe(this.logFunction);
const logfn: LogFunction = (message) => {
const serviceLogger = this.loggers.get(message.service);
if (!serviceLogger) {
return;
}
serviceLogger[message.level](message.message);
};
this.workerThread.onLogMessage().subscribe(logfn);
await this.workerThread.init(wasm);
}
createService(serviceModule: SharedArrayBuffer | Buffer, serviceId: string, logLevel?: LogLevel): Promise<void> {
createService(serviceModule: SharedArrayBuffer | Buffer, serviceId: string): Promise<void> {
if (!this.workerThread) {
throw 'Worker is not initialized';
}
const env = logLevel ? logLevelToEnv(logLevel) : {};
// The logging level is controlled by the environment variable passed to enable debug logs.
// We enable all possible log levels passing the control for exact printouts to the logger
const env = logLevelToEnv('trace');
this.loggers.set(serviceId, marineLogger(serviceId));
return this.workerThread.createService(serviceModule, serviceId, undefined, env);
}

View File

@ -0,0 +1,54 @@
import debug from 'debug';
import { Particle } from '../js-peer/Particle.js';
// Format avm data as a string
debug.formatters.a = (avmData: Uint8Array) => {
return new TextDecoder().decode(Buffer.from(avmData));
};
type Logger = (formatter: any, ...args: any[]) => void;
export interface CommonLogger {
error: Logger;
trace: Logger;
debug: Logger;
}
export interface MarineLogger {
warn: Logger;
error: Logger;
debug: Logger;
trace: Logger;
info: Logger;
}
export function logger(name: string): CommonLogger {
return {
error: debug(`fluence:${name}:error`),
trace: debug(`fluence:${name}:trace`),
debug: debug(`fluence:${name}:debug`),
};
}
export function marineLogger(serviceId: string): MarineLogger {
const name = `fluence:marine:${serviceId}`;
return {
warn: debug(`${name}:warn`),
error: debug(`${name}:error`),
debug: debug(`${name}:debug`),
trace: debug(`${name}:trace`),
info: debug(`${name}:info`),
};
}
export function disable() {
debug.disable();
}
export function enable(namespaces: string) {
debug.enable(namespaces);
}
export function enabled(namespaces: string) {
return debug.enabled(namespaces);
}

6104
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff