1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
use std::sync::Arc;
use std::sync::mpsc::{Sender, Receiver, channel};
use std::iter::FromIterator;

use super::Pollresult::*;
use super::Future;

use cvmx::CvMx;

/// Stream of multiple `Future`s
///
/// A `FutureStream` can be used to wait for multiple `Future`s, and return them incrementally as
/// they are resolved.
///
/// It implements an iterator over completed `Future`s, and can be constructed from an iterator of
/// `Future`s.
///
/// May be cloned and the clones passed to other threads so that `Future`s may be added from multiple
/// threads. Every clone shares the same underlying state, and cloning never requires `T: Clone`.
pub struct FutureStream<T> {
    tx: Sender<Option<T>>,                  // values from Promise
    inner: Arc<CvMx<FutureStreamInner<T>>>, // set of waited-for futures
}

// Written by hand instead of `#[derive(Clone)]`: the derive would add a `T: Clone` bound, but only
// the channel sender and the `Arc` handle are duplicated, so no value of type `T` is ever cloned.
impl<T> Clone for FutureStream<T> {
    fn clone(&self) -> FutureStream<T> {
        FutureStream {
            tx: self.tx.clone(),
            inner: self.inner.clone(),
        }
    }
}

/// Waiter for `Future`s in a `FutureStream`.
///
/// A singleton waiter for `Future`s, associated with a specific `FutureStream`. This may be used
/// in a multithreaded environment to wait for `Future`s to resolve while other threads fulfill
/// `Promise`s and add new `Future`s to the `FutureStream`.
///
/// While a waiter exists it holds the stream's value receiver; dropping the waiter hands the
/// receiver back to the `FutureStream` so that another waiter can be created.
///
/// ```
/// # use ::promising_future::{Future,FutureStream};
/// # let future = Future::with_value(());
/// let fs = FutureStream::new();
/// fs.add(future);
/// // ...
/// let mut waiter = fs.waiter();
/// while let Some(future) = waiter.wait() {
///     match future.value() {
///       None => (),         // Future unfulfilled
///       Some(val) => val,
///     }
/// }
/// ```
///
/// It may also be converted into an `Iterator` over the values yielded by resolved `Future`s
/// (unfulfilled `Promise`s are ignored).
///
/// ```
/// # use ::promising_future::{Future,FutureStream};
/// # let fut1 = Future::with_value(());
/// let fs = FutureStream::new();
/// fs.add(fut1);
/// for val in fs.waiter() {
///    // ...
/// }
/// ```
pub struct FutureStreamWaiter<'a, T: 'a> {
    // Stream this waiter was taken from; the receiver is returned to it on drop.
    fs: &'a FutureStream<T>,
    rx: Option<Receiver<Option<T>>>,        // Option so that Drop can move the receiver out
}

/// Mutable state of a `FutureStream`, kept behind the `CvMx` lock.
struct FutureStreamInner<T> {
    // Count of `Future`s added to the stream and not yet handed back by `wait`/`poll`.
    pending: usize,
    rx: Option<Receiver<Option<T>>>,        // value receiver (`None` while a waiter holds it)
}

impl<T> FutureStream<T> {
    /// Create an empty stream with no outstanding `Future`s.
    pub fn new() -> FutureStream<T> {
        let (sender, receiver) = channel();
        let state = FutureStreamInner { pending: 0, rx: Some(receiver) };

        FutureStream { inner: Arc::new(CvMx::new(state)), tx: sender }
    }

    /// Add a `Future` to the stream.
    ///
    /// The outstanding count is bumped and a callback is registered on `fut` which forwards the
    /// resolved value (or `None`) into the stream's channel.
    pub fn add(&self, fut: Future<T>) where T: Send + 'static {
        let sender = self.tx.clone();
        let mut state = self.inner.mx.lock().unwrap();

        state.pending += 1;
        // A failed `send()` only means the receiving side (waiter/FutureStream) no longer exists,
        // so the error is deliberately discarded.
        fut.callback_unit(move |value| { let _ = sender.send(value); })
    }

    /// Return number of outstanding `Future`s.
    pub fn outstanding(&self) -> usize {
        let state = self.inner.mx.lock().unwrap();
        state.pending
    }

    /// Return a singleton `FutureStreamWaiter`. If one already exists, block until it is released.
    pub fn waiter<'fs>(&'fs self) -> FutureStreamWaiter<'fs, T> {
        let mut state = self.inner.mx.lock().unwrap();

        // The receiver is absent while another waiter owns it; sleep on the condition variable
        // until `return_waiter` puts it back.
        while state.rx.is_none() {
            state = self.inner.cv.wait(state).unwrap();
        }

        let receiver = state.rx.take().unwrap();
        FutureStreamWaiter::new(self, receiver)
    }

    /// Return a singleton `FutureStreamWaiter`. Returns `None` if one already exists.
    pub fn try_waiter<'fs>(&'fs self) -> Option<FutureStreamWaiter<'fs, T>> {
        let mut state = self.inner.mx.lock().unwrap();
        let receiver = state.rx.take();

        receiver.map(|rx| FutureStreamWaiter::new(self, rx))
    }

    /// Give the value receiver back to the stream and wake a thread blocked in `waiter()`.
    ///
    /// Panics if the stream already holds a receiver.
    fn return_waiter(&self, rx: Receiver<Option<T>>) {
        let mut state = self.inner.mx.lock().unwrap();

        assert!(state.rx.is_none());
        state.rx = Some(rx);
        self.inner.cv.notify_one();
    }

    /// Return a resolved `Future` if any, but don't wait for more to resolve.
    ///
    /// The singleton waiter is taken for the duration of the call, so this blocks while another
    /// waiter exists.
    pub fn poll(&self) -> Option<Future<T>> {
        let mut waiter = self.waiter();
        waiter.poll()
    }

    /// Return resolved `Future`s. Blocks if there are outstanding `Future`s which are not yet
    /// resolved. Returns `None` when there are no more outstanding `Future`s.
    ///
    /// The singleton waiter is taken for the duration of the call, so this also blocks while
    /// another waiter exists.
    pub fn wait(&self) -> Option<Future<T>> {
        let mut waiter = self.waiter();
        waiter.wait()
    }
}

