Skip to main content

itertools/
groupbylazy.rs

1use alloc::vec::{self, Vec};
2use std::cell::{Cell, RefCell};
3use std::fmt::Debug;
4
/// Abstraction over "something that maps an element to a grouping key".
///
/// It gives `ChunkBy` (keyed by a user-supplied `FnMut`) and `IntoChunks`
/// (keyed by a `ChunkIndex` counter) one common interface.
trait KeyFunction<A> {
    /// Type of the key produced for each argument.
    type Key;
    /// Computes the key for `arg`; the implementor may update its own state.
    fn call_mut(&mut self, arg: A) -> Self::Key;
}

/// Any `FnMut(A) -> K`, sized or unsized, is a `KeyFunction<A>` with key `K`.
impl<A, K, F> KeyFunction<A> for F
where
    F: ?Sized + FnMut(A) -> K,
{
    type Key = K;

    #[inline]
    fn call_mut(&mut self, arg: A) -> K {
        // Invoke the callable behind the `&mut F` receiver.
        (*self)(arg)
    }
}
21
/// Counter that plays the role of the grouping key function for `IntoChunks`.
#[derive(Debug, Clone)]
struct ChunkIndex {
    /// Chunk length the counter was created with.
    size: usize,
    /// Position counter inside the current chunk; starts at 0.
    index: usize,
    /// Key (running number) of the current chunk; starts at 0.
    key: usize,
}

