Skip to main content

miniextendr_api/altrep_data/
stream.rs

1//! Streaming ALTREP data backed by chunk-cached reader closures.
2//!
3//! These types provide ALTREP vectors where elements are loaded on-demand
4//! from a reader function in fixed-size chunks. Chunks are cached for
5//! repeated access within the same region.
6
7use std::cell::RefCell;
8use std::collections::BTreeMap;
9
10use super::{AltIntegerData, AltRealData, AltrepLen, InferBase};
11
12// region: StreamingRealData
13
/// Streaming ALTREP for real (f64) vectors.
///
/// Elements are loaded on demand via a reader closure in fixed-size chunks.
/// Each loaded chunk is stored in a `BTreeMap` keyed by its chunk index so
/// that repeated access to the same chunk does not call the reader again.
///
/// # Reader Contract
///
/// The reader `F(start, buf) -> count` fills `buf` with elements starting
/// at index `start` and returns the number of elements actually written.
/// If the reader reports fewer elements than `buf.len()`, only that prefix
/// of the chunk is cached.
///
/// # Example
///
/// ```ignore
/// use miniextendr_api::altrep_data::StreamingRealData;
///
/// let data = StreamingRealData::new(1000, 64, |start, buf| {
///     let count = buf.len().min(1000 - start);
///     for (i, slot) in buf[..count].iter_mut().enumerate() {
///         *slot = (start + i) as f64 * 0.1;
///     }
///     count
/// });
/// ```
pub struct StreamingRealData<F: Fn(usize, &mut [f64]) -> usize> {
    /// Total number of elements exposed by the vector.
    len: usize,
    /// Loader invoked as `reader(start, buf)`; see the reader contract above.
    reader: F,
    /// Loaded chunks keyed by chunk index (`element index / chunk_size`).
    cache: RefCell<BTreeMap<usize, Vec<f64>>>,
    /// Number of elements per chunk; `new` guarantees this is at least 1.
    chunk_size: usize,
}

impl<F: Fn(usize, &mut [f64]) -> usize> StreamingRealData<F> {
    /// Create a new streaming real data source.
    ///
    /// - `len`: total number of elements
    /// - `chunk_size`: number of elements per cache chunk; a value of `0` is
    ///   raised to `1` so chunk arithmetic never divides by zero
    /// - `reader`: closure that fills a buffer starting at a given index
    pub fn new(len: usize, chunk_size: usize, reader: F) -> Self {
        Self {
            len,
            reader,
            cache: RefCell::new(BTreeMap::new()),
            chunk_size: chunk_size.max(1),
        }
    }

