nim_iterator_stream_experiment/stream

A stream behaves like an iterator. It generates items in a loop and the user does something with them.

A stream is meant to be consumed once, when a final operation is executed. Therefore, a stream must release all the resources it holds (file handles, memory not managed by the GC, etc.) when it is closed.

Any exception raised while generating items in a stream will be re-raised after closing the stream.
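
The "close, then re-raise" behaviour described above can be pictured with a plain try-finally. This is only a standalone sketch and does not use this module: consume and events are hypothetical names, and the real implementation may be structured differently.

```nim
# Standalone model of "close, then re-raise"; it does not use this module.
var events: seq[string]

proc consume(items: seq[int]; onClose: proc ()) =
  ## Hypothetical final operation: visits every item, then closes.
  try:
    for item in items:
      if item < 0:
        raise newException(ValueError, "negative item")
      events.add("item " & $item)
  finally:
    # Runs on normal completion and while an exception propagates.
    onClose()

try:
  consume(@[1, 2, -1, 4], proc () = events.add("closed"))
except ValueError as e:
  events.add("caught: " & e.msg)

echo events
```

In this model the "closed" entry is recorded before the caller's except branch runs, which matches the ordering stated above.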

Types of operations

Final

An operation is final if it does not return a stream or a procedure that returns one.

Intermediate

Operations that return a stream, or a lambda that returns one, are called intermediate. They can be stateless or stateful.

Operation properties

Stateless

Stateless operations always return a stream without changing the step type (S).

Examples: map, filter.

Stateful

Intermediate stateful operations can return:
  • A stream with a different step type.

    Examples: takeWhile, limit

  • A lambda that returns a stream.

    Examples: dropWhile, skip

    This is to prevent the parallelization of this kind of operation (API to be done).

Final operations can also be stateful, such as findFirst.

Short-circuiting

Short-circuiting operations can terminate a stream when it is unnecessary to keep generating items.

Examples: takeWhile, any, all, none.
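
A minimal sketch of short-circuiting, written against a plain Nim iterator rather than this module. The names naturals, anyItem and generated are hypothetical. The point is that the source is endless, yet the search stops as soon as the answer is known.

```nim
# Standalone model of short-circuiting; it does not use this module.
var generated = 0

iterator naturals(): int =
  ## Hypothetical endless source: 0, 1, 2, ...
  var n = 0
  while true:
    inc generated
    yield n
    inc n

proc anyItem(predicate: proc (x: int): bool): bool =
  ## Stops pulling items as soon as one satisfies `predicate`.
  for n in naturals():
    if predicate(n):
      return true
  false

let found = anyItem(proc (x: int): bool = x * x > 50)
echo found, " after ", generated, " items"  # true after 9 items
```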

Backend restrictions

JavaScript

There is currently a bug that prevents the compiler from evaluating closures at compile time. As a result, streams are not available at compile time when targeting JavaScript.

NimScript

NimScript cannot currently catch exceptions with try-except clauses. Therefore, our implementation cannot guarantee that the "close" event will be triggered when an exception is raised while streaming. Consequently, the event will always do nothing and the API to modify it is not available for NimScript.

Types

Initializer[T] = IO[T]
OnCloseEvent[T] = Reader[T, Unit]
Stream[S; T] = object
  initialStep: Initializer[S]
  loop: Loop[S, T]
  onCloseEvent: OnCloseEvent[S]
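
To make the three fields easier to picture, here is a simplified standalone model. It is an assumption-heavy sketch: the real IO, Reader and Loop types live in other modules and are replaced here by plain procs, and MiniStream and forEachItem are hypothetical names that are not part of this module.

```nim
# Simplified standalone model; the real IO, Reader and Loop types are
# replaced by plain procs, which is an assumption of this sketch.
type
  MiniStream[S, T] = object
    initialStep: proc (): S          # stands in for Initializer[S]
    condition: proc (step: S): bool  # loop part: keep going?
    generator: proc (step: S): T     # loop part: item for this step
    stepper: proc (step: S): S       # loop part: next step
    onCloseEvent: proc (step: S)     # stands in for OnCloseEvent[S]

proc forEachItem[S, T](self: MiniStream[S, T]; action: proc (item: T)) =
  ## Hypothetical final operation: runs the loop, then always closes.
  var step = self.initialStep()
  try:
    while self.condition(step):
      action(self.generator(step))
      step = self.stepper(step)
  finally:
    self.onCloseEvent(step)

var seen: seq[string]
let countdown = MiniStream[int, string](
  initialStep: proc (): int = 3,
  condition: proc (step: int): bool = step > 0,
  generator: proc (step: int): string = "t-" & $step,
  stepper: proc (step: int): int = step - 1,
  onCloseEvent: proc (step: int) = seen.add("closed at " & $step)
)
countdown.forEachItem(proc (item: string) = seen.add(item))
echo seen
```

The step type S carries the loop state, while T is the type of the generated items. That split is why operations that need extra state tend to change S.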

Procs

proc run[S](self: Stream[S, Unit]): Unit
proc mapSteps[SA; SB; T](self: Stream[SA, T]; extractor: SB -> SA;
                      stepperMapper: Stepper[SA] -> Stepper[SB];
                      initialStepMapper: SA -> SB): Stream[SB, T]
proc forEach[S; T](self: Stream[S, T]; action: T -> Unit): Unit
proc reduce[S; T; R](self: Stream[S, T]; reducer: Reducer[R, T]; initialResult: R): R
proc reduceIfNotEmpty[S; T; R](self: Stream[S, T]; reducer: Reducer[R, T];
                            initialResult: () -> R): Optional[R]
proc sum[S; N](self: Stream[S, N]): N
+ must be defined for N.
proc count[S; T](self: Stream[S, T]; N: typedesc[SomeNatural]): N:type
Counts the number of items generated by self.
proc count[S; T; N: SomeNatural](self: Stream[S, T]): N
proc findFirst[S; T](self: Stream[S, T]; predicate: Predicate[T]): Optional[T]
proc findFirst[S; T](self: Stream[S, T]): Optional[T]
Returns the first item generated by self, if any.
proc any[S; T](self: Stream[S, T]; predicate: Predicate[T]): bool

Returns whether any item in self satisfies predicate.

If self is empty, false is returned.

proc all[S; T](self: Stream[S, T]; predicate: Predicate[T]): bool

Returns whether all the items in self satisfy predicate.

If self is empty, true is returned.

proc none[S; T](self: Stream[S, T]; predicate: Predicate[T]): bool

Returns whether none of the items in self satisfies predicate.

If self is empty, true is returned.
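
The empty-stream results of any, all and none follow the usual conventions for quantifiers. As an analogy only, the same conventions can be observed on seq values with the standard library's std/sequtils templates. The snippet below does not call this module's procs.

```nim
import std/sequtils

# Analogy on seq values using the standard library, not this module's procs.
let empty: seq[int] = @[]
let items = @[1, 2, 3]

echo empty.anyIt(it > 0)        # false: no item can satisfy the predicate
echo empty.allIt(it > 0)        # true: no item violates the predicate
echo(not empty.anyIt(it > 0))   # true: "none" is the negation of "any"
echo items.anyIt(it > 2), " ", items.allIt(it > 2)  # true false
```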


Funcs

func initializer[T](self: IO[T]): Initializer[T]
func onCloseEvent[T](self: Reader[T, Unit]): OnCloseEvent[T]
func startingAt[S; T](loop: Loop[S, T]; initialStep: Initializer[S];
                    onCloseEvent: OnCloseEvent[S]): Stream[S, T]
func startingAt[S; T](loop: Loop[S, T]; initialStep: Initializer[S]): Stream[S, T]
An overload that will do nothing when the returned stream is closed.
func initialStep[S; T](X: typedesc[Stream[S, T]]): Lens[X, Initializer[S]]
func onCloseEvent[S; T](X: typedesc[Stream[S, T]]): Lens[X, OnCloseEvent[S]]
func loop[S; A](X: typedesc[Stream[S, A]]; B: typedesc): PLens[X, Loop[S, A], Loop[S, B],
    Stream[S, B]]
func loop[S; T](X: typedesc[Stream[S, T]]): Lens[X, Loop[S, T]]
func scope[S; T](X: typedesc[Stream[S, T]]): Lens[X, LoopScope[S]]
func condition[S; T](X: typedesc[Stream[S, T]]): Lens[X, Condition[S]]
func generator[S; A](X: typedesc[Stream[S, A]]; B: typedesc): PLens[X, Generator[S, A],
    Generator[S, B], Stream[S, B]]
func generator[S; T](X: typedesc[Stream[S, T]]): Lens[X, Generator[S, T]]
func stepper[S; T](X: typedesc[Stream[S, T]]): Lens[X, Stepper[S]]
func merge[SA; A; SB; B](self: Stream[SA, A]; other: Stream[SB, B]): Stream[Pair[SA, SB],
    Pair[A, B]]

Combines the items generated by self with the ones by other in a single stream.

If one of the streams stops generating items before the other, the returned stream will stop too.

Since 0.4.0.
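
The "stops with the shorter input" rule is the same one that zip in std/sequtils applies to sequences. The following is an analogy on seq values, not a call to this module's merge.

```nim
import std/sequtils

# Analogy on seq values: pairing stops when the shorter input runs out.
let numbers = @[1, 2, 3, 4]
let letters = @["a", "b"]
let pairs = zip(numbers, letters)

echo pairs.len  # 2
echo pairs
```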

func emptyStream(T: typedesc): Stream[EmptyStep, T]
func singleItemStream[T](item: () -> T): Stream[SingleStep, T]
func onClose[S; T](self: Stream[S, T]; callback: () -> Unit): Stream[S, T]

Registers a callback to be called when closing self.

Callbacks will be called in the order of registration.
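
The registration-order guarantee can be modelled with a list of callbacks that is run front to back. This is a standalone sketch. Closable, withCallback and closeAll are hypothetical names, and how this module actually stores its callbacks is not shown here.

```nim
# Standalone model; it does not use this module's onClose.
type Closable = object
  callbacks: seq[proc ()]

proc withCallback(self: Closable; callback: proc ()): Closable =
  ## Returns a copy with `callback` appended, leaving `self` unchanged.
  result = self
  result.callbacks.add(callback)

proc closeAll(self: Closable) =
  ## Runs the callbacks in the order in which they were registered.
  for callback in self.callbacks:
    callback()

var order: seq[string]
let once = Closable().withCallback(proc () = order.add("first"))
let twice = once.withCallback(proc () = order.add("second"))
twice.closeAll()
echo order
```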

func map[S; A; B](self: Stream[S, A]; f: A -> B): Stream[S, B]
func filter[S; T](self: Stream[S, T]; predicate: Predicate[T]): Stream[S, T]
func zip[SA; A; SB; B](self: Stream[SA, A]; other: Stream[SB, B]): Stream[ZipStep[SA, SB],
    Pair[A, B]]

An alias for merge.

Since 0.4.0.

func peek[S; T](self: Stream[S, T]; f: T -> Unit): Stream[S, T]
Useful when debugging a stream.
func limit[S; T; N: SomeNatural](self: Stream[S, T]; n: N): Stream[LimitStep[S, N], T]
Limits self to generating at most n items.
func skip[S; T; N: SomeNatural](self: Stream[S, T]; n: N): () -> Stream[S, T]
Returns a proc that will:
  • Skip n items from self.
  • Return a stream starting at the next item after the skipped ones, if it exists.
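
Because a proc is returned instead of a stream, no item is skipped until that proc is called. The following standalone model shows the deferral on a seq. The names skipItems and consumed are hypothetical, and the model returns a seq rather than a stream.

```nim
# Standalone model on seq values; it does not use this module's skip.
var consumed = 0

proc skipItems(items: seq[int]; n: int): proc (): seq[int] =
  ## Nothing is skipped until the returned proc is called.
  result = proc (): seq[int] =
    consumed = min(n, items.len)
    items[consumed .. ^1]

let deferred = skipItems(@[10, 20, 30, 40], 2)
let before = consumed   # still 0: the skipping has not happened yet
let rest = deferred()   # the skipping happens here
echo before, " ", consumed, " ", rest
```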
func takeWhile[S; T](self: Stream[S, T]; predicate: Predicate[T]): Stream[
    TakeWhileStep[S, T], T]
Returns a stream that generates items for as long as items remain and they satisfy predicate.
func dropWhile[S; T](self: Stream[S, T]; predicate: Predicate[T]): () -> Stream[S, T]
Returns a proc that will:
  • Skip the items in self while they satisfy predicate.
  • Return a stream starting at the next item after the skipped ones.
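
The difference between takeWhile and dropWhile is easiest to see side by side. The sketch below is an eager model on seq values with hypothetical helper names. Unlike this module's dropWhile, it returns the remaining items directly rather than a proc that returns a stream.

```nim
# Standalone eager model on seq values; it does not use this module.
proc takeWhileItems(items: seq[int]; predicate: proc (x: int): bool): seq[int] =
  ## Keeps the leading items that satisfy `predicate`, then stops.
  for item in items:
    if not predicate(item): break
    result.add(item)

proc dropWhileItems(items: seq[int]; predicate: proc (x: int): bool): seq[int] =
  ## Discards the leading items that satisfy `predicate`, keeps the rest.
  var dropping = true
  for item in items:
    if dropping and predicate(item): continue
    dropping = false
    result.add(item)

let isSmall = proc (x: int): bool = x < 3
let data = @[1, 2, 5, 1, 7]
echo takeWhileItems(data, isSmall)  # @[1, 2]
echo dropWhileItems(data, isSmall)  # @[5, 1, 7]
```

Note that the second 1 in data survives dropWhileItems: once an item fails the predicate, every later item is kept.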

Templates

template stepType[S; T](X: typedesc[Stream[S, T]]): typedesc[S]
template stepType[S; T](self: Stream[S, T]): typedesc[S]
template itemType[S; T](X: typedesc[Stream[S, T]]): typedesc[T]
template itemType[S; T](self: Stream[S, T]): typedesc[T]