Pushing a stream into a stream (#344)

* Remove premature handling of pushing a stream into a stream

* Handling PushToStreamTag with literal tmpName

* Handling PushToStreamTag on Topology level

* Compilation bugfix

* Better handling for CanonicalizeTag

* Mutable i
This commit is contained in:
Dmitry Kurinskiy 2021-10-27 14:21:12 +03:00 committed by GitHub
parent 1bd640e773
commit 5dd4ea6b0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 104 additions and 84 deletions

View File

@ -154,36 +154,9 @@ case class FuncCallable(
val (ops, rets) = (call.exportTo zip resolvedResult)
.map[(Option[FuncOp], ValueModel)] {
case (exp @ Call.Export(_, StreamType(_)), res) if isStream(res) =>
// TODO move this logic to ReturnSem
// Fix for https://github.com/fluencelabs/aqua/issues/277
val definedNames =
FuncOp(callableFuncBody).definesVarNames.value ++ resolvedExports.keySet
val resName = ValueModel.varName(res).getOrElse(exp.name)
val opaqueName = LazyList.from(0, 1).map(n => s"$resName-$n").collectFirst {
case n if !definedNames(n) => n
}
val opaqueType = res.`type` match {
case StreamType(s) => ArrayType(s)
case _ => res.`type`
}
opaqueName.map(opN =>
FuncOps.seq(
FuncOps.can(res, Call.Export(opN, opaqueType)),
FuncOps.ap(
VarModel(
opN,
opaqueType
),
exp
)
)
) -> exp.model
case (exp @ Call.Export(_, StreamType(_)), res) =>
// pass nested function results to a stream
Some(FuncOps.ap(res, exp)) -> exp.model
Some(FuncOps.pushToStream(res, exp)) -> exp.model
case (_, res) =>
None -> res
}

View File

@ -25,7 +25,9 @@ case class FuncOp(tree: Cofree[Chain, RawTag]) extends Model {
def definesVarNames: Eval[Set[String]] = cata[Set[String]] {
case (CallArrowTag(_, Call(_, exportTo)), acc) if exportTo.nonEmpty =>
Eval.later(acc.foldLeft(exportTo.map(_.name).toSet)(_ ++ _))
case (ApTag(_, exportTo), acc) =>
case (PushToStreamTag(_, exportTo), acc) =>
Eval.later(acc.foldLeft(Set(exportTo.name))(_ ++ _))
case (CanonicalizeTag(_, exportTo), acc) =>
Eval.later(acc.foldLeft(Set(exportTo.name))(_ ++ _))
case (CallServiceTag(_, _, Call(_, exportTo)), acc) if exportTo.nonEmpty =>
Eval.later(acc.foldLeft(exportTo.map(_.name).toSet)(_ ++ _))
@ -38,7 +40,9 @@ case class FuncOp(tree: Cofree[Chain, RawTag]) extends Model {
Eval.later(acc.foldLeft(exportTo.map(_.name).toSet)(_ ++ _))
case (CallServiceTag(_, _, Call(_, exportTo)), acc) if exportTo.nonEmpty =>
Eval.later(acc.foldLeft(exportTo.map(_.name).toSet)(_ ++ _))
case (ApTag(_, exportTo), acc) =>
case (PushToStreamTag(_, exportTo), acc) =>
Eval.later(acc.foldLeft(Set(exportTo.name))(_ ++ _))
case (CanonicalizeTag(_, exportTo), acc) =>
Eval.later(acc.foldLeft(Set(exportTo.name))(_ ++ _))
case (_, acc) => Eval.later(acc.foldLeft(Set.empty[String])(_ ++ _))
}
@ -49,7 +53,9 @@ case class FuncOp(tree: Cofree[Chain, RawTag]) extends Model {
Eval.later(acc.foldLeft(call.argVarNames)(_ ++ _))
case (CallServiceTag(_, _, call), acc) =>
Eval.later(acc.foldLeft(call.argVarNames)(_ ++ _))
case (ApTag(operand, _), acc) =>
case (PushToStreamTag(operand, _), acc) =>
Eval.later(acc.foldLeft(ValueModel.varName(operand).toSet)(_ ++ _))
case (CanonicalizeTag(operand, _), acc) =>
Eval.later(acc.foldLeft(ValueModel.varName(operand).toSet)(_ ++ _))
case (MatchMismatchTag(a, b, _), acc) =>
Eval.later(acc.foldLeft(ValueModel.varName(a).toSet ++ ValueModel.varName(b))(_ ++ _))
@ -73,7 +79,10 @@ case class FuncOp(tree: Cofree[Chain, RawTag]) extends Model {
} match {
case c: CallArrowTag => c.copy(call = c.call.mapExport(n => vals.getOrElse(n, n)))
case c: CallServiceTag => c.copy(call = c.call.mapExport(n => vals.getOrElse(n, n)))
case a: ApTag => a.copy(exportTo = a.exportTo.mapName(n => vals.getOrElse(n, n)))
case a: PushToStreamTag =>
a.copy(exportTo = a.exportTo.mapName(n => vals.getOrElse(n, n)))
case a: CanonicalizeTag =>
a.copy(exportTo = a.exportTo.mapName(n => vals.getOrElse(n, n)))
case a: AssignmentTag => a.copy(assignTo = vals.getOrElse(a.assignTo, a.assignTo))
case t: ForTag if vals.contains(t.item) => t.copy(item = vals(t.item))
case t: NextTag if vals.contains(t.item) => t.copy(item = vals(t.item))

View File

@ -10,18 +10,17 @@ object FuncOps {
def noop: FuncOp =
FuncOp.leaf(CallServiceTag(LiteralModel.quote("op"), "noop", Call(Nil, Nil)))
def ap(what: ValueModel, to: Call.Export): FuncOp =
def pushToStream(what: ValueModel, to: Call.Export): FuncOp =
FuncOp.leaf(
ApTag(what, to)
PushToStreamTag(what, to)
)
/**
* Canonicalizes [[what]] into [[to]], [[what]] is expected to be a stream.
* As we don't have canonicalization at the moment, op identity is used instead
* Canonicalizes [[what]] into [[to]], [[what]] is expected to be a stream
*/
def can(what: ValueModel, to: Call.Export): FuncOp =
def canonicalize(what: ValueModel, to: Call.Export): FuncOp =
FuncOp.leaf(
CallServiceTag(LiteralModel.quote("op"), "identity", Call(what :: Nil, to :: Nil))
CanonicalizeTag(what, to)
)
def callService(srvId: ValueModel, funcName: String, call: Call): FuncOp =

View File

@ -22,8 +22,13 @@ sealed trait RawTag {
funcName,
call.mapValues(f)
)
case ApTag(operand, exportTo) =>
ApTag(
case PushToStreamTag(operand, exportTo) =>
PushToStreamTag(
f(operand),
exportTo
)
case CanonicalizeTag(operand, exportTo) =>
CanonicalizeTag(
f(operand),
exportTo
)
@ -89,6 +94,10 @@ case class CallServiceTag(
override def toString: String = s"(call _ ($serviceId $funcName) $call)"
}
case class ApTag(operand: ValueModel, exportTo: Call.Export) extends RawTag {
override def toString: String = s"(ap $operand $exportTo)"
case class PushToStreamTag(operand: ValueModel, exportTo: Call.Export) extends RawTag {
override def toString: String = s"(push $operand $exportTo)"
}
case class CanonicalizeTag(operand: ValueModel, exportTo: Call.Export) extends RawTag {
override def toString: String = s"(can $operand $exportTo)"
}

View File

@ -26,7 +26,7 @@ case class ArgsFromService(dataServiceId: ValueModel, names: List[(String, DataT
item,
VarModel(iter, ArrayType(t.element), Chain.empty),
FuncOps.seq(
FuncOps.ap(VarModel(item, t.element), Call.Export(name, t)),
FuncOps.pushToStream(VarModel(item, t.element), Call.Export(name, t)),
FuncOps.next(item)
)
)

View File

@ -3,13 +3,14 @@ package aqua.model.transform.res
import aqua.model.func.Call
import aqua.model.func.raw.*
import aqua.model.transform.topology.Topology.Res
import aqua.model.{LiteralModel, ValueModel}
import aqua.model.{LiteralModel, ValueModel, VarModel}
import aqua.types.{ArrayType, StreamType}
import cats.Eval
import cats.data.Chain
import cats.free.Cofree
object MakeRes {
val nilTail: Eval[Chain[Cofree[Chain, ResolvedOp]]] = Eval.now(Chain.empty)
val nilTail: Eval[Chain[Res]] = Eval.now(Chain.empty)
def leaf(op: ResolvedOp): Res = Cofree[Chain, ResolvedOp](op, nilTail)
@ -31,24 +32,59 @@ object MakeRes {
def noop(onPeer: ValueModel): Res =
leaf(CallServiceRes(LiteralModel.quote("op"), "noop", CallRes(Nil, None), onPeer))
def resolve(
currentPeerId: Option[ValueModel]
): PartialFunction[RawTag, ResolvedOp] = {
case SeqTag => SeqRes
case _: OnTag => SeqRes
case MatchMismatchTag(a, b, s) => MatchMismatchRes(a, b, s)
case ForTag(item, iter) => FoldRes(item, iter)
case ParTag | ParTag.Detach => ParRes
case XorTag | XorTag.LeftBiased => XorRes
case NextTag(item) => NextRes(item)
case ApTag(operand, exportTo) => ApRes(operand, exportTo)
case CallServiceTag(serviceId, funcName, Call(args, exportTo)) =>
def canon(onPeer: ValueModel, operand: ValueModel, target: Call.Export): Res =
leaf(
CallServiceRes(
serviceId,
funcName,
CallRes(args, exportTo.headOption),
LiteralModel.quote("op"),
"identity",
CallRes(operand :: Nil, Some(target)),
onPeer
)
)
def resolve(
currentPeerId: Option[ValueModel],
i: Int
): PartialFunction[RawTag, Res] = {
case SeqTag => leaf(SeqRes)
case _: OnTag => leaf(SeqRes)
case MatchMismatchTag(a, b, s) => leaf(MatchMismatchRes(a, b, s))
case ForTag(item, iter) => leaf(FoldRes(item, iter))
case ParTag | ParTag.Detach => leaf(ParRes)
case XorTag | XorTag.LeftBiased => leaf(XorRes)
case NextTag(item) => leaf(NextRes(item))
case PushToStreamTag(operand, exportTo) =>
operand.`type` match {
case StreamType(st) =>
val tmpName = s"push-to-stream-$i"
seq(
canon(
currentPeerId
.getOrElse(LiteralModel.initPeerId),
operand,
Call.Export(tmpName, ArrayType(st))
),
leaf(ApRes(VarModel(tmpName, ArrayType(st)), exportTo))
)
case _ =>
leaf(ApRes(operand, exportTo))
}
case CanonicalizeTag(operand, exportTo) =>
canon(
currentPeerId
.getOrElse(LiteralModel.initPeerId)
.getOrElse(LiteralModel.initPeerId),
operand,
exportTo
)
case CallServiceTag(serviceId, funcName, Call(args, exportTo)) =>
leaf(
CallServiceRes(
serviceId,
funcName,
CallRes(args, exportTo.headOption),
currentPeerId
.getOrElse(LiteralModel.initPeerId)
)
)
}
}

View File

@ -48,14 +48,21 @@ object Topology extends Logging {
def resolveOnMoves(op: Tree): Eval[Res] = {
val cursor = RawCursor(NonEmptyList.one(ChainZipper.one(op)))
// TODO: remove var
var i = 0
def nextI = {
i = i + 1
i
}
val resolvedCofree = cursor
.cata(wrap) { rc =>
logger.debug(s"<:> $rc")
val resolved = MakeRes
.resolve(rc.currentPeerId)
.lift
.apply(rc.tag)
.map(MakeRes.leaf)
val resolved =
MakeRes
.resolve(rc.currentPeerId, nextI)
.lift
.apply(rc.tag)
val chainZipperEv = resolved.traverse(cofree =>
Eval.later {
val cz = ChainZipper(
@ -73,6 +80,7 @@ object Topology extends Logging {
cz
}
)
OptionT[Eval, ChainZipper[Res]](chainZipperEv)
}

View File

@ -2,7 +2,7 @@ package aqua.semantics.expr
import aqua.model.ValueModel.varName
import aqua.model.func.Call
import aqua.model.func.raw.{ApTag, FuncOp, FuncOps}
import aqua.model.func.raw.{FuncOp, FuncOps, PushToStreamTag}
import aqua.model.{LiteralModel, Model, VarModel}
import aqua.parser.expr.PushToStreamExpr
import aqua.parser.lexer.Token
@ -53,25 +53,11 @@ class PushToStreamSem[F[_]](val expr: PushToStreamExpr[F]) extends AnyVal {
expr.value,
t,
vm.lastType
).flatMap {
).map {
case false =>
Free.pure[Alg, Model](Model.error("Stream type and element type does not match"))
Model.error("Stream type and element type does not match")
case true =>
vm.lastType match {
case StreamType(lt) =>
// https://github.com/fluencelabs/aqua/issues/277
// TODO: get Name from Value for the opaque name, as it points on Value, not on the stream
N.defineOpaque(expr.stream, ArrayType(lt)).map { n =>
val opaqueVar = VarModel(n.value, ArrayType(lt))
FuncOps.seq(
FuncOps.can(vm, Call.Export(opaqueVar.name, opaqueVar.lastType)),
FuncOps.ap(opaqueVar, Call.Export(expr.stream.value, t))
): Model
}
case _ =>
Free.pure[Alg, Model](FuncOps.ap(vm, Call.Export(expr.stream.value, t)): Model)
}
FuncOps.pushToStream(vm, Call.Export(expr.stream.value, t)): Model
}
}