    /// Load a chunk into the cache if not already present.
    ///
    /// Chunks that start at or beyond `len` are ignored. The reader is called
    /// with a zero-initialised buffer covering the part of the chunk that lies
    /// inside the vector, and the buffer is truncated to the count the reader
    /// reports before it is stored.
    fn ensure_chunk(&self, chunk_idx: usize) {
        // Use a short-lived shared borrow for the lookup: the cache must not
        // be borrowed while the reader runs, otherwise a reader that touches
        // this cache again would panic with "already borrowed".
        if self.cache.borrow().contains_key(&chunk_idx) {
            return;
        }
        // Saturate instead of overflowing; a saturated start is always past
        // the end of the vector and therefore yields an empty chunk.
        let start = chunk_idx.saturating_mul(self.chunk_size);
        let count = self.chunk_size.min(self.len.saturating_sub(start));
        if count == 0 {
            return;
        }
        let mut buf = vec![0.0f64; count];
        let written = (self.reader)(start, &mut buf);
        // `truncate` is a no-op when the reader over-reports.
        buf.truncate(written);
        // If a nested call already stored this chunk, keep that copy.
        self.cache.borrow_mut().entry(chunk_idx).or_insert(buf);
    }
}
76
77impl<F: Fn(usize, &mut [f64]) -> usize> AltrepLen for StreamingRealData<F> {
78    fn len(&self) -> usize {
79        self.len
80    }
81}
82
83impl<F: Fn(usize, &mut [f64]) -> usize> AltRealData for StreamingRealData<F> {
84    fn elt(&self, i: usize) -> f64 {
85        if i >= self.len {
86            return f64::NAN;
87        }
88        let chunk_idx = i / self.chunk_size;
89        self.ensure_chunk(chunk_idx);
90        let cache = self.cache.borrow();
91        let offset = i % self.chunk_size;
92        cache
93            .get(&chunk_idx)
94            .and_then(|chunk| chunk.get(offset).copied())
95            .unwrap_or(f64::NAN)
96    }
97
98    fn get_region(&self, start: usize, len: usize, buf: &mut [f64]) -> usize {
99        let count = len.min(self.len.saturating_sub(start)).min(buf.len());
100        if count == 0 {
101            return 0;
102        }
103        (self.reader)(start, &mut buf[..count])
104    }
105}
106
107impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::externalptr::TypedExternal
108    for StreamingRealData<F>
109{
110    const TYPE_NAME: &'static str = "StreamingRealData";
111    const TYPE_NAME_CSTR: &'static [u8] = b"StreamingRealData\0";
112    const TYPE_ID_CSTR: &'static [u8] = b"miniextendr_api::altrep::StreamingRealData\0";
113}
114
115impl<F: Fn(usize, &mut [f64]) -> usize + 'static> InferBase for StreamingRealData<F> {
116    const BASE: crate::altrep::RBase = crate::altrep::RBase::Real;
117
118    unsafe fn make_class(
119        class_name: *const i8,
120        pkg_name: *const i8,
121    ) -> crate::ffi::altrep::R_altrep_class_t {
122        unsafe {
123            crate::ffi::altrep::R_make_altreal_class(class_name, pkg_name, core::ptr::null_mut())
124        }
125    }
126
127    unsafe fn install_methods(cls: crate::ffi::altrep::R_altrep_class_t) {
128        unsafe { crate::altrep_bridge::install_base::<Self>(cls) };
129        unsafe { crate::altrep_bridge::install_vec::<Self>(cls) };
130        unsafe { crate::altrep_bridge::install_real::<Self>(cls) };
131    }
132}
133
134impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::altrep_traits::Altrep
135    for StreamingRealData<F>
136{
137    fn length(x: crate::ffi::SEXP) -> crate::ffi::R_xlen_t {
138        let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
139        data.len() as crate::ffi::R_xlen_t
140    }
141}
142
143impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::altrep_traits::AltVec
144    for StreamingRealData<F>
145{
146}
147
148impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::altrep_traits::AltReal
149    for StreamingRealData<F>
150{
151    const HAS_ELT: bool = true;
152
153    fn elt(x: crate::ffi::SEXP, i: crate::ffi::R_xlen_t) -> f64 {
154        let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
155        AltRealData::elt(data, i as usize)
156    }
157
158    const HAS_GET_REGION: bool = true;
159
160    fn get_region(
161        x: crate::ffi::SEXP,
162        start: crate::ffi::R_xlen_t,
163        len: crate::ffi::R_xlen_t,
164        buf: &mut [f64],
165    ) -> crate::ffi::R_xlen_t {
166        let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
167        AltRealData::get_region(data, start as usize, len as usize, buf) as crate::ffi::R_xlen_t
168    }
169}
170// endregion
171
172// region: StreamingIntData
173
/// Streaming ALTREP for integer (i32) vectors.
///
/// Elements are loaded on demand via a reader closure in fixed-size chunks.
/// Each loaded chunk is stored in a `BTreeMap` keyed by its chunk index so
/// that repeated access to the same chunk does not call the reader again.
///
/// # Reader Contract
///
/// The reader `F(start, buf) -> count` fills `buf` with elements starting
/// at index `start` and returns the number of elements actually written.
/// If the reader reports fewer elements than `buf.len()`, only that prefix
/// of the chunk is cached.
///
/// # Example
///
/// ```ignore
/// use miniextendr_api::altrep_data::StreamingIntData;
///
/// let data = StreamingIntData::new(1000, 64, |start, buf| {
///     let count = buf.len().min(1000 - start);
///     for (i, slot) in buf[..count].iter_mut().enumerate() {
///         *slot = (start + i) as i32;
///     }
///     count
/// });
/// ```
pub struct StreamingIntData<F: Fn(usize, &mut [i32]) -> usize> {
    /// Total number of elements exposed by the vector.
    len: usize,
    /// Loader invoked as `reader(start, buf)`; see the reader contract above.
    reader: F,
    /// Loaded chunks keyed by chunk index (`element index / chunk_size`).
    cache: RefCell<BTreeMap<usize, Vec<i32>>>,
    /// Number of elements per chunk; `new` guarantees this is at least 1.
    chunk_size: usize,
}

impl<F: Fn(usize, &mut [i32]) -> usize> StreamingIntData<F> {
    /// Create a new streaming integer data source.
    ///
    /// - `len`: total number of elements
    /// - `chunk_size`: number of elements per cache chunk; a value of `0` is
    ///   raised to `1` so chunk arithmetic never divides by zero
    /// - `reader`: closure that fills a buffer starting at a given index
    pub fn new(len: usize, chunk_size: usize, reader: F) -> Self {
        Self {
            len,
            reader,
            cache: RefCell::new(BTreeMap::new()),
            chunk_size: chunk_size.max(1),
        }
    }

