A stream behaves like an iterator. It generates items in a loop and the user does something with them.
A stream is meant to be consumed once, when a final operation is executed. Therefore, a stream must release all the resources it held when closing (file handles, non GC-ed memory, etc.).
Any exception raised while generating items in a stream will be re-raised after closing the stream.
Types of operations
Final
An operation is final if it does not return a stream or a procedure that returns one.
Intermediate
Operations that return a stream or a lambda that will are called intermediate. They can be stateless or stateful.
Operation properties
Stateless
Stateless operations always return a stream without changing the step type (S).
Examples: map, filter.
Stateful
- Intermediate stateful operations can return:
A stream with a different step type.
Examples: takeWhile, limit
A lambda that returns a stream.
Examples: dropWhile, skip
This is to prevent the parallelization of this kind of operation (API to be done).
Final operations can also be stateful, such as findFirst.
Short-circuiting
Short-circuiting operations can terminate a stream if it is unnecesary to keep generating items.
Examples: takeWhile, any, all, none.
Backend restrictions
JavaScript
There is currently a bug that prevents the compiler from evaluating closures at compile time. So, streams are not available for compile time JavaScript.
NimScript
NimScript cannot currently catch exceptions with try-except clauses. Therefore, our implementation cannot guarantee that the "close" event will be triggered when an exception is raised while streaming. Consequently, the event will always do nothing and the API to modify it is not available for NimScript.
Imports
-
monad/identity, monad/io, monad/optional, monad/predicate, monad/reader, optics/lens, stream/loop, stream/streamsteps, stream/loop/loopscope, stream/loop/loopscope/runonceresult, utils/convert, utils/ifelse, utils/lambda, utils/operators, utils/pair, utils/partialprocs, utils/reducer, utils/somenatural, utils/unit, utils/variables
Types
Initializer[T] = IO[T]
- Source Edit
OnCloseEvent[T] = Reader[T, Unit]
- Source Edit
Stream[S; T] = object initialStep: Initializer[S] loop: Loop[S, T] onCloseEvent: OnCloseEvent[S]
- Source Edit
Procs
proc run[S](self: Stream[S, Unit]): Unit
- Source Edit
proc mapSteps[SA; SB; T](self: Stream[SA, T]; extractor: SB -> SA; stepperMapper: Stepper[SA] -> Stepper[SB]; initialStepMapper: SA -> SB): Stream[SB, T]
- Source Edit
proc forEach[S; T](self: Stream[S, T]; action: T -> Unit): Unit
- Source Edit
proc reduce[S; T; R](self: Stream[S, T]; reducer: Reducer[R, T]; initialResult: R): R
- Source Edit
proc reduceIfNotEmpty[S; T; R](self: Stream[S, T]; reducer: Reducer[R, T]; initialResult: () -> R): Optional[R]
- Source Edit
proc sum[S; N](self: Stream[S, N]): N
- + must be defined for N. Source Edit
proc count[S; T](self: Stream[S, T]; N: typedesc[SomeNatural]): N:type
- Counts the number of items generated by self. Source Edit
proc count[S; T; N: SomeNatural](self: Stream[S, T]): N
- Source Edit
proc findFirst[S; T](self: Stream[S, T]; predicate: Predicate[T]): Optional[T]
- Source Edit
proc findFirst[S; T](self: Stream[S, T]): Optional[T]
- Returns the first item generated by self, if any. Source Edit
proc any[S; T](self: Stream[S, T]; predicate: Predicate[T]): bool
-
Returns whether any items in self verifies predicate.
If self is empty, false is returned.
Source Edit proc all[S; T](self: Stream[S, T]; predicate: Predicate[T]): bool
-
Returns whether all the items in self verifies predicate.
If self is empty, true is returned.
Source Edit proc none[S; T](self: Stream[S, T]; predicate: Predicate[T]): bool
-
Returns whether none of the items in self verifies predicate.
If self is empty, true is returned.
Source Edit
Funcs
func initializer[T](self: IO[T]): Initializer[T]
- Source Edit
func onCloseEvent[T](self: Reader[T, Unit]): OnCloseEvent[T]
- Source Edit
func startingAt[S; T](loop: Loop[S, T]; initialStep: Initializer[S]; onCloseEvent: OnCloseEvent[S]): Stream[S, T]
- Source Edit
func startingAt[S; T](loop: Loop[S, T]; initialStep: Initializer[S]): Stream[S, T]
- An overload that will do nothing when the returned stream is closed. Source Edit
func initialStep[S; T](X: typedesc[Stream[S, T]]): Lens[X, Initializer[S]]
- Source Edit
func onCloseEvent[S; T](X: typedesc[Stream[S, T]]): Lens[X, OnCloseEvent[S]]
- Source Edit
func loop[S; A](X: typedesc[Stream[S, A]]; B: typedesc): PLens[X, Loop[S, A], Loop[S, B], Stream[S, B]]
- Source Edit
func loop[S; T](X: typedesc[Stream[S, T]]): Lens[X, Loop[S, T]]
- Source Edit
func scope[S; T](X: typedesc[Stream[S, T]]): Lens[X, LoopScope[S]]
- Source Edit
func condition[S; T](X: typedesc[Stream[S, T]]): Lens[X, Condition[S]]
- Source Edit
func generator[S; A](X: typedesc[Stream[S, A]]; B: typedesc): PLens[X, Generator[S, A], Generator[S, B], Stream[S, B]]
- Source Edit
func generator[S; T](X: typedesc[Stream[S, T]]): Lens[X, Generator[S, T]]
- Source Edit
func stepper[S; T](X: typedesc[Stream[S, T]]): Lens[X, Stepper[S]]
- Source Edit
func merge[SA; A; SB; B](self: Stream[SA, A]; other: Stream[SB, B]): Stream[Pair[SA, SB], Pair[A, B]]
-
Combines the items generated by self with the ones by other in a single stream.
If one of the stream stops generating items before the other, the returned stream will stop too.
Since 0.4.0.
Source Edit func emptyStream(T: typedesc): Stream[EmptyStep, T]
- Source Edit
func singleItemStream[T](item: () -> T): Stream[SingleStep, T]
- Source Edit
func onClose[S; T](self: Stream[S, T]; callback: () -> Unit): Stream[S, T]
-
Registers a callback to be called when closing self.
Callbacks will be called in the order of registration.
Source Edit func map[S; A; B](self: Stream[S, A]; f: A -> B): Stream[S, B]
- Source Edit
func filter[S; T](self: Stream[S, T]; predicate: Predicate[T]): Stream[S, T]
- Source Edit
func zip[SA; A; SB; B](self: Stream[SA, A]; other: Stream[SB, B]): Stream[ZipStep[SA, SB], Pair[A, B]]
-
An alias for merge.
Since 0.4.0.
Source Edit func peek[S; T](self: Stream[S, T]; f: T -> Unit): Stream[S, T]
- Useful when debugging a stream. Source Edit
func limit[S; T; N: SomeNatural](self: Stream[S, T]; n: N): Stream[LimitStep[S, N], T]
- Limits self to at most n items to be generated. Source Edit
func skip[S; T; N: SomeNatural](self: Stream[S, T]; n: N): () -> Stream[S, T]
-
- Returns a proc that will:
- Skip n items from self.
- Return a stream starting at the next item after the skipped ones, if it exists.
func takeWhile[S; T](self: Stream[S, T]; predicate: Predicate[T]): Stream[ TakeWhileStep[S, T], T]
- Returns a stream that will generate items while there are and they verify predicate. Source Edit
func dropWhile[S; T](self: Stream[S, T]; predicate: Predicate[T]): () -> Stream[S, T]
-
- Returns a proc that will:
- Skip the items in self while they verify predicate.
- Return a stream starting at the next item after the skipped ones.
Exports
-
Loop, stream/loop/loopscope, condition, stepType, looped, emptyLoop, Stepper, generator, stepper, stream/loop, breakIf, infiniteLoop, LoopScope, asReader, mapSteps, mapSteps, generator, infiniteLoop, Condition, generator, stepType, run, asReader, stepType, runOnce, emptyLoopScope, looped, map, condition, asReader, infinite, stepper, dropWhile, runOnce, merge, itemType, run, Generator, run, merge, generating, scope, itemType, mapSteps, mapSteps, stepType, stepper, condition, mapSteps, LoopScope, runOnce, emptyLoopScope, condition, stepType, looped, asReader, asReader, infinite, Stepper, merge, stepType, breakIf, stream/loop/loopscope, stepper, stepper, mapSteps, mapSteps, looped, run, run, Condition, mapSteps, condition