diff --git a/packages/core/js-client-isomorphic/src/types.ts b/packages/core/js-client-isomorphic/src/types.ts index 6e81bf27..01b84814 100644 --- a/packages/core/js-client-isomorphic/src/types.ts +++ b/packages/core/js-client-isomorphic/src/types.ts @@ -14,7 +14,8 @@ * limitations under the License. */ -import { Worker } from "@fluencelabs/threads/master"; +import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker"; +import { ModuleThread } from "@fluencelabs/threads/master"; import versions from "./versions.js"; @@ -23,7 +24,7 @@ type VersionedPackage = { name: string; version: string }; export type GetWorkerFn = ( pkg: FetchedPackages, CDNUrl: string, -) => Promise; +) => Promise>; export const getVersionedPackage = (pkg: FetchedPackages): VersionedPackage => { return { diff --git a/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts b/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts index 7e2b1e71..8eddff98 100644 --- a/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts +++ b/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts @@ -14,7 +14,8 @@ * limitations under the License. */ -import { BlobWorker } from "@fluencelabs/threads/master"; +import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker"; +import { BlobWorker, ModuleThread, spawn } from "@fluencelabs/threads/master"; import { fetchResource } from "../fetchers/browser.js"; import type { FetchedPackages, GetWorkerFn } from "../types.js"; @@ -34,5 +35,9 @@ export const getWorker: GetWorkerFn = async ( }; const workerCode = await fetchWorkerCode(); - return BlobWorker.fromText(workerCode); + + const workerThread: ModuleThread = + await spawn(BlobWorker.fromText(workerCode)); + + return workerThread; }; diff --git a/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts b/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts index 44c49d70..09602a26 100644 --- a/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts +++ b/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts @@ -18,12 +18,13 @@ import { createRequire } from "module"; import { dirname, relative } from "path"; import { fileURLToPath } from "url"; -import { Worker } from "@fluencelabs/threads/master"; +import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker"; +import { ModuleThread, spawn, Worker } from "@fluencelabs/threads/master"; import type { FetchedPackages, GetWorkerFn } from "../types.js"; import { getVersionedPackage } from "../types.js"; -export const getWorker: GetWorkerFn = (pkg: FetchedPackages) => { +export const getWorker: GetWorkerFn = async (pkg: FetchedPackages) => { const require = createRequire(import.meta.url); const pathToThisFile = dirname(fileURLToPath(import.meta.url)); @@ -33,5 +34,8 @@ export const getWorker: GetWorkerFn = (pkg: FetchedPackages) => { const relativePathToWorker = relative(pathToThisFile, pathToWorker); - return Promise.resolve(new Worker(relativePathToWorker)); + const workerThread: ModuleThread = + await spawn(new Worker(relativePathToWorker)); + + return workerThread; }; diff --git a/packages/core/js-client/src/jsPeer/FluencePeer.ts b/packages/core/js-client/src/jsPeer/FluencePeer.ts index 926788b4..a6de998e 100644 --- a/packages/core/js-client/src/jsPeer/FluencePeer.ts +++ b/packages/core/js-client/src/jsPeer/FluencePeer.ts @@ -356,9 +356,8 @@ export abstract class FluencePeer { await this.connection.sendParticle(item.result.nextPeerPks, newParticle); log_particle.trace("id %s. send successful", newParticle.id); } catch (e) { - log_particle.error("id %s. send failed %j", newParticle.id, e); - const message = getErrorMessage(e); + log_particle.error("id %s. send failed %s", newParticle.id, message); item.onError( new SendError( diff --git a/packages/core/js-client/src/marine/loader.ts b/packages/core/js-client/src/marine/loader.ts index 1c64042f..678f3400 100644 --- a/packages/core/js-client/src/marine/loader.ts +++ b/packages/core/js-client/src/marine/loader.ts @@ -16,18 +16,19 @@ import { fetchResource } from "@fluencelabs/js-client-isomorphic/fetcher"; import { getWorker } from "@fluencelabs/js-client-isomorphic/worker-resolver"; -import { Worker } from "@fluencelabs/threads/master"; +import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker"; +import type { ModuleThread } from "@fluencelabs/threads/master"; type StrategyReturnType = [ marineJsWasm: ArrayBuffer, avmWasm: ArrayBuffer, - worker: Worker, + worker: ModuleThread, ]; export const loadMarineDeps = async ( CDNUrl: string, ): Promise => { - const [marineJsWasm, avmWasm] = await Promise.all([ + const [marineJsWasm, avmWasm, worker] = await Promise.all([ fetchResource( "@fluencelabs/marine-js", "/dist/marine-js.wasm", @@ -38,10 +39,8 @@ export const loadMarineDeps = async ( fetchResource("@fluencelabs/avm", "/dist/avm.wasm", CDNUrl).then((res) => { return res.arrayBuffer(); }), + getWorker("@fluencelabs/marine-worker", CDNUrl), ]); - // TODO: load worker in parallel with avm and marine, test that it works - const worker = await getWorker("@fluencelabs/marine-worker", CDNUrl); - return [marineJsWasm, avmWasm, worker]; }; diff --git a/packages/core/js-client/src/marine/worker/index.ts b/packages/core/js-client/src/marine/worker/index.ts index 232b3cb9..66c665cc 100644 --- a/packages/core/js-client/src/marine/worker/index.ts +++ b/packages/core/js-client/src/marine/worker/index.ts @@ -21,51 +21,29 @@ import type { JSONValueNonNullable, CallParameters, } from "@fluencelabs/marine-worker"; -import { - ModuleThread, - Thread, - spawn, - Worker, -} from "@fluencelabs/threads/master"; +import { ModuleThread, Thread } from "@fluencelabs/threads/master"; import { MarineLogger, marineLogger } from "../../util/logger.js"; import { IMarineHost } from "../interfaces.js"; export class MarineBackgroundRunner implements IMarineHost { - private workerThread?: ModuleThread; - private loggers = new Map(); constructor( private marineJsWasm: ArrayBuffer, private avmWasm: ArrayBuffer, - private worker: Worker, + private workerThread: ModuleThread, ) {} async hasService(serviceId: string) { - if (this.workerThread === undefined) { - throw new Error("Worker is not initialized"); - } - return this.workerThread.hasService(serviceId); } async removeService(serviceId: string) { - if (this.workerThread === undefined) { - throw new Error("Worker is not initialized"); - } - await this.workerThread.removeService(serviceId); } async start(): Promise { - if (this.workerThread !== undefined) { - throw new Error("Worker thread already initialized"); - } - - const workerThread: ModuleThread = - await spawn(this.worker); - const logfn: LogFunction = (message) => { const serviceLogger = this.loggers.get(message.service); @@ -76,9 +54,8 @@ export class MarineBackgroundRunner implements IMarineHost { serviceLogger[message.level](message.message); }; - workerThread.onLogMessage().subscribe(logfn); - await workerThread.init(this.marineJsWasm); - this.workerThread = workerThread; + this.workerThread.onLogMessage().subscribe(logfn); + await this.workerThread.init(this.marineJsWasm); await this.createService(this.avmWasm, "avm"); } @@ -86,10 +63,6 @@ export class MarineBackgroundRunner implements IMarineHost { serviceModule: ArrayBuffer | SharedArrayBuffer, serviceId: string, ): Promise { - if (this.workerThread === undefined) { - throw new Error("Worker is not initialized"); - } - this.loggers.set(serviceId, marineLogger(serviceId)); await this.workerThread.createService(serviceModule, serviceId); } @@ -100,10 +73,6 @@ export class MarineBackgroundRunner implements IMarineHost { args: Array | Record, callParams?: CallParameters, ): Promise { - if (this.workerThread === undefined) { - throw new Error("Worker is not initialized"); - } - return this.workerThread.callService( serviceId, functionName, @@ -113,10 +82,6 @@ export class MarineBackgroundRunner implements IMarineHost { } async stop(): Promise { - if (this.workerThread === undefined) { - return; - } - await this.workerThread.terminate(); await Thread.terminate(this.workerThread); }