Flesh out the Stream API #12

@carllerche

This issue tracks the overall API of Stream at a fairly high level. It is a
work in progress, and I'll update the original post as the discussion evolves.

impl Stream<T, E> {

    // Counts the number of elements in the Stream
    fn count(self) -> Future<usize, E>;

    // Chain this stream with another (could also be named concat)
    fn chain(self, next: Stream<T, E>) -> Stream<T, E>;

    // Creates a stream that iterates over both this and the specified stream
    // simultaneously, yielding the two elements as pairs.
    fn zip<U>(self, other: Stream<U, E>) -> Stream<(T, U), E>;

    // Create a new stream that represents the application of the specified
    // function to each value of the original stream.
    fn map<U, F: Fn(T) -> U>(self, f: F) -> Stream<U, E>;

    // Creates a new stream that contains the values of the original stream
    // that match the specified predicate
    fn filter<P: Fn(T) -> bool>(self, f: P) -> Stream<T, E>;

    // Creates a new stream that both filters and maps elements
    fn filter_map<U, F: Fn(T) -> Option<U>>(self, f: F) -> Stream<U, E>;

    // Creates a stream that yields a pair of the original value and the
    // current iteration index
    fn enumerate(self) -> Stream<(u64, T), E>;

    // Limit the number of values to up to n
    fn take(self, n: u64) -> Stream<T, E>;

    // Take values as long as the supplied predicate returns true
    fn take_while<F: Fn(T) -> bool>(self, f: F) -> Stream<T, E>;

    // Take values from the original stream until the condition (supplied as an async value)
    // completes.
    fn take_until<A: Async>(self, a: A) -> Stream<T, E>;

    // Skip the first n values of the stream
    fn skip(self, n: u64) -> Stream<T, E>;

    // Skip values as long as the supplied predicate returns true
    fn skip_while<F: Fn(T) -> bool>(self, f: F) -> Stream<T, E>;

    // Skip values until the condition (supplied as an async value) completes.
    fn skip_until<A: Async>(self, a: A) -> Stream<T, E>;

    // Computes up to n values in parallel ahead of the consumer requesting
    // them, storing the computed values in the `Stream`.
    fn buffer(self, n: usize) -> Stream<T, E>;

    // Most of the functions from IteratorExt should also be implemented for Stream
}
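
Because most of these combinators mirror their iterator counterparts, their intended semantics can be illustrated with plain std iterators. The following is only a synchronous analogy, not the proposed API: `Iterator` stands in for `Stream`, the helper name `pipeline` is hypothetical, and std's `enumerate` yields a `usize` index where the sketch above uses `u64`.

```rust
// Synchronous analogy only: std `Iterator` stands in for the proposed `Stream`.
// `pipeline` is a hypothetical helper, not part of any proposed API.
fn pipeline(a: Vec<u32>, b: Vec<u32>) -> Vec<(usize, u32)> {
    a.into_iter()
        .chain(b)                 // all of `a`, then all of `b`
        .filter(|v| v % 2 == 0)   // keep only even values
        .map(|v| v * 10)          // transform each remaining value
        .skip_while(|v| *v < 40)  // drop leading values below 40
        .take(3)                  // yield at most three values
        .enumerate()              // pair each value with its index
        .collect()
}
```

A `Stream` version would presumably read the same way, with the final `collect` replaced by something that returns a `Future`.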

Still needed

Waiting for a chunk of values

A function batch that waits until the first n values of the stream are
complete and returns them as a Vec, together with the rest of the stream. What
should happen if the stream doesn't have n values left?

fn batch(self, n: usize) -> Future<(Vec<T>, Stream<T, E>), E>
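
One possible answer to the open question is to return a shorter Vec when fewer than n values remain. The sketch below shows that policy using a std `Iterator` as a synchronous stand-in for `Stream`. It is an assumption for discussion, not a decided behavior; failing with an error would be an equally valid choice.

```rust
// Synchronous analogy: `Iterator` stands in for the proposed `Stream`.
// Returns up to `n` values plus the remainder of the iterator. If fewer
// than `n` values are left, the Vec is simply shorter (one possible policy).
fn batch<I: Iterator>(mut iter: I, n: usize) -> (Vec<I::Item>, I) {
    // `by_ref` borrows the iterator so it can be handed back afterwards.
    let values: Vec<I::Item> = iter.by_ref().take(n).collect();
    (values, iter)
}
```

With this policy, a caller can detect the end of the stream by checking whether the returned Vec holds fewer than n values.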

Parallelizing computation

A function that allows processing the values of the stream "asynchronously",
returning a new Stream that yields the results as the computations
complete.

For example, consider a stream of URLs:

url_stream.map(|url| http::get(url)) // => Stream<Future<Response>>

Now the goal would be to map that into Stream<Response> such that at most N
http requests are in flight and the resulting stream contains the responses as
they complete, regardless of the original order of the URLs.
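
The intended behavior can be approximated with threads and a channel. The sketch below is a thread-based analogy rather than the proposed async API: `map_unordered` is a hypothetical name, the closure `f` stands in for something like `http::get`, and the returned `Receiver` plays the role of the resulting stream.

```rust
use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Thread-based analogy: at most `limit` calls to `f` run at once, and
// results are delivered in completion order, not input order.
// `map_unordered` is a hypothetical name, not part of the proposed API.
fn map_unordered<T, R, F>(items: Vec<T>, limit: usize, f: F) -> mpsc::Receiver<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Sync + 'static,
{
    assert!(limit > 0, "at least one computation must be allowed in flight");
    let queue = Arc::new(Mutex::new(VecDeque::from(items)));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel();

    // One worker per permitted in-flight computation.
    for _ in 0..limit {
        let (queue, f, tx) = (Arc::clone(&queue), Arc::clone(&f), tx.clone());
        thread::spawn(move || loop {
            // Take the next item; the lock is released before `f` runs.
            let next = queue.lock().unwrap().pop_front();
            match next {
                Some(item) => {
                    // Stop early if the consumer dropped the receiver.
                    if tx.send(f(item)).is_err() {
                        break;
                    }
                }
                None => break, // no work left
            }
        });
    }
    // The original sender is dropped on return, so the receiver ends
    // once every worker has finished.
    rx
}
```

Iterating over the receiver, for example with `rx.iter()`, yields each result as soon as it is ready. An async version would bound the number of pending futures instead of the number of threads.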
