From ba76e4919dd4e56599dc7eba39fe1a9d8316b6e9 Mon Sep 17 00:00:00 2001 From: Pavel Murygin Date: Fri, 10 Dec 2021 16:07:46 +0300 Subject: [PATCH] fix subscribtions --- src/internal/FluencePeer.ts | 145 +++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 70 deletions(-) diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 77b769a9..0501ebd3 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -461,87 +461,92 @@ export class FluencePeer { }), ); - interpreted.pipe( + const successful = interpreted.pipe( + // force new line + filter((x) => x.isInterpretationSuccessful), + ); + + const failed = interpreted.pipe( + // force new line filter((x) => !x.isInterpretationSuccessful), - tap((item) => - setTimeout(() => { - item.onStageChange({ - stage: 'interpreterError', - errorMessage: item.interpreterResult.errorMessage, - }); - return; - }, 0), - ), ); - interpreted.pipe( - filter((x) => x.isInterpretationSuccessful), - tap((item) => - setTimeout(() => { - item.onStageChange({ stage: 'interpreted' }); - }, 0), - ), + failed.subscribe((item) => + setTimeout(() => { + item.onStageChange({ + stage: 'interpreterError', + errorMessage: item.interpreterResult.errorMessage, + }); + return; + }, 0), ); - interpreted.pipe( - filter((x) => x.isInterpretationSuccessful), - filter((x) => x.hasPeerPks), - tap((item) => { + successful.subscribe((item) => + setTimeout(() => { + item.onStageChange({ stage: 'interpreted' }); + }, 0), + ); + + successful + .pipe( + // force new line + filter((x) => x.hasPeerPks), + ) + .subscribe((item) => { const newParticle = item.particle.clone(); newParticle.data = item.newData; this._outgoingParticles.next({ ...item, particle: newParticle }); - }), - ); + }); - interpreted.pipe( - filter((x) => x.isInterpretationSuccessful), - filter((x) => !x.hasCallRequests), - tap((item) => { - item.onStageChange({ stage: 'localWorkDone' }); - }), - ); - - interpreted + successful .pipe( - filter((x) => x.isInterpretationSuccessful), - filter((x) => x.hasCallRequests), - concatMap((item) => item.interpreterResult.callRequests.map(([key, cr]) => [item, key, cr] as const)), - map(([item, key, cr]) => { - const req = { - fnName: cr.functionName, - args: cr.arguments, - serviceId: cr.serviceId, - tetraplets: cr.tetraplets, - particleContext: item.particle.getParticleContext(), - }; - return [item, key, req] as const; - }), - concatMap(([item, key, req]) => { - return this._execSingleCallRequest(req) - .catch( - (err): CallServiceResult => ({ - retCode: ResultCodes.exceptionInHandler, - result: `Handler failed. fnName="${req.fnName}" serviceId="${ - req.serviceId - }" error: ${err.toString()}`, - }), - ) - .then((res) => [item, key, res] as const); - }), - map(([item, key, res]) => { - const serviceResult = { - result: w(res.result), - retCode: res.retCode, - }; - - const newParticle = item.particle.clone(); - newParticle.callResults = [[key, serviceResult]]; - newParticle.data = Buffer.from([]); - - return { particle: newParticle, onStageChange: item.onStageChange }; - }), + // force new line + filter((x) => !x.hasCallRequests), ) - .subscribe((item) => particlesQueue.next(item)); + .subscribe((item) => { + item.onStageChange({ stage: 'localWorkDone' }); + }); + + const readyToSend = successful.pipe( + filter((x) => x.hasCallRequests), + concatMap((item) => item.interpreterResult.callRequests.map(([key, cr]) => [item, key, cr] as const)), + map(([item, key, cr]) => { + const req = { + fnName: cr.functionName, + args: cr.arguments, + serviceId: cr.serviceId, + tetraplets: cr.tetraplets, + particleContext: item.particle.getParticleContext(), + }; + return [item, key, req] as const; + }), + concatMap(([item, key, req]) => { + return this._execSingleCallRequest(req) + .catch( + (err): CallServiceResult => ({ + retCode: ResultCodes.exceptionInHandler, + result: `Handler failed. fnName="${req.fnName}" serviceId="${ + req.serviceId + }" error: ${err.toString()}`, + }), + ) + .then((res) => [item, key, res] as const); + }), + map(([item, key, res]) => { + const serviceResult = { + result: w(res.result), + retCode: res.retCode, + }; + + const newParticle = item.particle.clone(); + newParticle.callResults = [[key, serviceResult]]; + newParticle.data = Buffer.from([]); + + return { particle: newParticle, onStageChange: item.onStageChange }; + }), + ); + + readyToSend.subscribe((item) => particlesQueue.next(item)); return particlesQueue; }