impl ChunkIndex {
    /// Creates a counter for chunks of `size` elements with both counters at zero.
    #[inline(always)]
    fn new(size: usize) -> Self {
        let (index, key) = (0, 0);
        ChunkIndex { size, index, key }
    }
}
40
41impl<A> KeyFunction<A> for ChunkIndex {
42    type Key = usize;
43    #[inline(always)]
44    fn call_mut(&mut self, _arg: A) -> Self::Key {
45        if self.index == self.size {
46            self.key += 1;
47            self.index = 0;
48        }
49        self.index += 1;
50        self.key
51    }
52}
53
/// Shared state behind the lazy grouping adaptors.
///
/// Holds the source iterator, the key function, the key and look-ahead
/// element of the group being read, and the buffered elements of groups
/// that were passed over while their iterators were still alive.
#[derive(Clone)]
struct GroupInner<K, I, F>
where
    I: Iterator,
{
    /// Key function applied to each element.
    key: F,
    /// Source iterator.
    iter: I,
    /// Key of the most recently examined element, if any.
    current_key: Option<K>,
    /// Element already taken from `iter` but not yet handed out, if any.
    current_elt: Option<I::Item>,
    /// Set once `iter` has been exhausted.
    done: bool,
    /// Index of the group currently being buffered or visited.
    top_group: usize,
    /// Smallest group index that still has buffered elements.
    oldest_buffered_group: usize,
    /// Group index stored at `buffer[0]`. The slots for
    /// `bottom_group..oldest_buffered_group` are unused and get erased once
    /// that range is large enough.
    bottom_group: usize,
    /// Buffered groups, from `bottom_group` (index 0) up to `top_group`.
    buffer: Vec<vec::IntoIter<I::Item>>,
    /// Index of the last group iterator that was dropped;
    /// `usize::MAX` initially, while no group has been dropped.
    dropped_group: usize,
}
79
80impl<K, I, F> Debug for GroupInner<K, I, F>
81where
82    K: Debug,
83    I: Iterator + Debug,
84    I::Item: Debug,
85{
86    debug_fmt_fields!(
87        GroupInner,
88        // key, omitted because functions are almost never Debug
89        iter,
90        current_key,
91        current_elt,
92        done,
93        top_group,
94        oldest_buffered_group,
95        bottom_group,
96        buffer,
97        dropped_group
98    );
99}
100
101impl<K, I, F> GroupInner<K, I, F>
102where
103    I: Iterator,
104    F: for<'a> KeyFunction<&'a I::Item, Key = K>,
105    K: PartialEq,
106{
107    /// `client`: Index of group that requests next element
108    #[inline(always)]
109    fn step(&mut self, client: usize) -> Option<I::Item> {
110        /*
111        println!("client={}, bottom_group={}, oldest_buffered_group={}, top_group={}, buffers=[{}]",
112                 client, self.bottom_group, self.oldest_buffered_group,
113                 self.top_group,
114                 self.buffer.iter().map(|elt| elt.len()).format(", "));
115        */
116        if client < self.oldest_buffered_group {
117            None
118        } else if client < self.top_group
119            || (client == self.top_group && self.buffer.len() > self.top_group - self.bottom_group)
120        {
121            self.lookup_buffer(client)
122        } else if self.done {
123            None
124        } else if self.top_group == client {
125            self.step_current()
126        } else {
127            self.step_buffering(client)
128        }
129    }
130
131    #[inline(never)]
132    fn lookup_buffer(&mut self, client: usize) -> Option<I::Item> {
133        // if `bufidx` doesn't exist in self.buffer, it might be empty
134        let bufidx = client - self.bottom_group;
135        if client < self.oldest_buffered_group {
136            return None;
137        }
138        let elt = self.buffer.get_mut(bufidx).and_then(|queue| queue.next());
139        if elt.is_none() && client == self.oldest_buffered_group {
140            // FIXME: VecDeque is unfortunately not zero allocation when empty,
141            // so we do this job manually.
142            // `bottom_group..oldest_buffered_group` is unused, and if it's large enough, erase it.
143            self.oldest_buffered_group += 1;
144            // skip forward further empty queues too
145            while self
146                .buffer
147                .get(self.oldest_buffered_group - self.bottom_group)
148                .map_or(false, |buf| buf.len() == 0)
149            {
150                self.oldest_buffered_group += 1;
151            }
152
153            let nclear = self.oldest_buffered_group - self.bottom_group;
154            if nclear > 0 && nclear >= self.buffer.len() / 2 {
155                let mut i = 0;
156                self.buffer.retain(|buf| {
157                    i += 1;
158                    debug_assert!(buf.len() == 0 || i > nclear);
159                    i > nclear
160                });
161                self.bottom_group = self.oldest_buffered_group;
162            }
163        }
164        elt
165    }
166
167    /// Take the next element from the iterator, and set the done
168    /// flag if exhausted. Must not be called after done.
169    #[inline(always)]
170    fn next_element(&mut self) -> Option<I::Item> {
171        debug_assert!(!self.done);
172        match self.iter.next() {
173            None => {
174                self.done = true;
175                None
176            }
177            otherwise => otherwise,
178        }
179    }
180
181    #[inline(never)]
182    fn step_buffering(&mut self, client: usize) -> Option<I::Item> {
183        // requested a later group -- walk through the current group up to
184        // the requested group index, and buffer the elements (unless
185        // the group is marked as dropped).
186        // Because the `Groups` iterator is always the first to request
187        // each group index, client is the next index efter top_group.
188        debug_assert!(self.top_group + 1 == client);
189        let mut group = Vec::new();
190
191        if let Some(elt) = self.current_elt.take() {
192            if self.top_group != self.dropped_group {
193                group.push(elt);
194            }
195        }
196        let mut first_elt = None; // first element of the next group
197
198        while let Some(elt) = self.next_element() {
199            let key = self.key.call_mut(&elt);
200            if let Some(old_key) = self.current_key.take() {
201                if old_key != key {
202                    self.current_key = Some(key);
203                    first_elt = Some(elt);
204                    break;
205                }
206            }
207            self.current_key = Some(key);
208            if self.top_group != self.dropped_group {
209                group.push(elt);
210            }
211        }
212
213        if self.top_group != self.dropped_group {
214            self.push_next_group(group);
215        }
216        if first_elt.is_some() {
217            self.top_group += 1;
218            debug_assert!(self.top_group == client);
219        }
220        first_elt
221    }
222
223    fn push_next_group(&mut self, group: Vec<I::Item>) {
224        // When we add a new buffered group, fill up slots between oldest_buffered_group and top_group
225        while self.top_group - self.bottom_group > self.buffer.len() {
226            if self.buffer.is_empty() {
227                self.bottom_group += 1;
228                self.oldest_buffered_group += 1;
229            } else {
230                self.buffer.push(Vec::new().into_iter());
231            }
232        }
233        self.buffer.push(group.into_iter());
234        debug_assert!(self.top_group + 1 - self.bottom_group == self.buffer.len());
235    }
236
237    /// This is the immediate case, where we use no buffering
238    #[inline]
239    fn step_current(&mut self) -> Option<I::Item> {
240        debug_assert!(!self.done);
241        if let elt @ Some(..) = self.current_elt.take() {
242            return elt;
243        }
244        match self.next_element() {
245            None => None,
246            Some(elt) => {
247                let key = self.key.call_mut(&elt);
248                if let Some(old_key) = self.current_key.take() {
249                    if old_key != key {
250                        self.current_key = Some(key);
251                        self.current_elt = Some(elt);
252                        self.top_group += 1;
253                        return None;
254                    }
255                }
256                self.current_key = Some(key);
257                Some(elt)
258            }
259        }
260    }
261
262    /// Request the just started groups' key.
263    ///
264    /// `client`: Index of group
265    ///
266    /// **Panics** if no group key is available.
267    fn group_key(&mut self, client: usize) -> K {
268        // This can only be called after we have just returned the first
269        // element of a group.
270        // Perform this by simply buffering one more element, grabbing the
271        // next key.
272        debug_assert!(!self.done);
273        debug_assert!(client == self.top_group);
274        debug_assert!(self.current_key.is_some());
275        debug_assert!(self.current_elt.is_none());
276        let old_key = self.current_key.take().unwrap();
277        if let Some(elt) = self.next_element() {
278            let key = self.key.call_mut(&elt);
279            if old_key != key {
280                self.top_group += 1;
281            }
282            self.current_key = Some(key);
283            self.current_elt = Some(elt);
284        }
285        old_key
286    }
287}
288
289impl<K, I, F> GroupInner<K, I, F>
290where
291    I: Iterator,
292{
293    /// Called when a group is dropped
294    fn drop_group(&mut self, client: usize) {
295        // It's only useful to track the maximal index
296        if self.dropped_group == !0 || client > self.dropped_group {
297            self.dropped_group = client;
298        }
299    }
300}
301
302#[deprecated(note = "Use `ChunkBy` instead", since = "0.13.0")]
303/// See [`ChunkBy`](crate::structs::ChunkBy).
304pub type GroupBy<K, I, F> = ChunkBy<K, I, F>;
305
306/// `ChunkBy` is the storage for the lazy grouping operation.
307///
308/// If the groups are consumed in their original order, or if each
309/// group is dropped without keeping it around, then `ChunkBy` uses
310/// no allocations. It needs allocations only if several group iterators
311/// are alive at the same time.
312///
313/// This type implements [`IntoIterator`] (it is **not** an iterator
314/// itself), because the group iterators need to borrow from this
315/// value. It should be stored in a local variable or temporary and
316/// iterated.
317///
318/// See [`.chunk_by()`](crate::Itertools::chunk_by) for more information.
319#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
320pub struct ChunkBy<K, I, F>
321where
322    I: Iterator,
323{
324    inner: RefCell<GroupInner<K, I, F>>,
325    // the group iterator's current index. Keep this in the main value
326    // so that simultaneous iterators all use the same state.
327    index: Cell<usize>,
328}
329
330impl<K, I, F> Debug for ChunkBy<K, I, F>
331where
332    K: Debug,
333    I: Iterator + Debug,
334    I::Item: Debug,
335{
336    debug_fmt_fields!(ChunkBy, inner, index);
337}
338
339/// Create a new
340pub fn new<K, J, F>(iter: J, f: F) -> ChunkBy<K, J::IntoIter, F>
341where
342    J: IntoIterator,
343    F: FnMut(&J::Item) -> K,
344{
345    ChunkBy {
346        inner: RefCell::new(GroupInner {
347            key: f,
348            iter: iter.into_iter(),
349            current_key: None,
350            current_elt: None,
351            done: false,
352            top_group: 0,
353            oldest_buffered_group: 0,
354            bottom_group: 0,
355            buffer: Vec::new(),
356            dropped_group: !0,
357        }),
358        index: Cell::new(0),
359    }
360}
361
362impl<K, I, F> ChunkBy<K, I, F>
363where
364    I: Iterator,
365{
366    /// `client`: Index of group that requests next element
367    fn step(&self, client: usize) -> Option<I::Item>
368    where
369        F: FnMut(&I::Item) -> K,
370        K: PartialEq,
371    {
372        self.inner.borrow_mut().step(client)
373    }
374
375    /// `client`: Index of group
376    fn drop_group(&self, client: usize) {
377        self.inner.borrow_mut().drop_group(client);
378    }
379}
380
381impl<'a, K, I, F> IntoIterator for &'a ChunkBy<K, I, F>
382where
383    I: Iterator,
384    I::Item: 'a,
385    F: FnMut(&I::Item) -> K,
386    K: PartialEq,
387{
388    type Item = (K, Group<'a, K, I, F>);
389    type IntoIter = Groups<'a, K, I, F>;
390
391    fn into_iter(self) -> Self::IntoIter {
392        Groups { parent: self }
393    }
394}
395
396/// An iterator that yields the Group iterators.
397///
398/// Iterator element type is `(K, Group)`:
399/// the group's key `K` and the group's iterator.
400///
401/// See [`.chunk_by()`](crate::Itertools::chunk_by) for more information.
402#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
403pub struct Groups<'a, K, I, F>
404where
405    I: Iterator + 'a,
406    I::Item: 'a,
407    K: 'a,
408    F: 'a,
409{
410    parent: &'a ChunkBy<K, I, F>,
411}
412
413impl<'a, K, I, F> Debug for Groups<'a, K, I, F>
414where
415    K: Debug,
416    I: Iterator + Debug,
417    I::Item: Debug,
418{
419    debug_fmt_fields!(Groups, parent);
420}
421
422impl<'a, K, I, F> Iterator for Groups<'a, K, I, F>
423where
424    I: Iterator,
425    I::Item: 'a,
426    F: FnMut(&I::Item) -> K,
427    K: PartialEq,
428{
429    type Item = (K, Group<'a, K, I, F>);
430
431    #[inline]
432    fn next(&mut self) -> Option<Self::Item> {
433        let index = self.parent.index.get();
434        self.parent.index.set(index + 1);
435        let inner = &mut *self.parent.inner.borrow_mut();
436        inner.step(index).map(|elt| {
437            let key = inner.group_key(index);
438            (
439                key,
440                Group {
441                    parent: self.parent,
442                    index,
443                    first: Some(elt),
444                },
445            )
446        })
447    }
448}
449
450/// An iterator for the elements in a single group.
451///
452/// Iterator element type is `I::Item`.
453pub struct Group<'a, K, I, F>
454where
455    I: Iterator + 'a,
456    I::Item: 'a,
457    K: 'a,
458    F: 'a,
459{
460    parent: &'a ChunkBy<K, I, F>,
461    index: usize,
462    first: Option<I::Item>,
463}
464
465impl<'a, K, I, F> Drop for Group<'a, K, I, F>
466where
467    I: Iterator,
468    I::Item: 'a,
469{
470    fn drop(&mut self) {
471        self.parent.drop_group(self.index);
472    }
473}
474
475impl<'a, K, I, F> Debug for Group<'a, K, I, F>
476where
477    K: Debug,
478    I: Iterator + Debug,
479    I::Item: Debug,
480{
481    debug_fmt_fields!(Group, parent, index, first);
482}
483
484impl<'a, K, I, F> Iterator for Group<'a, K, I, F>
485where
486    I: Iterator,
487    I::Item: 'a,
488    F: FnMut(&I::Item) -> K,
489    K: PartialEq,
490{
491    type Item = I::Item;
492    #[inline]
493    fn next(&mut self) -> Option<Self::Item> {
494        if let elt @ Some(..) = self.first.take() {
495            return elt;
496        }
497        self.parent.step(self.index)
498    }
499}
500
501///// IntoChunks /////
502
503/// Create a new
504pub fn new_chunks<J>(iter: J, size: usize) -> IntoChunks<J::IntoIter>
505where
506    J: IntoIterator,
507{
508    IntoChunks {
509        inner: RefCell::new(GroupInner {
510            key: ChunkIndex::new(size),
511            iter: iter.into_iter(),
512            current_key: None,
513            current_elt: None,
514            done: false,
515            top_group: 0,
516            oldest_buffered_group: 0,
517            bottom_group: 0,
518            buffer: Vec::new(),
519            dropped_group: !0,
520        }),
521        index: Cell::new(0),
522    }
523}
524
525/// `IntoChunks` is the storage for a lazy chunking operation.
526///
527/// `IntoChunks` behaves just like `ChunkBy`: it is iterable, and
528/// it only buffers if several chunk iterators are alive at the same time.
529///
530/// This type implements [`IntoIterator`] (it is **not** an iterator
531/// itself), because the chunk iterators need to borrow from this
532/// value. It should be stored in a local variable or temporary and
533/// iterated.
534///
535/// Iterator element type is `Chunk`, each chunk's iterator.
536///
537/// See [`.chunks()`](crate::Itertools::chunks) for more information.
538#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
539pub struct IntoChunks<I>
540where
541    I: Iterator,
542{
543    inner: RefCell<GroupInner<usize, I, ChunkIndex>>,
544    // the chunk iterator's current index. Keep this in the main value
545    // so that simultaneous iterators all use the same state.
546    index: Cell<usize>,
547}
548
549impl<I> Debug for IntoChunks<I>
550where
551    I: Iterator + Debug,
552    I::Item: Debug,
553{
554    debug_fmt_fields!(IntoChunks, inner, index);
555}
556
557impl<I> Clone for IntoChunks<I>
558where
559    I: Clone + Iterator,
560    I::Item: Clone,
561{
562    clone_fields!(inner, index);
563}
564
565impl<I> IntoChunks<I>
566where
567    I: Iterator,
568{
569    /// `client`: Index of chunk that requests next element
570    fn step(&self, client: usize) -> Option<I::Item> {
571        self.inner.borrow_mut().step(client)
572    }
573
574    /// `client`: Index of chunk
575    fn drop_group(&self, client: usize) {
576        self.inner.borrow_mut().drop_group(client);
577    }
578}
579
580impl<'a, I> IntoIterator for &'a IntoChunks<I>
581where
582    I: Iterator,
583    I::Item: 'a,
584{
585    type Item = Chunk<'a, I>;
586    type IntoIter = Chunks<'a, I>;
587
588    fn into_iter(self) -> Self::IntoIter {
589        Chunks { parent: self }
590    }
591}
592
593/// An iterator that yields the Chunk iterators.
594///
595/// Iterator element type is `Chunk`.
596///
597/// See [`.chunks()`](crate::Itertools::chunks) for more information.
598#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
599#[derive(Clone)]
600pub struct Chunks<'a, I>
601where
602    I: Iterator + 'a,
603    I::Item: 'a,
604{
605    parent: &'a IntoChunks<I>,
606}
607
608impl<'a, I> Debug for Chunks<'a, I>
609where
610    I: Iterator + Debug,
611    I::Item: Debug,
612{
613    debug_fmt_fields!(Chunks, parent);
614}
615
616impl<'a, I> Iterator for Chunks<'a, I>
617where
618    I: Iterator,
619    I::Item: 'a,
620{
621    type Item = Chunk<'a, I>;
622
623    #[inline]
624    fn next(&mut self) -> Option<Self::Item> {
625        let index = self.parent.index.get();
626        self.parent.index.set(index + 1);
627        let inner = &mut *self.parent.inner.borrow_mut();
628        inner.step(index).map(|elt| Chunk {
629            parent: self.parent,
630            index,
631            first: Some(elt),
632        })
633    }
634}
635
636/// An iterator for the elements in a single chunk.
637///
638/// Iterator element type is `I::Item`.
639#[derive(Debug)]
640pub struct Chunk<'a, I>
641where
642    I: Iterator + 'a,
643    I::Item: 'a,
644{
645    parent: &'a IntoChunks<I>,
646    index: usize,
647    first: Option<I::Item>,
648}
649
650impl<'a, I> Drop for Chunk<'a, I>
651where
652    I: Iterator,
653    I::Item: 'a,
654{
655    fn drop(&mut self) {
656        self.parent.drop_group(self.index);
657    }
658}
659
660impl<'a, I> Iterator for Chunk<'a, I>
661where
662    I: Iterator,
663    I::Item: 'a,
664{
665    type Item = I::Item;
666    #[inline]
667    fn next(&mut self) -> Option<Self::Item> {
668        if let elt @ Some(..) = self.first.take() {
669            return elt;
670        }
671        self.parent.step(self.index)
672    }
673}