Stream

public class Stream<T> implements IObserver<T>

Constructors

Stream

public Stream(IProducer<T> producer)

Methods

combine

public <T2, R> Stream<R> combine(ICombinator<T, T2, R> combinator, Stream<T2> other)

debug

public Stream<T> debug(ISpy<T> spy)

Returns an output stream that behaves identically to the input Stream, but also runs a spy function for each event, to help you debug your app.

debug takes a spy function as argument, and runs that for each event happening on the input stream. This helps you to understand the flow of events through some operator chain.

Please note that if the output stream has no subscribers, then it will not start, which means spy will never run because no actual event happens in that case.

Marble diagram:

--1----2-----3-----4--
       debug
--1----2-----3-----4--
Parameters:
  • spy – A function that takes an event as argument, and does not need to return anything.

See also: DebugOperator, ISpy
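
As a rough illustration of the behaviour described above, the sketch below models debug on a plain List rather than on the Stream class. The class DebugSketch and the use of java.util.function.Consumer in place of ISpy are assumptions made for this example, since the method that ISpy declares is not shown here. The lazy start of the output stream is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: models debug on a plain List, not on the library's Stream.
final class DebugSketch {
    // Runs the spy on each event, then forwards the event unchanged.
    static <T> List<T> debug(List<T> events, Consumer<T> spy) {
        List<T> output = new ArrayList<>();
        for (T event : events) {
            spy.accept(event);   // side effect only, e.g. logging
            output.add(event);   // the event itself is not modified
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> spied = new ArrayList<>();
        List<Integer> output = debug(Arrays.asList(1, 2, 3, 4), spied::add);
        System.out.println(output); // [1, 2, 3, 4]
        System.out.println(spied);  // [1, 2, 3, 4]
    }
}
```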

end

public void end()

error

public void error(Exception e)

filter

public Stream<T> filter(IPredicate<T> predicate)

Only allows events that pass the test given by the predicate argument.

Each event from the input stream is given to the predicate function. If the function returns true, the event is forwarded to the output stream; otherwise it is ignored and not forwarded.

Marble diagram:

--1---2--3-----4-----5---6--7-8--
   filter(i -> i % 2 == 0)
------2--------4---------6----8--
Parameters:
  • predicate – A function of type (t: T) => boolean that takes an event from the input Stream and checks if it passes, by returning a boolean.

See also: FilterOperator, IPredicate
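
The marble diagram above can be reproduced with a small sketch on a plain List. FilterSketch is a hypothetical name, and java.util.function.Predicate stands in for IPredicate, whose method name is not given here. It is not the library's FilterOperator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustration only: models filter on a plain List, not on the library's Stream.
final class FilterSketch {
    // Forwards, in order, only the events for which the predicate returns true.
    static <T> List<T> filter(List<T> events, Predicate<T> predicate) {
        List<T> output = new ArrayList<>();
        for (T event : events) {
            if (predicate.test(event)) {
                output.add(event);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        List<Integer> output = filter(input, i -> i % 2 == 0);
        System.out.println(output); // [2, 4, 6, 8]
    }
}
```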

fold

public <U> Stream<U> fold(IAccumulator<U, T> accumulator)

Equivalent to Stream.fold(accumulator, null)

See also: Stream.fold(IAccumulator,Object)

fold

public <R> Stream<R> fold(IAccumulator<R, T> accumulator, R acc)

“Folds” the Stream onto itself.

Combines events from the past throughout the entire execution of the input stream, allowing you to accumulate them together. Also known as reduce. The returned stream is a MemoryStream, which means it is already Stream.remember()'d.

The output stream starts by emitting the seed acc which you give as argument. Then, when an event happens on the input stream, it is combined with that seed value through the accumulator function, and the result is emitted on the output stream. fold remembers that result as the new acc (“accumulator”); when the next input event t happens, acc is combined with t to produce the next acc, and so forth.

Marble diagram:

------1-----1--2----1----1------
  fold((acc, x) -> acc + x, 3)
3-----4-----5--7----8----9------
Parameters:
  • accumulator – A function of type (acc: R, t: T) => R that takes the previous accumulated value acc and the incoming event from the input stream and produces the new accumulated value.
  • acc – The initial accumulated value, of type R.

See also: FoldOperator, IAccumulator
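
To make the accumulation concrete, the sketch below replays the marble diagram above on a plain List. FoldSketch is a hypothetical name and java.util.function.BiFunction stands in for IAccumulator, whose method name is not shown here. Note that the seed is emitted first, so the output has one more element than the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Illustration only: models fold on a plain List, not on the library's Stream.
final class FoldSketch {
    // Emits the seed, then one accumulated value per input event.
    static <T, R> List<R> fold(List<T> events, BiFunction<R, T, R> accumulator, R seed) {
        List<R> output = new ArrayList<>();
        R acc = seed;
        output.add(acc);                           // the seed is emitted first
        for (T event : events) {
            acc = accumulator.apply(acc, event);   // combine previous acc with the event
            output.add(acc);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 1, 2, 1, 1);
        List<Integer> output = fold(input, (acc, x) -> acc + x, 3);
        System.out.println(output); // [3, 4, 5, 7, 8, 9]
    }
}
```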

last

public Stream<T> last()

map

public <U> Stream<U> map(IProjection<T, U> projection)

Transforms each event from the input Stream through a projection function, to get a Stream that emits those transformed events.

Marble diagram:

--1---3--5-----7------
   map(i -> i * 10)
--10--30-50----70-----
Parameters:
  • projection – A function of type (t: T) => U that takes event t of type T from the input Stream and produces an event of type U, to be emitted on the output Stream.

See also: MapOperator, IProjection
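
A minimal sketch of the same transformation on a plain List follows. MapSketch is a hypothetical name and java.util.function.Function stands in for IProjection, whose method name is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustration only: models map on a plain List, not on the library's Stream.
final class MapSketch {
    // Applies the projection to every event, preserving order.
    static <T, U> List<U> map(List<T> events, Function<T, U> projection) {
        List<U> output = new ArrayList<>();
        for (T event : events) {
            output.add(projection.apply(event));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> output = map(Arrays.asList(1, 3, 5, 7), i -> i * 10);
        System.out.println(output); // [10, 30, 50, 70]
    }
}
```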

merge

public final Stream<T> merge(Stream<T>... streams)

Blends multiple streams together. See Stream.merge(Collection)

merge

public Stream<T> merge(Collection<Stream<T>> streams)

Blends multiple streams together, emitting events from all of them concurrently.

merge takes multiple streams as arguments, and creates a stream that behaves like each of the argument streams, in parallel.

Marble diagram:

--1----2-----3--------4---
----a-----b----c---d------
           merge
--1-a--2--b--3-c---d--4---
Parameters:
  • streams – A collection of streams to merge with the current stream.

See also: MergeOperator
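
One way to check the marble diagram above is to treat each diagram line as a string in which every character is one time tick and '-' means "no event". The sketch below merges such strings tick by tick. MergeSketch is a hypothetical helper, not part of the library, and it assumes that at most one input has an event on any given tick (if several do, the first input wins).

```java
// Illustration only: merges marble-diagram strings, not library Streams.
final class MergeSketch {
    // For every tick, emits the event of whichever input has one, or '-' if none does.
    static String merge(String... marbles) {
        int length = 0;
        for (String m : marbles) {
            length = Math.max(length, m.length());
        }
        StringBuilder output = new StringBuilder();
        for (int tick = 0; tick < length; tick++) {
            char event = '-';
            for (String m : marbles) {
                if (tick < m.length() && m.charAt(tick) != '-') {
                    event = m.charAt(tick);
                    break; // assumption: at most one event per tick
                }
            }
            output.append(event);
        }
        return output.toString();
    }

    public static void main(String[] args) {
        String merged = merge("--1----2-----3--------4---",
                              "----a-----b----c---d------");
        System.out.println(merged); // --1-a--2--b--3-c---d--4---
    }
}
```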

remember

public Stream<T> remember()

Returns an output Stream that behaves like the input stream, but also remembers the most recent event that happens on the input stream, so that a newly added subscriber will immediately receive that memorised event.

See also: MemoryStream
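
The replay behaviour can be sketched with a tiny push-based model. RememberSketch, its subscribe and next methods, and the use of java.util.function.Consumer as the listener type are all assumptions made for this example. It is not the library's MemoryStream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: a minimal remembering stream, not the library's MemoryStream.
final class RememberSketch<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T latest;
    private boolean hasLatest = false;

    // Registers a listener and immediately replays the memorised event, if any.
    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
        if (hasLatest) {
            listener.accept(latest);
        }
    }

    // Memorises the event and delivers it to all current listeners.
    void next(T event) {
        latest = event;
        hasLatest = true;
        for (Consumer<T> listener : listeners) {
            listener.accept(event);
        }
    }

    public static void main(String[] args) {
        RememberSketch<String> stream = new RememberSketch<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();
        stream.subscribe(early::add);
        stream.next("a");
        stream.next("b");
        stream.subscribe(late::add); // receives "b" right away
        stream.next("c");
        System.out.println(early); // [a, b, c]
        System.out.println(late);  // [b, c]
    }
}
```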

startWith

public Stream<T> startWith(T initial)

Prepends the given initial value to the sequence of events emitted by the input Stream. The returned stream is a MemoryStream, which means it is already Stream.remember()'d.

Marble diagram:

---1---2-----3---
  startWith(0)
0--1---2-----3---
Parameters:
  • initial – The value or event to prepend.

See also: StartWithOperator, MemoryStream
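
As a small illustration of the diagram above, the sketch below prepends a value to a plain List of events. StartWithSketch is a hypothetical name, and the memorising behaviour of the returned MemoryStream is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: models startWith on a plain List, not on the library's Stream.
final class StartWithSketch {
    // Emits the initial value first, followed by all input events in order.
    static <T> List<T> startWith(List<T> events, T initial) {
        List<T> output = new ArrayList<>();
        output.add(initial);
        output.addAll(events);
        return output;
    }

    public static void main(String[] args) {
        List<Integer> output = startWith(Arrays.asList(1, 2, 3), 0);
        System.out.println(output); // [0, 1, 2, 3]
    }
}
```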

subscribe

public void subscribe(IObserver<T> observer)

take

public Stream<T> take(int max)

Lets the first max events from the input stream pass to the output stream, then completes the output stream.

Marble diagram:

--a---b--c----d---e--
  take(3)
--a---b--c|
Parameters:
  • max – How many events to allow from the input stream before completing the output stream.

See also: TakeOperator
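
The completion point can be checked by treating a marble-diagram line as a string, one character per time tick, with '-' meaning "no event" and '|' marking completion. TakeSketch is a hypothetical helper written for this example, not part of the library. It assumes that an input with fewer than max events simply passes through without a completion mark.

```java
// Illustration only: applies take to a marble-diagram string, not to a library Stream.
final class TakeSketch {
    // Copies ticks until `max` events have passed, then appends '|' for completion.
    static String take(String marbles, int max) {
        if (max <= 0) {
            return "|"; // nothing is allowed through, so the output completes at once
        }
        StringBuilder output = new StringBuilder();
        int seen = 0;
        for (char tick : marbles.toCharArray()) {
            output.append(tick);
            if (tick != '-' && ++seen == max) {
                return output.append('|').toString();
            }
        }
        // Assumption: fewer than `max` events means take never completes the output.
        return output.toString();
    }

    public static void main(String[] args) {
        System.out.println(take("--a---b--c----d---e--", 3)); // --a---b--c|
    }
}
```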

takeUntil

public <R> Stream<T> takeUntil(Stream<R> other)

unsubscribe

public void unsubscribe(IObserver<T> observer)