feat(compiler): CRDT-maps implementation (#1142)

This commit is contained in:
Dima 2024-06-06 11:24:47 +07:00 committed by GitHub
parent dad8db54cf
commit 934c20c98a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
47 changed files with 1605 additions and 424 deletions

View File

@ -1,10 +1,128 @@
aqua Job declares *
aqua StreamMapTest declares *
use "declare"
export testGetFunc, testGetStreamFunc, testKeysFunc, testKeysStreamFunc
export testContainsFunc, testForFunc, testParSeqMap, testForTupleFunc
export timeout
import "builtin.aqua"
func timeout() -> AquaName.Worker:
w <- AquaName.getWorker()
a = w.host_id
<- w
func testGetFunc() -> []string, []string, []string, u32:
streamMap: %string
key = "key"
resEmpty = streamMap.get(key)
streamMap <<- key, "first value"
resFirst = streamMap.get(key)
streamMap <<- key, "second value"
resSecond = streamMap.get(key)
<- resEmpty, resFirst, resSecond, resSecond.length
func testGetStreamFunc() -> []string, string, string:
streamMap: %string
key = "key"
resEmptyStream = streamMap.getStream(key)
streamMap <<- key, "first value"
resFirstStream = streamMap.getStream(key)
streamMap <<- key, "second value"
resSecondStream = streamMap.getStream(key)
resFirst = resFirstStream[0]
resSecond = resSecondStream[1]
<- resEmptyStream, resFirst, resSecond
func testKeysFunc() -> []string, []string, []string:
streamMap: %string
resEmpty = streamMap.keys()
streamMap <<- "key one", ""
resFirst = streamMap.keys()
streamMap <<- "key two", ""
streamMap <<- "key one", ""
streamMap <<- "key one", "text"
resSecond = streamMap.keys()
<- resEmpty, resFirst, resSecond
func testKeysStreamFunc() -> []string, []string, []string:
streamMap: %string
resEmpty = streamMap.keysStream()
streamMap <<- "key one", ""
resFirst = streamMap.keysStream()
streamMap <<- "key one", "new"
streamMap <<- "key two", ""
resSecond = streamMap.keysStream()
<- resEmpty, resFirst, resSecond
func testContainsFunc() -> bool, bool, bool, bool, bool:
keys = ["key one", "key two"]
streamMap: %string
resFirst = streamMap.contains(keys[0])
streamMap <<- keys[0], ""
resSecond = streamMap.contains(keys[0])
resThird = streamMap.contains(keys[1])
streamMap <<- keys[0], "new"
streamMap <<- keys[1], ""
resFourth = streamMap.contains(keys[0])
resFifth = streamMap.contains(keys[1])
<- resFirst, resSecond, resThird, resFourth, resFifth
func testForFunc() -> []string, []string:
streamMap: %string
streamMap <<- "key one", "1"
streamMap <<- "key one", "2"
streamMap <<- "key two", "3"
streamMap <<- "key two", "4"
streamMap <<- "key two", "5"
streamMap <<- "key three", "6"
streamMap <<- "key four", "7"
streamKeys: *string
streamValues: *string
for kv <- streamMap:
streamKeys <<- kv.key
streamValues <<- kv.value
<- streamKeys, streamValues
func testParSeqMap(relay1: string, relay2: string, relay3: string) -> string:
relays = [relay1, relay2, relay3]
map: %u64
map2: %u64
parseq r <- relays on r:
map <<- "time", Peer.timestamp_ms()
for r <- relays par:
on r:
join map.get("time")[relays.length - 1]
map2 <<- "time", Peer.timestamp_ms()
join map2.get("time")[relays.length - 1]
<- "ok"
func testForTupleFunc() -> []string, []string, []string:
streamMap: %string
streamMap <<- "key one", "1"
streamMap <<- "key one", "2"
streamMap <<- "key two", "3"
streamMap <<- "key two", "4"
streamMap <<- "key two", "5"
streamMap <<- "key three", "6"
streamMap <<- "key four", "7"
streamFirst: *string
streamSecond: *string
streamThird: *string
for k, v <- streamMap:
streamFirst <<- k
streamSecond <<- v
for k, v <- streamMap:
streamFirst <<- v
streamSecond <<- k
for k, v <- streamMap:
streamThird <<- streamMap.get(k)!
<- streamFirst, streamSecond, streamThird

View File

@ -1,3 +1,5 @@
aqua Builtin declares *
-- Default public interface of Fluence nodes
alias Field : []string

View File

@ -3,7 +3,7 @@ package aqua.backend.air
import aqua.model.*
import aqua.raw.ops.Call
import aqua.res.*
import aqua.types.{ArrayType, CanonStreamType, StreamMapType, StreamType, Type}
import aqua.types.{ArrayType, CanonStreamMapType, CanonStreamType, StreamMapType, StreamType, Type}
import cats.Eval
import cats.data.Chain
import cats.free.Cofree
@ -29,7 +29,8 @@ object AirGen extends Logging {
def varNameToString(name: String, `type`: Type): String =
(`type` match {
case _: StreamType => "$" + name
case _: CanonStreamType => "#" + name
case _: CanonStreamType => "#$" + name
case _: CanonStreamMapType => "#%" + name
case _: StreamMapType => "%" + name
case _ => name
}).replace('.', '_')

View File

@ -4,27 +4,17 @@ export testParSeq
import "@fluencelabs/aqua-lib/builtin.aqua"
service NumOp("op"):
identity(n: u64) -> u64
data PeerRelay:
peer: string
relay: string
func testParSeq(peer1: string, peer2: string, peer3: string, relay1: string, relay2: string, relay3: string) -> string:
pr1 = PeerRelay(peer = peer1, relay = relay1)
pr2 = PeerRelay(peer = peer2, relay = relay2)
pr3 = PeerRelay(peer = peer3, relay = relay3)
peers = [pr1, pr2, pr3]
func testParSeq(relay1: string, relay2: string, relay3: string) -> string:
relays = [relay1, relay2, relay3]
stream: *u64
stream2: *u64
parseq p <- peers on p.peer via p.relay:
parseq r <- relays on r:
stream <- Peer.timestamp_ms()
for p <- peers par:
on p.peer via p.relay:
join stream[peers.length - 1]
for r <- relays par:
on r:
join stream[relays.length - 1]
stream2 <<- Peer.timestamp_ms()
join stream2[peers.length - 1]
join stream2[relays.length - 1]
<- "ok"

View File

@ -0,0 +1,130 @@
aqua StreamMapTest declares *
export testGetFunc, testGetStreamFunc, testKeysFunc, testKeysStreamFunc
export testContainsFunc, testForFunc, testParSeqMap, testForTupleFunc
import "@fluencelabs/aqua-lib/builtin.aqua"
func testGetFunc() -> []string, []string, []string, u32:
streamMap: %string
key = "key"
resEmpty = streamMap.get(key)
streamMap <<- key, "first value"
resFirst = streamMap.get(key)
streamMap <<- key, "second value"
resSecond = streamMap.get(key)
<- resEmpty, resFirst, resSecond, resSecond.length
func testGetStreamFunc() -> []string, string, string:
streamMap: %string
key = "key"
resEmptyStream = streamMap.getStream(key)
streamMap <<- key, "first value"
resFirstStream = streamMap.getStream(key)
streamMap <<- key, "second value"
resSecondStream = streamMap.getStream(key)
resFirst = resFirstStream[0]
resSecond = resSecondStream[1]
<- resEmptyStream, resFirst, resSecond
func testKeysFunc() -> []string, []string, []string:
streamMap: %string
resEmpty = streamMap.keys()
streamMap <<- "key one", ""
resFirst = streamMap.keys()
streamMap <<- "key two", ""
streamMap <<- "key one", ""
streamMap <<- "key one", "text"
resSecond = streamMap.keys()
<- resEmpty, resFirst, resSecond
func testKeysStreamFunc() -> []string, []string, []string:
streamMap: %string
resEmpty = streamMap.keysStream()
streamMap <<- "key one", ""
resFirst = streamMap.keysStream()
streamMap <<- "key one", "new"
streamMap <<- "key two", ""
resSecond = streamMap.keysStream()
join resFirst[0]
join resSecond[2]
<- resEmpty, resFirst, resSecond
func testContainsFunc() -> bool, bool, bool, bool, bool:
keys = ["key one", "key two"]
streamMap: %string
resFirst = streamMap.contains(keys[0])
streamMap <<- keys[0], ""
resSecond = streamMap.contains(keys[0])
resThird = streamMap.contains(keys[1])
streamMap <<- keys[0], "new"
streamMap <<- keys[1], ""
resFourth = streamMap.contains(keys[0])
resFifth = streamMap.contains(keys[1])
<- resFirst, resSecond, resThird, resFourth, resFifth
func testForFunc() -> []string, []string:
streamMap: %string
streamMap <<- "key one", "1"
streamMap <<- "key one", "2"
streamMap <<- "key two", "3"
streamMap <<- "key two", "4"
streamMap <<- "key two", "5"
streamMap <<- "key three", "6"
streamMap <<- "key four", "7"
streamKeys: *string
streamValues: *string
for kv <- streamMap:
streamKeys <<- kv.key
streamValues <<- kv.value
<- streamKeys, streamValues
func testParSeqMap(relay1: string, relay2: string, relay3: string) -> string:
relays = [relay1, relay2, relay3]
map: %u64
map2: %u64
parseq r <- relays on r:
map <<- "time", Peer.timestamp_ms()
for r <- relays par:
on r:
join map.getStream("time")[relays.length - 1]
map2 <<- "time", Peer.timestamp_ms()
join map2.getStream("time")[relays.length - 1]
<- "ok"
func testForTupleFunc() -> []string, []string, []string:
streamMap: %string
streamMap <<- "key one", "1"
streamMap <<- "key one", "2"
streamMap <<- "key two", "3"
streamMap <<- "key two", "4"
streamMap <<- "key two", "5"
streamMap <<- "key three", "6"
streamMap <<- "key four", "7"
streamFirst: *string
streamSecond: *string
streamThird: *string
for k, v <- streamMap:
streamFirst <<- k
streamSecond <<- v
for k, v <- streamMap:
streamFirst <<- v
streamSecond <<- k
for k, v <- streamMap:
streamThird <<- streamMap.get(k)!
<- streamFirst, streamSecond, streamThird

View File

@ -0,0 +1,41 @@
aqua StreamMapAbilities
export streamMapAbilityTest
ability Streams:
stream: *string
map: %string
ability Adds:
addToStream(s: string)
addToMap(k: string, v: string)
func addToStreamClosure(str: *string) -> string -> ():
cl = func (s: string):
str <<- s
<- cl
func addToMapClosure(str: %string) -> string, string -> ():
cl = func (k: string, v: string):
str <<- k, v
<- cl
func addTo{Streams}() -> Adds:
addStream = addToStreamClosure(Streams.stream)
addMap = addToMapClosure(Streams.map)
adds = Adds(addToStream = addStream, addToMap = addMap)
<- adds
func add{Adds}(s: string, k: string):
Adds.addToStream(s)
Adds.addToMap(k, k)
func streamMapAbilityTest() -> []string, []string:
stream: *string
map: %string
ab = Streams(stream = stream, map = map)
adds <- addTo{ab}()
add{adds}("one", "1")
add{adds}("two", "2")
add{adds}("three", "3")
<- stream, map.keys()

View File

@ -0,0 +1,50 @@
aqua StreamMapCapture
export testStreamMapCaptureSimple, testStreamMapCaptureReturn
-- SIMPLE
func useCaptureSimple(push: string, string -> ()):
push("two", "2")
func testStreamMapCaptureSimple() -> []string:
stream: %string
stream <<- "one", "1"
push = (s: string, n: string):
stream <<- s, n
useCaptureSimple(push)
push("three", "3")
<- stream.keys()
-- RETURN
func captureStream() -> (string, string -> []string):
stream: %string
stream <<- "one", "1"
capture = (s: string, n: string) -> []string:
stream <<- s, n
<- stream.keys()
capture("two", "2")
<- capture
func useCaptureReturn(capture: string, string -> []string):
capture("three", "3")
func rereturnCapture() -> (string, string -> []string):
capture <- captureStream()
useCaptureReturn(capture)
capture("four", "4")
<- capture
func testStreamMapCaptureReturn() -> []string:
on HOST_PEER_ID:
capture <- rereturnCapture()
<- capture("five", "5")

View File

@ -0,0 +1,14 @@
aqua StreamRestriction
export streamMapRes
func streamMapFold(arr: []string) -> %string:
res: %string
for n <- arr:
res <<- n, n
<- res
func streamMapRes(arr: []string) -> []string, []string:
res: %string
res2 <- streamMapFold(arr)
<- res.keys(), res2.keys()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,10 @@
import {
testParSeq
} from "../compiled/examples/parseq.js";
import { config } from "../config.js";
const relays = config.relays;
export async function testParSeqCall() {
return testParSeq(relays[3].peerId, relays[4].peerId, relays[5].peerId)
}

View File

@ -0,0 +1,7 @@
import {
streamMapAbilityTest
} from "../compiled/examples/streamMapAbilities.js";
export async function streamMapAbilityTestCall() {
return await streamMapAbilityTest();
}

View File

@ -0,0 +1,40 @@
import {
testGetFunc, testGetStreamFunc, testKeysFunc, testKeysStreamFunc, testContainsFunc,
testForFunc, testParSeqMap, testForTupleFunc
} from "../compiled/examples/streamMap.js";
import { config } from "../config.js";
const relays = config.relays;
export async function testGetFuncCall() {
return testGetFunc();
}
export async function testGetStreamFuncCall() {
return testGetStreamFunc();
}
export async function testKeysFuncCall() {
return testKeysFunc();
}
export async function testKeysStreamFuncCall() {
return testKeysStreamFunc();
}
export async function testContainsFuncCall() {
return testContainsFunc();
}
export async function testForFuncCall() {
return testForFunc();
}
export async function testForTupleFuncCall() {
return testForTupleFunc();
}
export async function testParSeqMapCall() {
return testParSeqMap(relays[3].peerId, relays[4].peerId, relays[5].peerId)
}

View File

@ -0,0 +1,12 @@
import {
testStreamMapCaptureSimple,
testStreamMapCaptureReturn,
} from "../compiled/examples/streamMapCapture.js";
export async function streamMapCaptureSimpleCall() {
return await testStreamMapCaptureSimple();
}
export async function streamMapCaptureReturnCall() {
return await testStreamMapCaptureReturn();
}

View File

@ -0,0 +1,5 @@
import { streamMapRes } from "../compiled/examples/streamMapRestriction.js";
export async function streamMapResCall(): Promise<any> {
return await streamMapRes(["a", "b", "c"]);
}

View File

@ -39,7 +39,7 @@ object ArrowInliner extends Logging {
private def getOutsideStreamNames[S: Exports]: State[S, Set[String]] =
Exports[S].exports
.map(exports =>
exports.collect { case (n, VarModel(_, StreamType(_), _)) =>
exports.collect { case (n, VarModel(_, _: StreamType, _)) =>
n
}.toSet
)

View File

@ -7,13 +7,18 @@ import aqua.model.inline.state.*
import aqua.model.inline.tag.*
import aqua.raw.ops.*
import aqua.raw.value.*
import aqua.types.{CanonStreamType, CollectionType, StreamType}
import aqua.types.{
CanonStreamMapType,
CanonStreamType,
MutableStreamType,
StreamMapType,
StreamType
}
import cats.data.{Chain, State, StateT}
import cats.instances.list.*
import cats.syntax.applicative.*
import cats.syntax.apply.*
import cats.syntax.bifunctor.*
import cats.syntax.functor.*
import cats.syntax.option.*
import cats.syntax.traverse.*
@ -32,7 +37,7 @@ object TagInliner extends Logging {
import aqua.model.inline.Inline.parDesugarPrefix
import RawValueInliner.{valueListToModel, valueToModel}
import RawValueInliner.valueToModel
/**
* Result of [[RawTag]] inlining
@ -167,11 +172,12 @@ object TagInliner extends Logging {
op: Option[OpModel.Tree]
): State[S, (ValueModel, Option[OpModel.Tree])] = {
vm match {
case ValueModel.Stream(v @ VarModel(n, _, l), StreamType(t)) =>
case ValueModel.MutableStream(v @ VarModel(n, _, l), mt: MutableStreamType) =>
val canonName = n + "_canon"
val canonType = mt.toCanon
for {
canonN <- Mangler[S].findAndForbidName(canonName)
canonV = VarModel(canonN, CanonStreamType(t), l)
canonV = VarModel(canonN, canonType, l)
canonOp = CanonicalizeModel(
v.copy(properties = Chain.empty),
CallModel.Export(canonV.name, canonV.baseType)
@ -206,8 +212,8 @@ object TagInliner extends Logging {
case TryTag =>
TryTagInliner.inlined
case ForTag(item, iterable, mode) =>
ForTagInliner(item, iterable, mode).inlined
case ForTag(item, iterable, mode, keyValue) =>
ForTagInliner(item, iterable, mode, keyValue).inlined
case PushToStreamTag(operand, exportTo) =>
(
@ -229,6 +235,24 @@ object TagInliner extends Logging {
)
}
case PushToMapTag(key, operand, exportTo) =>
(
valueToModel(key),
valueToModel(operand),
valueToModel(exportTo.toRaw)
).mapN {
case ((k, kp), (v, p), (VarModel(name, st: StreamMapType, Chain.nil), None)) =>
TagInlined.Single(
model = InsertKeyValueModel(k, v, name, st),
prefix = SeqModel.wrap(kp.toList ++ p.toList).some
)
case (_, _, (vm, prefix)) =>
internalError(
s"stream ($exportTo) resolved " +
s"to ($vm) with prefix ($prefix)"
)
}
case CanonicalizeTag(operand, exportTo) =>
valueToModel(operand).flatMap {
// pass literals as is
@ -339,7 +363,7 @@ object TagInliner extends Logging {
case DeclareStreamTag(value) =>
value match
case VarRaw(name, t: StreamType) =>
case VarRaw(name, t: MutableStreamType) =>
for {
_ <- Exports[S].resolved(name, VarModel(name, t))
} yield TagInlined.Empty()

View File

@ -355,6 +355,29 @@ object ApplyPropertiesRawInliner extends RawInliner[ApplyPropertyRaw] with Loggi
s"Unfolded stream ($vr) cannot be a literal"
)
}
case (
vr @ VarRaw(_, st @ StreamMapType(_)),
Some(iar @ IntoArrowRaw(arrowName, _, args), otherProperties)
) =>
unfold(vr).flatMap {
case (VarModel(nameVM, _, _), inl) =>
for {
argsInlined <- args.traverse(unfold(_)).map(_.unzip)
(argsVM, argsInline) = argsInlined
resultInlined <- ApplyStreamMapRawInliner(arrowName, nameVM, st, argsVM)
(resultVM, resultInline) = resultInlined
propsInlined <- unfoldProperties(
(argsInline :+ resultInline).combineAll,
resultVM,
otherProperties,
propertiesAllowed
)
} yield propsInlined
case _ =>
internalError(
s"Unfolded stream map ($vr) cannot be a literal"
)
}
case (_, _) =>
unfold(raw).flatMap {

View File

@ -0,0 +1,364 @@
package aqua.model.inline.raw
import aqua.errors.Errors.internalError
import aqua.mangler.ManglerState
import aqua.model.inline.Inline
import aqua.model.inline.RawValueInliner.unfold
import aqua.model.inline.state.*
import aqua.model.{SeqModel, *}
import aqua.raw.value.IntoArrowRaw
import aqua.types.StreamMapType.Func.*
import aqua.types.{
ArrayType,
CanonStreamMapType,
CanonStreamType,
DataType,
ScalarType,
StreamMapType,
StreamType,
StructType
}
import cats.Eval
import cats.data.{Chain, IndexedStateT, NonEmptyMap, State}
import cats.syntax.applicative.*
import cats.syntax.functor.*
import cats.syntax.monoid.*
import cats.syntax.option.*
object ApplyStreamMapRawInliner {
// make unique name that cannot be used in Aqua
private def mang[S: Mangler](name: String, suffix: String): State[S, String] =
Mangler[S].findAndForbidName(s"-${name}_$suffix")
private def intoField(vm: VarModel, field: String, errorMsg: String): VarModel =
vm.intoField(field).getOrElse(internalError(errorMsg))
// - fold a map in parallel
// - when a key matches index, push result to a stream
// - return the stream
private def getStreamModel(
mapName: String,
mapType: StreamMapType,
idxVar: ValueModel,
streamVar: VarModel,
iterName: String
): OpModel.Tree = {
val mapVar = VarModel(mapName, mapType)
val iter = VarModel(iterName, mapType.iterType("iterName_type"))
ParModel.wrap(
ForModel(iter.name, mapVar, ForModel.Mode.Never).wrap(
XorModel.wrap(
MatchMismatchModel(
intoField(iter, "key", "Unexpected. No 'key' field in 'getStream'"),
idxVar,
true
).wrap(
PushToStreamModel(
intoField(iter, "value", "Unexpected. No 'value' field in 'getStream'"),
CallModel.Export(streamVar)
).leaf
)
),
NextModel(iter.name).leaf
),
NullModel.leaf
)
}
// - create a stream
// - fold over the map in parallel
// - push keys to the stream
// - return stream
private def getKeysStreamModel(
mapName: String,
mapType: StreamMapType,
streamVar: VarModel,
iterName: String
): OpModel.Tree = {
val mapVar = VarModel(mapName, mapType)
val iter = VarModel(iterName, mapType.iterType("iterName_type"))
ParModel.wrap(
ForModel(iter.name, mapVar, ForModel.Mode.Never).wrap(
PushToStreamModel(
intoField(iter, "key", "Unexpected. No 'key' field in 'getKeysStream'"),
CallModel.Export(streamVar.name, streamVar.`type`)
).leaf,
NextModel(iter.name).leaf
),
NullModel.leaf
)
}
// - canonicalize the map
// - create a stream
// - fold over map and push all keys to the stream
// - canonicalize the stream
private def keysModel(
mapName: String,
mapType: StreamMapType,
streamName: String,
canonName: String,
iterName: String,
result: VarModel
): OpModel.Tree = {
val mapVar = VarModel(mapName, mapType)
val streamVar = VarModel(streamName, StreamType(ScalarType.string))
val iterType = mapType.iterType("iterName_type")
val canonMap = VarModel(canonName, CanonStreamMapType(iterType))
val iter = VarModel(iterName, iterType)
RestrictionModel(streamVar.name, streamVar.`type`).wrap(
CanonicalizeModel(mapVar, CallModel.Export(canonMap)).leaf,
ForModel(iter.name, canonMap).wrap(
PushToStreamModel(
intoField(iter, "key", "Unexpected. No 'key' field in 'keys'"),
CallModel.Export(streamVar)
).leaf,
NextModel(iter.name).leaf
),
CanonicalizeModel(streamVar, CallModel.Export(result)).leaf
)
}
// - canonicalize map
// - get a key as from JSON object
private def getModel(
mapName: String,
mapType: StreamMapType,
idxVar: ValueModel,
mapCanonName: String,
idxName: String
): (VarModel, OpModel.Tree) = {
val (idx, idxModel) = idxVar match {
case vm: VarModel =>
vm -> EmptyModel.leaf
case lm: LiteralModel =>
VarModel(idxName, ScalarType.string) -> FlattenModel(lm, idxName).leaf
}
val mapVar = VarModel(mapName, mapType)
val canonMap = VarModel(mapCanonName, CanonStreamMapType(mapType.element))
canonMap.withProperty(IntoIndexModel(idx.name, ArrayType(mapType.element))) -> SeqModel.wrap(
CanonicalizeModel(mapVar, CallModel.Export(canonMap.name, canonMap.`type`)).leaf,
idxModel
)
}
// - get elements by key
// - check if an array is empty
// - return this check
private def containsModel(
mapName: String,
mapType: StreamMapType,
keyVar: ValueModel,
arrayName: String,
resultName: String,
mapCanonName: String,
idxName: String
): OpModel.Tree = {
val (result, getElementTree) =
getModel(mapName, mapType, keyVar, mapCanonName, idxName)
val arrayVar = VarModel(arrayName, ArrayType(mapType.element))
SeqModel.wrap(
getElementTree,
// flatten canonicalized map
FlattenModel(result, arrayName).leaf,
XorModel.wrap(
MatchMismatchModel(
arrayVar.withProperty(FunctorModel("length", ScalarType.u32)),
LiteralModel.number(0),
true
).wrap(
FlattenModel(LiteralModel.bool(false), resultName).leaf
),
FlattenModel(LiteralModel.bool(true), resultName).leaf
)
)
}
private def contains[S: Mangler: Exports: Arrows: Config](
mapName: String,
mapType: StreamMapType,
keyVar: ValueModel
): State[S, (VarModel, Inline)] = {
for {
arrayName <- mang(mapName, "array")
resultName <- mang(mapName, "contains_result")
canonName <- mang(mapName, "canon")
idxName <- mang(mapName, "idx")
} yield {
val result = VarModel(resultName, ScalarType.bool)
val getKeysTree = containsModel(
mapName = mapName,
mapType = mapType,
keyVar = keyVar,
arrayName = arrayName,
mapCanonName = canonName,
resultName = resultName,
idxName = idxName
)
val inline = Inline(predo = Chain.one(getKeysTree))
(result, inline)
}
}
private def getKeys[S: Mangler: Exports: Arrows: Config](
mapName: String,
mapType: StreamMapType
): State[S, (VarModel, Inline)] = {
for {
streamName <- mang(mapName, "stream")
iterName <- mang(mapName, "iter")
resultName <- mang(mapName, "keys_result")
canonName <- mang(mapName, "canon")
result = VarModel(resultName, CanonStreamType(ScalarType.string))
} yield {
val getKeysTree = keysModel(
mapName = mapName,
mapType = mapType,
streamName = streamName,
iterName = iterName,
canonName = canonName,
result = result
)
val inline = Inline(predo = Chain.one(getKeysTree))
(result, inline)
}
}
private def getKeysStream[S: Mangler: Exports: Arrows: Config](
mapName: String,
mapType: StreamMapType
): State[S, (VarModel, Inline)] = {
for {
streamName <- mang(mapName, "keys_stream_result")
iterName <- mang(mapName, "iter")
streamVar = VarModel(streamName, StreamType(ArrayType(mapType.element)))
// add resulted stream to restrict it after inlining
_ <- Exports[S].resolved(streamName, streamVar)
} yield {
val getKeysTree = getKeysStreamModel(
mapName = mapName,
mapType = mapType,
streamVar = streamVar,
iterName = iterName
)
val inline = Inline(predo = Chain.one(getKeysTree))
(streamVar, inline)
}
}
private def getStream[S: Mangler: Exports: Arrows: Config](
mapName: String,
mapType: StreamMapType,
idxVar: ValueModel
): State[S, (VarModel, Inline)] = {
for {
streamName <- mang(mapName, "get_stream_result")
iterName <- mang(mapName, "iter")
value = VarModel(
streamName,
StreamType(mapType.element)
)
// add resulted stream to restrict it after inlining
_ <- Exports[S].resolved(streamName, value)
} yield {
val getStreamResultTree = getStreamModel(
mapName = mapName,
mapType = mapType,
idxVar = idxVar,
streamVar = value,
iterName = iterName
)
val inline = Inline(predo = Chain.one(getStreamResultTree))
(value, inline)
}
}
private def get[S: Mangler: Exports: Arrows: Config](
mapName: String,
mapType: StreamMapType,
idxVar: ValueModel
): State[S, (VarModel, Inline)] = {
for {
canonMapName <- mang(mapName, "canon")
idxName <- mang(mapName, "idx")
} yield {
val (result, getResultTree) = getModel(
mapName,
mapType,
idxVar,
canonMapName,
idxName
)
val inline = Inline(predo = Chain.one(getResultTree))
(result, inline)
}
}
private def flatProps[S: Mangler](
arg: ValueModel,
op: ValueModel => State[S, (VarModel, Inline)]
): State[S, (VarModel, Inline)] =
(arg match {
case vm @ VarModel(name, _, properties) if properties.nonEmpty =>
mang(name, "ap").map { n =>
VarModel(n, vm.`type`) -> Some(FlattenModel(vm, n).leaf)
}
case a => State.pure(a -> None)
}).flatMap { case (v, inl) =>
op(v).map { case (res, resInl) =>
res -> resInl.prepend(inl)
}
}
def apply[S: Mangler: Exports: Arrows: Config](
funcName: String,
mapName: String,
mapType: StreamMapType,
args: List[ValueModel]
): State[S, (VarModel, Inline)] =
StreamMapType
.funcByString(funcName)
.tupleRight(args)
.map {
// flat properties for Get and Contains, because argument uses in IntoIndexModel
case (Get, arg :: Nil) =>
flatProps(arg, get(mapName, mapType, _))
case (Contains, arg :: Nil) =>
flatProps(arg, contains(mapName, mapType, _))
case (GetStream, arg :: Nil) =>
getStream(mapName, mapType, arg)
case (Keys, Nil) =>
getKeys(mapName, mapType)
case (KeysStream, Nil) =>
getKeysStream(mapName, mapType)
case (n, args) =>
internalError(
s"StreamMap '$mapName' has wrong arguments '$args' for function '$n'"
)
}
.getOrElse {
internalError(
s"StreamMap '$mapName' doesn't support function '$funcName''"
)
}
}

View File

@ -9,7 +9,6 @@ import aqua.types.{ArrayType, CanonStreamType, ScalarType, StreamType}
import cats.data.Chain
import cats.data.State
import cats.instances.stream
import cats.syntax.applicative.*
import cats.syntax.monoid.*
import cats.syntax.option.*

View File

@ -1,10 +1,8 @@
package aqua.model.inline.state
import aqua.model.ValueModel.{Ability, Stream}
import aqua.model.ValueModel.{Ability, MutableStream, Stream}
import aqua.model.{LiteralModel, ValueModel, VarModel}
import aqua.types.StreamType
import aqua.types.{AbilityType, GeneralAbilityType, NamedType}
import aqua.types.{AbilityType, GeneralAbilityType, MutableStreamType, NamedType, StreamType}
import cats.data.{NonEmptyList, State}
import cats.syntax.apply.*
import cats.syntax.traverse.*
@ -78,9 +76,9 @@ trait Exports[S] extends Scoped[S] {
def deleteStreams(names: Set[String]): State[S, Unit]
def streams: State[S, Map[String, StreamType]]
def streams: State[S, Map[String, MutableStreamType]]
def streamScope[T](inside: State[S, T]): State[S, (T, Map[String, StreamType])] =
def streamScope[T](inside: State[S, T]): State[S, (T, Map[String, MutableStreamType])] =
for {
streamsBefore <- streams
tree <- inside
@ -103,7 +101,7 @@ trait Exports[S] extends Scoped[S] {
override def resolved(exports: Map[String, ValueModel]): State[R, Unit] =
self.resolved(exports).transformS(f, g)
override def streams: State[R, Map[String, StreamType]] =
override def streams: State[R, Map[String, MutableStreamType]] =
self.streams.transformS(f, g)
override def deleteStreams(names: Set[String]): State[R, Unit] =
@ -186,7 +184,7 @@ object Exports {
*/
case class ExportsState(
values: Map[String, ValueModel] = Map.empty,
streams: Map[String, StreamType] = Map.empty
streams: Map[String, MutableStreamType] = Map.empty
)
object Simple extends Exports[ExportsState] {
@ -220,7 +218,7 @@ object Exports {
case Ability(vm, at) if vm.properties.isEmpty =>
val pairs = getAbilityPairs(vm.name, exportName, at, state.values)
state.copy(values = state.values ++ pairs.toList.toMap + (exportName -> value))
case Stream(VarModel(streamName, _, _), st) if exportName == streamName =>
case MutableStream(VarModel(streamName, _, _), st) if exportName == streamName =>
state.copy(
values = state.values + (exportName -> value),
streams = state.streams + (exportName -> st)
@ -237,7 +235,7 @@ object Exports {
override def resolved(exports: Map[String, ValueModel]): State[ExportsState, Unit] =
State.modify(st => st.copy(values = st.values ++ exports))
override def streams: State[ExportsState, Map[String, StreamType]] =
override def streams: State[ExportsState, Map[String, MutableStreamType]] =
State.get.map(_.streams)
override def deleteStreams(names: Set[String]): State[ExportsState, Unit] =

View File

@ -9,22 +9,24 @@ import aqua.model.inline.RawValueInliner.valueToModel
import aqua.model.inline.TagInliner.TagInlined
import aqua.model.inline.TagInliner.flat
import aqua.model.inline.state.*
import aqua.raw.ops.ForTag
import aqua.raw.ops.{ForKeyValue, ForTag}
import aqua.raw.value.ValueRaw
import aqua.types.CollectionType
import aqua.types.StreamType
import aqua.types.{CollectionType, ScalarType, StreamType}
import cats.Eval
import cats.data.Reader
import cats.data.{Chain, State}
import cats.syntax.applicative.*
import cats.syntax.apply.*
import cats.syntax.flatMap.*
import cats.syntax.traverse.*
final case class ForTagInliner(
item: String,
iterable: ValueRaw,
mode: ForTag.Mode
) {
item: String,
iterable: ValueRaw,
mode: ForTag.Mode,
keyValue: Option[ForKeyValue]
) {
def inlined[S: Mangler: Exports: Arrows: Config]: State[S, TagInlined[S]] = for {
vp <- valueToModel(iterable)
@ -41,14 +43,32 @@ final case class ForTagInliner(
s"non-box type variable '$iterable' in 'for' expression."
)
}
_ <- Exports[S].resolved(item, VarModel(n, elementType))
itemVar = VarModel(n, elementType)
_ <- Exports[S].resolved(item, itemVar)
pref <- keyValue.traverse(kv =>
for {
keyName <- Mangler[S].findAndForbidName(kv.key)
_ <- Exports[S].resolved(kv.key, VarModel(keyName, ScalarType.string))
valueName <- Mangler[S].findAndForbidName(kv.value)
_ <- Exports[S].resolved(kv.value, VarModel(valueName, elementType))
} yield SeqModel.wrap(
FlattenModel(
itemVar.withProperty(IntoFieldModel("key", ScalarType.string)),
keyName
).leaf,
FlattenModel(
itemVar.withProperty(IntoFieldModel("value", elementType)),
valueName
).leaf
)
)
modeModel = mode match {
case ForTag.Mode.SeqMode | ForTag.Mode.TryMode => ForModel.Mode.Null
case ForTag.Mode.ParMode | ForTag.Mode.RecMode => ForModel.Mode.Never
}
model = ForModel(n, v, modeModel)
} yield TagInlined.Around(
model = StreamRestrictions.restrictStreams(model.wrap),
model = StreamRestrictions.restrictStreams(ss => model.wrap(Chain.fromOption(pref) ++ ss)),
prefix = p
)
}

View File

@ -159,17 +159,23 @@ case class NextTag(item: String) extends RawTag {
override def mapValues(f: ValueRaw => ValueRaw): RawTag = this
}
case class ForTag(item: String, iterable: ValueRaw, mode: ForTag.Mode) extends SeqGroupTag {
case class ForKeyValue(key: String, value: String) {
def toSet: Set[String] = Set(key, value)
override def restrictsVarNames: Set[String] = Set(item)
def rename(map: Map[String, String]): ForKeyValue = copy(key = map.getOrElse(key, key), value = map.getOrElse(value, value))
}
case class ForTag(item: String, iterable: ValueRaw, mode: ForTag.Mode, keyValue: Option[ForKeyValue] = None) extends SeqGroupTag {
override def restrictsVarNames: Set[String] = Set(item) ++ keyValue.toSet.flatMap(_.toSet)
override def usesVarNames: Set[String] = iterable.varNames
override def mapValues(f: ValueRaw => ValueRaw): RawTag =
ForTag(item, iterable.map(f), mode)
ForTag(item, iterable.map(f), mode, keyValue)
override def renameExports(map: Map[String, String]): RawTag =
copy(item = map.getOrElse(item, item))
copy(item = map.getOrElse(item, item), keyValue = keyValue.map(_.rename(map)))
}
object ForTag {
@ -187,11 +193,11 @@ object ForTag {
case ParMode, SeqMode, TryMode, RecMode
}
def par(item: String, iterable: ValueRaw): ForTag =
ForTag(item, iterable, Mode.ParMode)
def par(item: String, iterable: ValueRaw, keyValue: Option[ForKeyValue] = None): ForTag =
ForTag(item, iterable, Mode.ParMode, keyValue)
def seq(item: String, iterable: ValueRaw): ForTag =
ForTag(item, iterable, Mode.SeqMode)
def seq(item: String, iterable: ValueRaw, keyValue: Option[ForKeyValue] = None): ForTag =
ForTag(item, iterable, Mode.SeqMode, keyValue)
}
case class CallArrowRawTag(
@ -376,6 +382,21 @@ case class PushToStreamTag(operand: ValueRaw, exportTo: Call.Export) extends Raw
override def toString: String = s"(push $operand $exportTo)"
}
case class PushToMapTag(key: ValueRaw, operand: ValueRaw, exportTo: Call.Export) extends RawTag {
override def exportsVarNames: Set[String] = Set.empty
override def usesVarNames: Set[String] = key.varNames ++ operand.varNames + exportTo.name
override def mapValues(f: ValueRaw => ValueRaw): RawTag =
PushToMapTag(key.map(f), operand.map(f), exportTo)
override def renameExports(map: Map[String, String]): RawTag =
copy(exportTo = exportTo.mapName(n => map.getOrElse(n, n)))
override def toString: String = s"(pushmap ($key $operand) $exportTo)"
}
case class FlattenTag(operand: ValueRaw, assignTo: String) extends RawTag {
override def exportsVarNames: Set[String] = Set(assignTo)

View File

@ -1,6 +1,6 @@
package aqua.model
import aqua.model.ValueModel.{Ability, Stream}
import aqua.model.ValueModel.{Ability, MutableStream}
import aqua.raw.ops.Call
import aqua.raw.value.VarRaw
import aqua.types.*
@ -65,7 +65,7 @@ case class ArgsCall(args: ProductType, callWith: List[ValueModel]) {
* definition does not matter.
*/
lazy val streamArgs: Map[String, VarModel] =
zipped.collect { case ((name, _: MutableStreamType), Stream(vr, _)) =>
zipped.collect { case ((name, _: MutableStreamType), MutableStream(vr, _)) =>
name -> vr
}.toMap
@ -82,13 +82,13 @@ case class ArgsCall(args: ProductType, callWith: List[ValueModel]) {
*/
lazy val streamToImmutableArgs: Map[String, VarModel] =
zipped.collect {
case ((name, _: ImmutableCollectionType), vr @ VarModel(_, StreamType(_), _)) =>
case ((name, _: ImmutableCollectionType), vr@MutableStream(_)) =>
name -> vr
}.toMap
lazy val streamToImmutableArgsWithTypes: Map[String, (VarModel, StreamType)] =
lazy val streamToImmutableArgsWithTypes: Map[String, (VarModel, MutableStreamType)] =
zipped.collect {
case ((name, _: ImmutableCollectionType), vr@Stream(_, t)) =>
case ((name, _: ImmutableCollectionType), vr@MutableStream(_, t)) =>
name -> (vr, t)
}.toMap

View File

@ -27,5 +27,9 @@ object CallModel {
override def toString: String = s"$name:${`type`}"
}
object Export {
def apply(vm: VarModel): Export = Export(vm.name, vm.`type`)
}
def callExport(ex: Call.Export): Export = Export(ex.name, ex.`type`)
}

View File

@ -78,7 +78,17 @@ object ValueModel {
def unapply(vm: VarModel): Option[(VarModel, StreamType)] =
vm match {
case vm @ VarModel(_, t: StreamType, _) =>
case vm@VarModel(_, t: StreamType, _) =>
(vm, t).some
case _ => none
}
}
object MutableStream {
def unapply(vm: VarModel): Option[(VarModel, MutableStreamType)] =
vm match {
case vm @ VarModel(_, t: MutableStreamType, _) =>
(vm, t).some
case _ => none
}

View File

@ -20,7 +20,7 @@ final case class Tracing(
children.headOption
.filter(_ => children.length == 1)
.getOrElse(
SeqModel.wrap(children.toList: _*)
SeqModel.wrap(children.toList*)
)
override def folder: OpTransform.OpFolder = {

View File

@ -2,30 +2,38 @@ package aqua.parser.expr.func
import aqua.parser.Expr
import aqua.parser.expr.*
import aqua.parser.expr.func.ForExpr.NameOrPair
import aqua.parser.lexer.Token.*
import aqua.parser.lexer.{Name, ValueToken}
import aqua.parser.lift.LiftParser
import aqua.parser.lift.LiftParser.*
import aqua.parser.lift.Span
import aqua.parser.lift.Span.{given, *}
import aqua.parser.lift.{LiftParser, Span}
import cats.parse.Parser as P
import cats.syntax.comonad.*
import cats.syntax.either.*
import cats.{Comonad, ~>}
case class ForExpr[F[_]](
item: Name[F],
item: NameOrPair[F],
iterable: ValueToken[F],
mode: Option[ForExpr.Mode]
) extends Expr[F](ForExpr, item) {
) extends Expr[F](ForExpr, iterable) {
override def mapK[K[_]: Comonad](fk: F ~> K): ForExpr[K] =
copy(item.mapK(fk), iterable.mapK(fk))
copy(item.bimap(p => (p._1.mapK(fk), p._2.mapK(fk)), v => v.mapK(fk)), iterable.mapK(fk))
}
object ForExpr extends Expr.AndIndented {
enum Mode { case ParMode, TryMode, RecMode }
type NameOrPair[S[_]] = Either[(Name[S], Name[S]), Name[S]]
private val pair: P[(Name[S], Name[S])] =
(Name.p <* (` `.? ~ `,` ~ ` `.?)) ~ Name.p
val nameOrPair: P[NameOrPair[S]] = pair.backtrack.eitherOr(Name.p).map(_.swap)
override def validChildren: List[Expr.Lexem] = ArrowExpr.funcChildren
private lazy val modeP: P[Mode] =
@ -36,7 +44,7 @@ object ForExpr extends Expr.AndIndented {
).lift).map(_.extract)
override def p: P[ForExpr[Span.S]] =
((`for` *> ` ` *> Name.p <* ` <- `) ~ ValueToken.`value` ~ modeP.?).map {
((`for` *> ` ` *> nameOrPair <* ` <- `) ~ ValueToken.`value` ~ modeP.?).map {
case ((item, iterable), mode) =>
ForExpr(item, iterable, mode)
}

View File

@ -11,24 +11,31 @@ import aqua.parser.lift.Span.{given, *}
import cats.parse.Parser as P
import cats.syntax.comonad.*
import cats.syntax.either.*
import cats.{Comonad, ~>}
case class ParSeqExpr[F[_]](
item: Name[F],
item: ForExpr.NameOrPair[F],
iterable: ValueToken[F],
peerId: ValueToken[F],
via: List[ValueToken[F]]
) extends Expr[F](ParSeqExpr, item) {
) extends Expr[F](ParSeqExpr, iterable) {
override def mapK[K[_]: Comonad](fk: F ~> K): ParSeqExpr[K] =
copy(item.mapK(fk), iterable.mapK(fk), peerId.mapK(fk), via.map(_.mapK(fk)))
copy(
item.bimap(p => (p._1.mapK(fk), p._2.mapK(fk)), v => v.mapK(fk)),
iterable.mapK(fk),
peerId.mapK(fk),
via.map(_.mapK(fk))
)
}
object ParSeqExpr extends Expr.AndIndented {
override def validChildren: List[Expr.Lexem] = ArrowExpr.funcChildren
private lazy val parseqPart = (`parseq` *> ` ` *> Name.p <* ` <- `) ~ ValueToken.`value`
private lazy val parseqPart =
(`parseq` *> ` ` *> ForExpr.nameOrPair <* ` <- `) ~ ValueToken.`value`
private lazy val onPart =
`on` *> ` ` *> ValueToken.`value` ~ (` ` *> `via` *> ` ` *> ValueToken.`value`).rep0

View File

@ -2,28 +2,39 @@ package aqua.parser.expr.func
import aqua.parser.Expr
import aqua.parser.expr.func.PushToStreamExpr
import aqua.parser.expr.func.PushToStreamExpr.ValueOrPair
import aqua.parser.lexer.Token.*
import aqua.parser.lexer.{Name, ValueToken}
import aqua.parser.lift.LiftParser
import aqua.parser.lift.Span
import aqua.parser.lexer.{Name, Token, ValueToken}
import aqua.parser.lift.Span.{given, *}
import aqua.parser.lift.{LiftParser, Span}
import cats.parse.Parser as P
import cats.syntax.either.*
import cats.{Comonad, ~>}
case class PushToStreamExpr[F[_]](
stream: Name[F],
value: ValueToken[F]
value: ValueOrPair[F]
) extends Expr[F](PushToStreamExpr, stream) {
override def mapK[K[_]: Comonad](fk: F ~> K): PushToStreamExpr[K] =
copy(stream.mapK(fk), value.mapK(fk))
copy(stream.mapK(fk), value.bimap(p => (p._1.mapK(fk), p._2.mapK(fk)), v => v.mapK(fk)))
}
object PushToStreamExpr extends Expr.Leaf {
type ValueOrPair[S[_]] = Either[(ValueToken[S], ValueToken[S]), ValueToken[S]]
private val valueOrPair: P[ValueOrPair[S]] =
P.repSep(ValueToken.`value` <* ` `.?, 1, 2, `,`).map { l =>
l.tail match {
case r :: _ => Left(l.head -> r)
case _ => Right(l.head)
}
}
override val p: P[PushToStreamExpr[Span.S]] =
((Name.p <* ` <<- `).with1 ~ ValueToken.`value`).map { case (variable, value) =>
((Name.p <* ` <<- `).with1 ~ valueOrPair).map { case (variable, value) =>
PushToStreamExpr(variable, value)
}
}

View File

@ -54,6 +54,20 @@ object StreamTypeToken {
}
case class StreamMapTypeToken[S[_] : Comonad](override val unit: S[Unit], data: BasicTypeToken[S])
extends BasicTypeToken[S] {
override def as[T](v: T): S[T] = unit.as(v)
override def mapK[K[_] : Comonad](fk: S ~> K): StreamMapTypeToken[K] = copy(fk(unit), data.mapK(fk))
}
object StreamMapTypeToken {
val `streammaptypedef`: P[StreamMapTypeToken[Span.S]] =
(`%`.lift ~ BasicTypeToken.`compositetypedef`).map(ud => StreamMapTypeToken(ud._1, ud._2))
}
case class OptionTypeToken[F[_]: Comonad](override val unit: F[Unit], data: BasicTypeToken[F])
extends BasicTypeToken[F] {
override def as[T](v: T): F[T] = unit.as(v)
@ -176,6 +190,7 @@ object BasicTypeToken {
P.defer(`topbottomdef`) ::
P.defer(ArrayTypeToken.`arraytypedef`) ::
P.defer(StreamTypeToken.`streamtypedef`) ::
P.defer(StreamMapTypeToken.`streammaptypedef`) ::
P.defer(OptionTypeToken.`optiontypedef`) ::
ScalarTypeToken.`scalartypedef` ::
NamedTypeToken.dotted :: Nil

View File

@ -23,6 +23,8 @@ sealed trait ValueToken[F[_]] extends Token[F] {
def mapK[K[_]: Comonad](fk: F ~> K): ValueToken[K]
}
case class PropertyToken[F[_]: Comonad](
value: ValueToken[F],
properties: NonEmptyList[PropertyOp[F]]

View File

@ -84,7 +84,7 @@ class CoExprSpec extends AnyFlatSpec with Matchers with Inside with AquaSpec {
_ should be(
co(
ForExpr(
toName("w"),
Right(toName("w")),
CallArrowToken(toName("getWorkers"), Nil),
None
)

View File

@ -15,27 +15,27 @@ class ForExprSpec extends AnyFlatSpec with Matchers with AquaSpec {
mode: Option[ForExpr.Mode]
): Unit = {
parseFor(s"for some <- 1$modeStr") should be(
ForExpr[Id]("some", toNumber(1), mode)
ForExpr[Id](Right("some"), toNumber(1), mode)
)
parseFor(s"for some <- false$modeStr") should be(
ForExpr[Id]("some", toBool(false), mode)
ForExpr[Id](Right("some"), toBool(false), mode)
)
parseFor(s"for some <- \"a\"$modeStr") should be(
ForExpr[Id]("some", toStr("a"), mode)
ForExpr[Id](Right("some"), toStr("a"), mode)
)
parseFor(s"for i <- []$modeStr") should be(
ForExpr[Id]("i", toArr(Nil), mode)
ForExpr[Id](Right("i"), toArr(Nil), mode)
)
parseFor(s"for i <- [1, 2, 3]$modeStr") should be(
ForExpr[Id]("i", toArr(List(toNumber(1), toNumber(2), toNumber(3))), mode)
ForExpr[Id](Right("i"), toArr(List(toNumber(1), toNumber(2), toNumber(3))), mode)
)
parseFor(s"for i <- stream$modeStr") should be(
ForExpr[Id]("i", toVar("stream"), mode)
ForExpr[Id](Right("i"), toVar("stream"), mode)
)
}

View File

@ -84,7 +84,7 @@ class ParExprSpec extends AnyFlatSpec with Matchers with Inside with AquaSpec {
_ should be(
par(
ForExpr(
toName("w"),
Right(toName("w")),
CallArrowToken(toName("getWorkers"), Nil),
None
)

View File

@ -12,16 +12,16 @@ class ParSeqExprSpec extends AnyFlatSpec with Matchers with AquaSpec {
"parseq" should "be parsed" in {
parseParSeq("parseq s <- strings on \"peerId\"") should be(
ParSeqExpr[Id](toName("s"), toVar("strings"), toStr("peerId"), Nil)
ParSeqExpr[Id](Right(toName("s")), toVar("strings"), toStr("peerId"), Nil)
)
parseParSeq("parseq s <- strings on \"peerId\" via \"relay\"") should be(
ParSeqExpr[Id](toName("s"), toVar("strings"), toStr("peerId"), toStr("relay") :: Nil)
ParSeqExpr[Id](Right(toName("s")), toVar("strings"), toStr("peerId"), toStr("relay") :: Nil)
)
parseParSeq("parseq s <- strings on \"peerId\" via \"relay\" via \"relay2\"") should be(
ParSeqExpr[Id](
toName("s"),
Right(toName("s")),
toVar("strings"),
toStr("peerId"),
toStr("relay") :: toStr("relay2") :: Nil
@ -29,7 +29,11 @@ class ParSeqExprSpec extends AnyFlatSpec with Matchers with AquaSpec {
)
parseParSeq("parseq s <- strings on peerId via relay") should be(
ParSeqExpr[Id](toName("s"), toVar("strings"), toVar("peerId"), toVar("relay") :: Nil)
ParSeqExpr[Id](Right(toName("s")), toVar("strings"), toVar("peerId"), toVar("relay") :: Nil)
)
parseParSeq("parseq s, v <- strings on peerId via relay") should be(
ParSeqExpr[Id](Left((toName("s"), toName("v"))), toVar("strings"), toVar("peerId"), toVar("relay") :: Nil)
)
}

View File

@ -11,11 +11,21 @@ class PushToStreamExprSpec extends AnyFlatSpec with Matchers with AquaSpec {
"assign" should "be parsed" in {
parsePush("a <<- \"b\"") should be(
PushToStreamExpr[Id]("a", toStr("b"))
PushToStreamExpr[Id]("a", Right(toStr("b")))
)
parsePush("a <<- b") should be(
PushToStreamExpr[Id]("a", toVar("b"))
PushToStreamExpr[Id]("a", Right(toVar("b")))
)
}
"assign with tuple" should "be parsed" in {
parsePush("a <<- \"b\", \"c\"") should be(
PushToStreamExpr[Id]("a", Left((toStr("b"), toStr("c"))))
)
parsePush("a <<- b, c") should be(
PushToStreamExpr[Id]("a", Left((toVar("b"), toVar("c"))))
)
}
}

View File

@ -9,6 +9,7 @@ import aqua.raw.value.ValueRaw
import aqua.semantics.Prog
import aqua.semantics.rules.ValuesAlgebra
import aqua.semantics.rules.abilities.AbilitiesAlgebra
import aqua.semantics.rules.mangler.ManglerAlgebra
import aqua.semantics.rules.names.NamesAlgebra
import aqua.semantics.rules.types.TypesAlgebra
import aqua.types.*
@ -26,8 +27,9 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal {
def program[F[_]: Monad](using
V: ValuesAlgebra[S, F],
N: NamesAlgebra[S, F],
T: TypesAlgebra[S, F],
A: AbilitiesAlgebra[S, F]
A: AbilitiesAlgebra[S, F],
M: ManglerAlgebra[F],
T: TypesAlgebra[S, F]
): Prog[F, Raw] =
Prog
.around(
@ -50,10 +52,12 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal {
case ForTag.Mode.RecMode => ParTag
}
val forTag = ForTag(expr.item.value, vm, mode).wrap(
val (item, pair) = ForSem.itemOrPair(expr.item)
val forTag = ForTag(item, vm, mode, pair).wrap(
innerTag.wrap(
op,
NextTag(expr.item.value).leaf
NextTag(item).leaf
)
)
@ -75,18 +79,40 @@ class ForSem[S[_]](val expr: ForExpr[S]) extends AnyVal {
object ForSem {
def itemOrPair[S[_]](nameOrPair: ForExpr.NameOrPair[S]): (String, Option[ForKeyValue]) =
nameOrPair match {
case Right(v) => (v.value, None)
case Left(k, v) => ("-iterable-", ForKeyValue(k.value, v.value).some)
}
def beforeFor[S[_], F[_]: Monad](
item: Name[S],
item: ForExpr.NameOrPair[S],
iterable: ValueToken[S]
)(using
V: ValuesAlgebra[S, F],
N: NamesAlgebra[S, F],
M: ManglerAlgebra[F],
T: TypesAlgebra[S, F]
): F[Option[ValueRaw]] = (for {
value <- V.valueToIterable(iterable)
(raw, typ) = value
_ <- OptionT.liftF(
N.define(item, typ.element)
)
} yield raw).value
res <- (typ, item) match {
case (smt: StreamMapType, Left(key, value)) =>
OptionT.liftF(for {
_ <- N.define(key, ScalarType.string)
_ <- N.define(value, smt.element)
} yield raw)
case (smt: StreamMapType, Right(it)) =>
val typeName = s"KVPair(${smt.element})"
OptionT.liftF(for {
newTypeName <- M.rename(typeName)
iterType = smt.iterType(newTypeName)
_ <- N.define(it, iterType)
} yield raw)
case (_, Left(_, _)) =>
OptionT(T.ensureTypeMatches(iterable, StreamMapType(TopType), typ).as(None))
case (_, Right(it)) =>
OptionT.liftF(N.define(it, typ.element).as(raw))
}
} yield res).value
}

View File

@ -8,6 +8,7 @@ import aqua.raw.value.ValueRaw
import aqua.semantics.Prog
import aqua.semantics.rules.ValuesAlgebra
import aqua.semantics.rules.abilities.AbilitiesAlgebra
import aqua.semantics.rules.mangler.ManglerAlgebra
import aqua.semantics.rules.names.NamesAlgebra
import aqua.semantics.rules.types.TypesAlgebra
import aqua.types.{ArrayType, CollectionType, StreamType}
@ -26,7 +27,8 @@ class ParSeqSem[S[_]](val expr: ParSeqExpr[S]) extends AnyVal {
V: ValuesAlgebra[S, F],
N: NamesAlgebra[S, F],
T: TypesAlgebra[S, F],
A: AbilitiesAlgebra[S, F]
A: AbilitiesAlgebra[S, F],
M: ManglerAlgebra[F]
): Prog[F, Raw] =
Prog
.around(
@ -49,10 +51,7 @@ class ParSeqSem[S[_]](val expr: ParSeqExpr[S]) extends AnyVal {
viaVM: List[ValueRaw],
ops: Raw
)(using
V: ValuesAlgebra[S, F],
N: NamesAlgebra[S, F],
T: TypesAlgebra[S, F],
A: AbilitiesAlgebra[S, F]
V: ValuesAlgebra[S, F]
): F[Raw] =
V.valueToRaw(expr.peerId).map((_, iterableVM, ops)).flatMap {
case (Some(peerId), Some(vm), FuncOp(op)) =>
@ -62,16 +61,18 @@ class ParSeqSem[S[_]](val expr: ParSeqExpr[S]) extends AnyVal {
strategy = OnTag.ReturnStrategy.Relay.some
)
val (item, pair) = ForSem.itemOrPair(expr.item)
/**
* `parseq` => par (`never` as `last` in `fold`)
* So that peer initiating `parseq` would not continue execution past it
*/
ForTag
.par(expr.item.value, vm)
.par(item, vm, pair)
.wrap(
ParTag.wrap(
onTag.wrap(op),
NextTag(expr.item.value).leaf
NextTag(item).leaf
)
)
.toFuncOp

View File

@ -4,7 +4,7 @@ import aqua.helpers.syntax.optiont.*
import aqua.parser.expr.func.PushToStreamExpr
import aqua.parser.lexer.Token
import aqua.raw.Raw
import aqua.raw.ops.{Call, PushToStreamTag}
import aqua.raw.ops.{Call, PushToMapTag, PushToStreamTag}
import aqua.semantics.Prog
import aqua.semantics.rules.ValuesAlgebra
import aqua.semantics.rules.names.NamesAlgebra
@ -18,6 +18,8 @@ import cats.syntax.applicative.*
import cats.syntax.apply.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.option.*
import cats.syntax.traverse.*
class PushToStreamSem[S[_]](val expr: PushToStreamExpr[S]) extends AnyVal {
@ -25,9 +27,11 @@ class PushToStreamSem[S[_]](val expr: PushToStreamExpr[S]) extends AnyVal {
streamToken: Token[S],
elementToken: Token[S],
stream: Type,
element: Type
element: Type,
isMap: Boolean
)(using T: TypesAlgebra[S, Alg]): Alg[Boolean] = (
T.typeToStream(streamToken, stream),
(if (isMap) T.typeToStreamMap(streamToken, stream) else T.typeToStream(streamToken, stream))
.map(s => s: MutableStreamType),
T.typeToCollectible(elementToken, element)
).merged.semiflatMap { case (st, et) =>
T.ensureTypeMatches(elementToken, st.element, et)
@ -38,25 +42,27 @@ class PushToStreamSem[S[_]](val expr: PushToStreamExpr[S]) extends AnyVal {
T: TypesAlgebra[S, Alg],
V: ValuesAlgebra[S, Alg]
): Prog[Alg, Raw] =
V.valueToRaw(expr.value).flatMap {
case Some(vm) =>
N.read(expr.stream).flatMap {
case None => Raw.error("Cannot resolve stream type").pure[Alg]
case Some(t) =>
ensureStreamElementMatches(
expr.token,
expr.value,
t,
vm.`type`
).map {
case false =>
Raw.error("Stream type and element type does not match")
case true =>
PushToStreamTag(vm, Call.Export(expr.stream.value, t)).funcOpLeaf
}
}
case _ => Raw.error("Cannot resolve value").pure[Alg]
}
(for {
st <- OptionT(N.read(expr.stream))
(key, value) = expr.value.fold(
{ case (key, value) => (key.some, value) },
value => (none, value)
)
keyRaw <- key.traverse(k =>
for {
raw <- OptionT(V.valueToRaw(k))
_ <- OptionT.withFilterF(
T.ensureTypeMatches(k, ScalarType.string, raw.`type`)
)
} yield raw
)
valueRaw <- OptionT(V.valueToRaw(value))
_ <- OptionT.withFilterF(
ensureStreamElementMatches(expr.token, value, st, valueRaw.`type`, key.isDefined)
)
} yield (keyRaw match {
case Some(k) => PushToMapTag(k, valueRaw, Call.Export(expr.stream.value, st))
case None => PushToStreamTag(valueRaw, Call.Export(expr.stream.value, st))
}).funcOpLeaf).getOrElse(Raw.error("Cannot resolve push to stream"))
}

View File

@ -35,14 +35,14 @@ class ExportSem[S[_]: Comonad, C](expr: ExportExpr[S])(using
(),
error(
token,
s"The function '$name' cannot be exported, because it returns an arrow or an ability"
s"The function '$name' cannot be exported, because it returns an arrow, an ability or a stream map"
)
) combine Validated.condNec(
!ctx.funcAcceptAbility(name),
(),
error(
token,
s"The function '$name' cannot be exported, because it accepts an ability"
s"The function '$name' cannot be exported, because it accepts an ability or a stream map"
)
)

View File

@ -2,8 +2,7 @@ package aqua.semantics.header
import aqua.helpers.data.PName
import aqua.raw.{RawContext, RawPart}
import aqua.types.{AbilityType, ArrowType, Type}
import aqua.types.{AbilityType, ArrowType, StreamMapType, Type}
import cats.Semigroup
import cats.syntax.foldable.*
import cats.syntax.semigroup.*
@ -81,16 +80,18 @@ object Picker {
Picker[A].setExports(p, exports)
}
private def returnsAbilityOrArrow(arrowType: ArrowType): Boolean =
private def returnsAbilityOrArrowOrStreamMap(arrowType: ArrowType): Boolean =
arrowType.codomain.toList.exists {
case _: AbilityType => true
case _: ArrowType => true
case _: StreamMapType => true
case _ => false
}
private def acceptsAbility(arrowType: ArrowType): Boolean =
private def acceptsAbilityOrStreamMap(arrowType: ArrowType): Boolean =
arrowType.domain.toList.exists {
case _: AbilityType => true
case _: StreamMapType => true
case _ => false
}
@ -111,10 +112,10 @@ object Picker {
ctx.types.get(name).exists(isAbilityType)
override def funcReturnAbilityOrArrow(ctx: RawContext, name: String): Boolean =
ctx.funcs.get(name).map(_.arrow.`type`).exists(returnsAbilityOrArrow)
ctx.funcs.get(name).map(_.arrow.`type`).exists(returnsAbilityOrArrowOrStreamMap)
override def funcAcceptAbility(ctx: RawContext, name: String): Boolean =
ctx.funcs.get(name).map(_.arrow.`type`).exists(acceptsAbility)
ctx.funcs.get(name).map(_.arrow.`type`).exists(acceptsAbilityOrStreamMap)
override def funcNames(ctx: RawContext): Set[String] = ctx.funcs.keySet

View File

@ -12,6 +12,7 @@ import aqua.semantics.rules.names.NamesAlgebra
import aqua.semantics.rules.report.ReportAlgebra
import aqua.semantics.rules.types.TypesAlgebra
import aqua.types.*
import aqua.types.Type.isStreamMapType
import cats.Monad
import cats.data.{NonEmptyList, OptionT}
@ -85,7 +86,7 @@ class ValuesAlgebra[S[_], Alg[_]: Monad](using
idx <- OptionT(op.idx.fold(LiteralRaw.Zero.some.pure)(valueToRaw))
valueType <- OptionT(T.resolveIntoIndex(op, rootType, idx.`type`))
} yield IntoIndexRaw(idx, valueType)).value
case op: IntoApply[S] =>
case _: IntoApply[S] =>
internalError("Unexpected. `IntoApply` expected to be transformed into `NamedValueToken`")
}
@ -377,6 +378,9 @@ class ValuesAlgebra[S[_], Alg[_]: Monad](using
valueToRaw(v).flatMap(
_.flatTraverse {
case ca: CallArrowRaw => (ca, ca.baseType).some.pure[Alg]
case ApplyPropertyRaw(value, _) if isStreamMapType(value.baseType) =>
// TODO: make it possible to use `<-`
report.error(v, s"Use `=` instead of `<-` with map functions").as(none)
case apr @ ApplyPropertyRaw(_, IntoArrowRaw(_, arrowType, _)) =>
(apr, arrowType).some.pure[Alg]
// TODO: better error message (`raw` formatting)

View File

@ -56,6 +56,8 @@ object TypeResolution {
resolveCollection(dtt, "Array", ArrayType.apply)(state)
case StreamTypeToken(_, dtt) =>
resolveCollection(dtt, "Stream", StreamType.apply)(state)
case StreamMapTypeToken(_, dtt) =>
resolveCollection(dtt, "StreamMap", StreamMapType.apply)(state)
case OptionTypeToken(_, dtt) =>
resolveCollection(dtt, "Option", OptionType.apply)(state)
case ntt: NamedTypeToken[S] =>

View File

@ -13,7 +13,7 @@ trait TypesAlgebra[S[_], Alg[_]] {
def resolveType(token: TypeToken[S], mustBeDefined: Boolean = true): Alg[Option[Type]]
def resolveStreamType(token: TypeToken[S]): Alg[Option[StreamType]]
def resolveStreamType(token: TypeToken[S]): Alg[Option[MutableStreamType]]
def resolveNamedType(token: TypeToken[S]): Alg[Option[AbilityType | StructType]]
@ -130,6 +130,7 @@ trait TypesAlgebra[S[_], Alg[_]] {
def typeToCollectible(token: Token[S], givenType: Type): OptionT[Alg, CollectibleType]
def typeToStream(token: Token[S], givenType: Type): OptionT[Alg, StreamType]
def typeToStreamMap(token: Token[S], givenType: Type): OptionT[Alg, StreamMapType]
def typeToIterable(token: Token[S], givenType: Type): OptionT[Alg, CollectionType]

View File

@ -41,7 +41,10 @@ class TypesInterpreter[S[_], X](using
type ST[A] = State[X, A]
override def resolveType(token: TypeToken[S], mustBeDefined: Boolean = true): State[X, Option[Type]] =
override def resolveType(
token: TypeToken[S],
mustBeDefined: Boolean = true
): State[X, Option[Type]] =
getState.map(TypeResolution.resolveTypeToken(token)).flatMap {
case Valid(TypeResolution(typ, tokens)) =>
val tokensLocs = tokens.map { case (t, n) => n -> t }
@ -53,10 +56,10 @@ class TypesInterpreter[S[_], X](using
case _ => none.pure
}
override def resolveStreamType(token: TypeToken[S]): State[X, Option[StreamType]] =
override def resolveStreamType(token: TypeToken[S]): State[X, Option[MutableStreamType]] =
OptionT(resolveType(token)).flatMapF {
case st: StreamType => st.some.pure[ST]
case t => report.error(token, s"Expected stream type, got $t").as(none)
case st: MutableStreamType => st.some.pure[ST]
case t => report.error(token, s"Expected stream or stream map type, got $t").as(none)
}.value
def resolveNamedType(token: TypeToken[S]): State[X, Option[AbilityType | StructType]] =
@ -226,6 +229,62 @@ class TypesInterpreter[S[_], X](using
}
}
private def checkArrowType(
op: IntoArrow[S],
tOp: Option[Type],
rootName: String,
availableStr: String,
types: List[Type]
): State[X, Option[ArrowType]] = {
val opName = op.name.value
tOp match {
case Some(at: ArrowType) =>
val reportNotEnoughArguments =
/* Report at position of arrow application */
report
.error(
op,
s"Not enough arguments for arrow `$opName` in `$rootName`, " +
s"expected: ${at.domain.length}, given: ${op.arguments.length}"
)
.whenA(op.arguments.length < at.domain.length)
val reportTooManyArguments =
/* Report once at position of the first extra argument */
op.arguments.drop(at.domain.length).headOption.traverse_ { arg =>
report
.error(
arg,
s"Too many arguments for arrow `$opName` in `$rootName`, " +
s"expected: ${at.domain.length}, given: ${op.arguments.length}"
)
}
val checkArgumentTypes =
op.arguments
.zip(types)
.zip(at.domain.toList)
.forallM { case ((arg, argType), expectedType) =>
ensureTypeMatches(arg, expectedType, argType)
}
reportNotEnoughArguments *>
reportTooManyArguments *>
checkArgumentTypes.map(typesMatch =>
Option.when(
typesMatch && at.domain.length == op.arguments.length
)(at)
)
case Some(t) =>
report
.error(op, s"Field `$opName` has non arrow type `$t` in `$rootName`")
.as(None)
case None =>
report
.error(op, s"Arrow `$opName` not found in `$rootName`, available: $availableStr")
.as(None)
}
}
override def resolveIntoArrow(
op: IntoArrow[S],
rootT: Type,
@ -240,55 +299,12 @@ class TypesInterpreter[S[_], X](using
rootT match {
case ab: GeneralAbilityType =>
val abName = ab.fullName
ab.fields.lookup(opName) match {
case Some(at: ArrowType) =>
val reportNotEnoughArguments =
/* Report at position of arrow application */
report
.error(
op,
s"Not enough arguments for arrow `$opName` in `$abName`, " +
s"expected: ${at.domain.length}, given: ${op.arguments.length}"
)
.whenA(op.arguments.length < at.domain.length)
val reportTooManyArguments =
/* Report once at position of the first extra argument */
op.arguments.drop(at.domain.length).headOption.traverse_ { arg =>
report
.error(
arg,
s"Too many arguments for arrow `$opName` in `$abName`, " +
s"expected: ${at.domain.length}, given: ${op.arguments.length}"
)
}
val checkArgumentTypes =
op.arguments
.zip(types)
.zip(at.domain.toList)
.forallM { case ((arg, argType), expectedType) =>
ensureTypeMatches(arg, expectedType, argType)
}
locations.pointFieldLocation(ab.name, opName, op) *>
reportNotEnoughArguments *>
reportTooManyArguments *>
checkArgumentTypes.map(typesMatch =>
Option.when(
typesMatch && at.domain.length == op.arguments.length
)(at)
)
case Some(t) =>
report
.error(op, s"Field `$opName` has non arrow type `$t` in `$abName`")
.as(None)
case None =>
val available = ab.arrowFields.keys.map(k => s"`$k`").mkString(", ")
report
.error(op, s"Arrow `$opName` not found in `$abName`, available: $available")
.as(None)
}
val avStr = ab.arrowFields.keys.map(k => s"`$k`").mkString(", ")
locations.pointFieldLocation(ab.name, opName, op) *>
checkArrowType(op, ab.fields.lookup(opName), abName, avStr, types)
case st: StreamMapType =>
val avStr = StreamMapType.allFuncs.map(k => s"`${k.name}`").mkString(", ")
checkArrowType(op, st.funcByString(opName), st.toString, avStr, types)
case t =>
/* NOTE: Arrows are only supported on services and abilities,
(`.copy(...)` for structs is resolved by separate method) */
@ -536,7 +552,17 @@ class TypesInterpreter[S[_], X](using
typeTo[StreamType](
token,
givenType,
s"Expected stream value, got value of type '$givenType'"
s"Expected stream value (*), got value of type '$givenType'"
)
override def typeToStreamMap(
token: Token[S],
givenType: Type
): OptionT[State[X, *], StreamMapType] =
typeTo[StreamMapType](
token,
givenType,
s"Expected stream map value (%), got value of type '$givenType'"
)
override def typeToIterable(

View File

@ -607,7 +607,7 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside {
|""".stripMargin
insideBody(script) { body =>
matchSubtree(body) { case (ForTag("p", _, ForTag.Mode.ParMode), forTag) =>
matchSubtree(body) { case (ForTag("p", _, ForTag.Mode.ParMode, _), forTag) =>
matchChildren(forTag) { case (ParTag, parTag) =>
matchChildren(parTag)(
{ case (OnTag(_, _, strat), _) =>
@ -629,7 +629,7 @@ class SemanticsSpec extends AnyFlatSpec with Matchers with Inside {
|""".stripMargin
insideBody(script) { body =>
matchSubtree(body) { case (ForTag("i", stream, ForTag.Mode.RecMode), forTag) =>
matchSubtree(body) { case (ForTag("i", stream, ForTag.Mode.RecMode, _), forTag) =>
stream.`type` shouldBe StreamType(ScalarType.i32)
matchChildren(forTag) { case (ParTag, parTag) =>
matchChildren(parTag)(

View File

@ -46,7 +46,9 @@ sealed trait Type {
*/
def airPrefix: String = this match {
case _: StreamType => "$"
case _: CanonStreamType => "#"
case _: CanonStreamType => "#$"
case _: StreamMapType => "%"
case _: CanonStreamMapType => "#%"
case _ => ""
}
}
@ -312,7 +314,9 @@ object CollectionType {
sealed trait ImmutableCollectionType extends CollectionType with DataType {
def withElement(t: DataType): ImmutableCollectionType
}
sealed trait MutableStreamType extends CollectionType
sealed trait MutableStreamType extends CollectionType {
def toCanon: ImmutableCollectionType
}
case class CanonStreamType(
override val element: DataType
@ -320,7 +324,18 @@ case class CanonStreamType(
override val isStream: Boolean = false
override def toString: String = "#" + element
override def toString: String = "#$" + element
override def withElement(t: DataType): ImmutableCollectionType = copy(element = t)
}
case class CanonStreamMapType(
override val element: DataType
) extends ImmutableCollectionType {
override val isStream: Boolean = false
override def toString: String = "#%" + element
override def withElement(t: DataType): ImmutableCollectionType = copy(element = t)
}
@ -349,14 +364,57 @@ case class OptionType(
case class StreamMapType(override val element: DataType) extends MutableStreamType {
import StreamMapType.Func
import StreamMapType.Func.*
override val isStream: Boolean = true
override def withElement(t: DataType): MutableStreamType = copy(element = t)
override def toString: String = s"%$element"
def getFunc(f: Func): ArrowType ={
val (args, rets) = f match {
case Get =>
(ScalarType.string :: Nil) -> (ArrayType(element) :: Nil)
case GetStream =>
(ScalarType.string :: Nil) -> (StreamType(element) :: Nil)
case Keys =>
Nil -> (ArrayType(ScalarType.string) :: Nil)
case KeysStream =>
Nil -> (StreamType(ScalarType.string) :: Nil)
case Contains =>
(ScalarType.string :: Nil) -> (ScalarType.bool :: Nil)
}
ArrowType(ProductType(args), ProductType(rets))
}
def funcByString(s: String): Option[ArrowType] = {
StreamMapType.funcByString(s).map(getFunc)
}
def iterType(name: String): StructType =
StructType(name, NonEmptyMap.of("key" -> ScalarType.string, "value" -> element))
def toCanon: ImmutableCollectionType = CanonStreamMapType(element)
}
object StreamMapType {
enum Func(val name: String) {
case Get extends Func("get")
case GetStream extends Func("getStream")
case Keys extends Func("keys")
case KeysStream extends Func("keysStream")
case Contains extends Func("contains")
}
def funcByString(s: String): Option[Func] =
Func.values.find(_.name == s)
lazy val allFuncs: List[Func] = Func.values.toList
def top(): StreamMapType = StreamMapType(TopType)
}
@ -367,6 +425,8 @@ case class StreamType(override val element: DataType) extends MutableStreamType
override def toString: String = s"*$element"
override def withElement(t: DataType): StreamType = copy(element = t)
def toCanon: ImmutableCollectionType = CanonStreamType(element)
}
sealed trait NamedType extends Type {
@ -526,14 +586,20 @@ object Type {
/**
* `StreamType` is collectible with canonicalization
*/
type CollectibleType = DataType | StreamType
type CollectibleType = DataType | MutableStreamType
def isStreamType(t: Type): Boolean =
t match {
case _: MutableStreamType => true
case _ => false
}
def isStreamMapType(t: Type): Boolean =
t match {
case _: StreamMapType => true
case _ => false
}
given PartialOrder[Type] =
CompareTypes.partialOrder