    /// Load a chunk into the cache if not already present.
    ///
    /// Chunks that start at or beyond `len` are ignored. The reader is called
    /// with a zero-initialised buffer covering the part of the chunk that lies
    /// inside the vector, and the buffer is truncated to the count the reader
    /// reports before it is stored.
    fn ensure_chunk(&self, chunk_idx: usize) {
        // Use a short-lived shared borrow for the lookup: the cache must not
        // be borrowed while the reader runs, otherwise a reader that touches
        // this cache again would panic with "already borrowed".
        if self.cache.borrow().contains_key(&chunk_idx) {
            return;
        }
        // Saturate instead of overflowing; a saturated start is always past
        // the end of the vector and therefore yields an empty chunk.
        let start = chunk_idx.saturating_mul(self.chunk_size);
        let count = self.chunk_size.min(self.len.saturating_sub(start));
        if count == 0 {
            return;
        }
        let mut buf = vec![0i32; count];
        let written = (self.reader)(start, &mut buf);
        // `truncate` is a no-op when the reader over-reports.
        buf.truncate(written);
        // If a nested call already stored this chunk, keep that copy.
        self.cache.borrow_mut().entry(chunk_idx).or_insert(buf);
    }
}
236
237impl<F: Fn(usize, &mut [i32]) -> usize> AltrepLen for StreamingIntData<F> {
238    fn len(&self) -> usize {
239        self.len
240    }
241}
242
243impl<F: Fn(usize, &mut [i32]) -> usize> AltIntegerData for StreamingIntData<F> {
244    fn elt(&self, i: usize) -> i32 {
245        if i >= self.len {
246            return crate::altrep_traits::NA_INTEGER;
247        }
248        let chunk_idx = i / self.chunk_size;
249        self.ensure_chunk(chunk_idx);
250        let cache = self.cache.borrow();
251        let offset = i % self.chunk_size;
252        cache
253            .get(&chunk_idx)
254            .and_then(|chunk| chunk.get(offset).copied())
255            .unwrap_or(crate::altrep_traits::NA_INTEGER)
256    }
257
258    fn get_region(&self, start: usize, len: usize, buf: &mut [i32]) -> usize {
259        let count = len.min(self.len.saturating_sub(start)).min(buf.len());
260        if count == 0 {
261            return 0;
262        }
263        (self.reader)(start, &mut buf[..count])
264    }
265}
266
267impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::externalptr::TypedExternal
268    for StreamingIntData<F>
269{
270    const TYPE_NAME: &'static str = "StreamingIntData";
271    const TYPE_NAME_CSTR: &'static [u8] = b"StreamingIntData\0";
272    const TYPE_ID_CSTR: &'static [u8] = b"miniextendr_api::altrep::StreamingIntData\0";
273}
274
275impl<F: Fn(usize, &mut [i32]) -> usize + 'static> InferBase for StreamingIntData<F> {
276    const BASE: crate::altrep::RBase = crate::altrep::RBase::Int;
277
278    unsafe fn make_class(
279        class_name: *const i8,
280        pkg_name: *const i8,
281    ) -> crate::ffi::altrep::R_altrep_class_t {
282        unsafe {
283            crate::ffi::altrep::R_make_altinteger_class(class_name, pkg_name, core::ptr::null_mut())
284        }
285    }
286
287    unsafe fn install_methods(cls: crate::ffi::altrep::R_altrep_class_t) {
288        unsafe { crate::altrep_bridge::install_base::<Self>(cls) };
289        unsafe { crate::altrep_bridge::install_vec::<Self>(cls) };
290        unsafe { crate::altrep_bridge::install_int::<Self>(cls) };
291    }
292}
293
294impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::altrep_traits::Altrep
295    for StreamingIntData<F>
296{
297    fn length(x: crate::ffi::SEXP) -> crate::ffi::R_xlen_t {
298        let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
299        data.len() as crate::ffi::R_xlen_t
300    }
301}
302
303impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::altrep_traits::AltVec
304    for StreamingIntData<F>
305{
306}
307
308impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::altrep_traits::AltInteger
309    for StreamingIntData<F>
310{
311    const HAS_ELT: bool = true;
312
313    fn elt(x: crate::ffi::SEXP, i: crate::ffi::R_xlen_t) -> i32 {
314        let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
315        AltIntegerData::elt(data, i as usize)
316    }
317
318    const HAS_GET_REGION: bool = true;
319
320    fn get_region(
321        x: crate::ffi::SEXP,
322        start: crate::ffi::R_xlen_t,
323        len: crate::ffi::R_xlen_t,
324        buf: &mut [i32],
325    ) -> crate::ffi::R_xlen_t {
326        let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
327        AltIntegerData::get_region(data, start as usize, len as usize, buf) as crate::ffi::R_xlen_t
328    }
329}
330// endregion