impl<'fs, T> FutureStreamWaiter<'fs, T> {
    /// Wrap the stream's value receiver in a waiter tied to `fs`.
    fn new(fs: &'fs FutureStream<T>, rx: Receiver<Option<T>>) -> FutureStreamWaiter<'fs, T> {
        FutureStreamWaiter { rx: Some(rx), fs: fs }
    }

    /// Return resolved `Future`s. Blocks if there are outstanding `Future`s which are not yet
    /// resolved. Returns `None` when there are no more outstanding `Future`s.
    pub fn wait(&mut self) -> Option<Future<T>> {
        // The guard is a temporary of this statement, so the lock is released before we block on
        // the channel below.
        let drained = self.fs.inner.mx.lock().unwrap().pending == 0;
        if drained {
            // Nothing left
            return None;
        }

        // Wait for the next completion notification
        let received = self.rx.as_ref().unwrap().recv();
        if let Ok(val) = received {
            let mut state = self.fs.inner.mx.lock().unwrap();
            state.pending -= 1;
            Some(Future::from(val))
        } else {
            None
        }
    }

    /// Return next resolved `Future`, but don't wait for more to resolve.
    pub fn poll(&mut self) -> Option<Future<T>> {
        // The lock is held across the non-blocking receive and the counter update.
        let mut state = self.fs.inner.mx.lock().unwrap();

        if state.pending == 0 {
            return None;
        }

        if let Ok(val) = self.rx.as_ref().unwrap().try_recv() {
            state.pending -= 1;
            Some(Future::from(val))
        } else {
            None
        }
    }
}

impl<'fs, T> Drop for FutureStreamWaiter<'fs, T> {
    /// Hand the value receiver back to the owning `FutureStream` so a new waiter can be created.
    fn drop(&mut self) {
        let receiver = self.rx.take().unwrap();
        self.fs.return_waiter(receiver);
    }
}

/// Iterator for completed `Future`s in a `FutureStream`. The iterator incrementally returns values
/// from resolved `Future`s, blocking while `Future`s are outstanding but none has resolved yet.
/// `Future`s which resolve to no value are discarded.
pub struct FutureStreamIter<'a, T: 'a>(FutureStreamWaiter<'a, T>);

impl<'fs, T: 'fs> IntoIterator for FutureStreamWaiter<'fs, T> {
    type Item = T;
    type IntoIter = FutureStreamIter<'fs, T>;

    /// Consume the waiter and iterate over the values of the `Future`s it yields.
    fn into_iter(self) -> FutureStreamIter<'fs, T> {
        FutureStreamIter(self)
    }
}

impl<'a, T: 'a> Iterator for FutureStreamIter<'a, T> {
    type Item = T;

    /// Return the value of the next `Future` that resolved with one.
    ///
    /// `Future`s that resolved without a value are skipped. Yields `None` once the waiter reports
    /// that nothing more is available.
    fn next(&mut self) -> Option<T> {
        while let Some(fut) = self.0.wait() {
            match fut.poll() {
                Resolved(Some(val)) => return Some(val),
                // Resolved without a value: move on to the next one
                Resolved(None) => continue,
                Unresolved(_) => panic!("FutureStreamWait.wait returned unresolved Future"),
            }
        }

        None
    }
}

impl<'a, T: 'a> IntoIterator for &'a FutureStream<T> {
    type Item = T;
    type IntoIter = FutureStreamIter<'a, T>;

    fn into_iter(self) -> Self::IntoIter { self.waiter().into_iter() }
}

impl<T: Send + 'static> FromIterator<Future<T>> for FutureStream<T> {
    /// Build a `FutureStream` holding every `Future` produced by `iterator`.
    ///
    /// The input is consumed eagerly before the stream is returned.
    // XXX lazily consume input iterator?
    fn from_iter<I>(iterator: I) -> Self
        where I: IntoIterator<Item=Future<T>>
    {
        let collected: FutureStream<T> = FutureStream::new();

        for future in iterator {
            collected.add(future);
        }

        collected
    }
}