miniextendr_api/altrep_data/
stream.rs1use std::cell::RefCell;
8use std::collections::BTreeMap;
9
10use super::{AltIntegerData, AltRealData, AltrepLen, InferBase};
11
/// Lazily populated `f64` vector backed by a chunked reader callback.
///
/// `reader(start, buf)` is asked to fill `buf` with the elements that begin at
/// element index `start` and returns how many elements it actually wrote.
/// Every chunk that has been read is kept in `cache`, keyed by its chunk index.
pub struct StreamingRealData<F: Fn(usize, &mut [f64]) -> usize> {
    /// Total number of elements this vector reports.
    len: usize,
    /// Callback that produces elements on demand.
    reader: F,
    /// Chunks loaded so far, keyed by chunk index (`element index / chunk_size`).
    cache: RefCell<BTreeMap<usize, Vec<f64>>>,
    /// Number of elements per chunk; `new` clamps this to at least 1.
    chunk_size: usize,
}

impl<F: Fn(usize, &mut [f64]) -> usize> StreamingRealData<F> {
    /// Creates a streaming vector of `len` elements read in chunks of `chunk_size`.
    ///
    /// A `chunk_size` of 0 is raised to 1 so that the index arithmetic in the
    /// element accessors never divides by zero. Nothing is read until an
    /// element is first requested.
    pub fn new(len: usize, chunk_size: usize, reader: F) -> Self {
        Self {
            len,
            reader,
            cache: RefCell::new(BTreeMap::new()),
            chunk_size: chunk_size.max(1),
        }
    }

    /// Loads chunk `chunk_idx` into the cache unless it is already present.
    ///
    /// Chunks that start at or beyond `len` are ignored. If the reader reports
    /// fewer elements than requested, only that prefix is cached.
    fn ensure_chunk(&self, chunk_idx: usize) {
        // Short-lived shared borrow: it must not outlive this check, because
        // the reader below is arbitrary user code.
        if self.cache.borrow().contains_key(&chunk_idx) {
            return;
        }
        // Saturate instead of overflowing; a saturated start is past `len`
        // and therefore yields an empty chunk below.
        let start = chunk_idx.saturating_mul(self.chunk_size);
        let count = self.chunk_size.min(self.len.saturating_sub(start));
        if count == 0 {
            return;
        }
        let mut buf = vec![0.0f64; count];
        // No `RefCell` borrow is held here, so a reader that re-enters this
        // object (directly or indirectly) cannot trigger a borrow panic.
        let written = (self.reader)(start, &mut buf);
        buf.truncate(written);
        // A re-entrant call may have filled this slot meanwhile; keep the
        // value that is already there in that case.
        self.cache.borrow_mut().entry(chunk_idx).or_insert(buf);
    }
}
76
77impl<F: Fn(usize, &mut [f64]) -> usize> AltrepLen for StreamingRealData<F> {
78 fn len(&self) -> usize {
79 self.len
80 }
81}
82
83impl<F: Fn(usize, &mut [f64]) -> usize> AltRealData for StreamingRealData<F> {
84 fn elt(&self, i: usize) -> f64 {
85 if i >= self.len {
86 return f64::NAN;
87 }
88 let chunk_idx = i / self.chunk_size;
89 self.ensure_chunk(chunk_idx);
90 let cache = self.cache.borrow();
91 let offset = i % self.chunk_size;
92 cache
93 .get(&chunk_idx)
94 .and_then(|chunk| chunk.get(offset).copied())
95 .unwrap_or(f64::NAN)
96 }
97
98 fn get_region(&self, start: usize, len: usize, buf: &mut [f64]) -> usize {
99 let count = len.min(self.len.saturating_sub(start)).min(buf.len());
100 if count == 0 {
101 return 0;
102 }
103 (self.reader)(start, &mut buf[..count])
104 }
105}
106
107impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::externalptr::TypedExternal
108 for StreamingRealData<F>
109{
110 const TYPE_NAME: &'static str = "StreamingRealData";
111 const TYPE_NAME_CSTR: &'static [u8] = b"StreamingRealData\0";
112 const TYPE_ID_CSTR: &'static [u8] = b"miniextendr_api::altrep::StreamingRealData\0";
113}
114
115impl<F: Fn(usize, &mut [f64]) -> usize + 'static> InferBase for StreamingRealData<F> {
116 const BASE: crate::altrep::RBase = crate::altrep::RBase::Real;
117
118 unsafe fn make_class(
119 class_name: *const i8,
120 pkg_name: *const i8,
121 ) -> crate::ffi::altrep::R_altrep_class_t {
122 unsafe {
123 crate::ffi::altrep::R_make_altreal_class(class_name, pkg_name, core::ptr::null_mut())
124 }
125 }
126
127 unsafe fn install_methods(cls: crate::ffi::altrep::R_altrep_class_t) {
128 unsafe { crate::altrep_bridge::install_base::<Self>(cls) };
129 unsafe { crate::altrep_bridge::install_vec::<Self>(cls) };
130 unsafe { crate::altrep_bridge::install_real::<Self>(cls) };
131 }
132}
133
134impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::altrep_traits::Altrep
135 for StreamingRealData<F>
136{
137 fn length(x: crate::ffi::SEXP) -> crate::ffi::R_xlen_t {
138 let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
139 data.len() as crate::ffi::R_xlen_t
140 }
141}
142
143impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::altrep_traits::AltVec
144 for StreamingRealData<F>
145{
146}
147
148impl<F: Fn(usize, &mut [f64]) -> usize + 'static> crate::altrep_traits::AltReal
149 for StreamingRealData<F>
150{
151 const HAS_ELT: bool = true;
152
153 fn elt(x: crate::ffi::SEXP, i: crate::ffi::R_xlen_t) -> f64 {
154 let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
155 AltRealData::elt(data, i as usize)
156 }
157
158 const HAS_GET_REGION: bool = true;
159
160 fn get_region(
161 x: crate::ffi::SEXP,
162 start: crate::ffi::R_xlen_t,
163 len: crate::ffi::R_xlen_t,
164 buf: &mut [f64],
165 ) -> crate::ffi::R_xlen_t {
166 let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
167 AltRealData::get_region(data, start as usize, len as usize, buf) as crate::ffi::R_xlen_t
168 }
169}
/// Lazily populated `i32` vector backed by a chunked reader callback.
///
/// `reader(start, buf)` is asked to fill `buf` with the elements that begin at
/// element index `start` and returns how many elements it actually wrote.
/// Every chunk that has been read is kept in `cache`, keyed by its chunk index.
pub struct StreamingIntData<F: Fn(usize, &mut [i32]) -> usize> {
    /// Total number of elements this vector reports.
    len: usize,
    /// Callback that produces elements on demand.
    reader: F,
    /// Chunks loaded so far, keyed by chunk index (`element index / chunk_size`).
    cache: RefCell<BTreeMap<usize, Vec<i32>>>,
    /// Number of elements per chunk; `new` clamps this to at least 1.
    chunk_size: usize,
}

impl<F: Fn(usize, &mut [i32]) -> usize> StreamingIntData<F> {
    /// Creates a streaming vector of `len` elements read in chunks of `chunk_size`.
    ///
    /// A `chunk_size` of 0 is raised to 1 so that the index arithmetic in the
    /// element accessors never divides by zero. Nothing is read until an
    /// element is first requested.
    pub fn new(len: usize, chunk_size: usize, reader: F) -> Self {
        Self {
            len,
            reader,
            cache: RefCell::new(BTreeMap::new()),
            chunk_size: chunk_size.max(1),
        }
    }

    /// Loads chunk `chunk_idx` into the cache unless it is already present.
    ///
    /// Chunks that start at or beyond `len` are ignored. If the reader reports
    /// fewer elements than requested, only that prefix is cached.
    fn ensure_chunk(&self, chunk_idx: usize) {
        // Short-lived shared borrow: it must not outlive this check, because
        // the reader below is arbitrary user code.
        if self.cache.borrow().contains_key(&chunk_idx) {
            return;
        }
        // Saturate instead of overflowing; a saturated start is past `len`
        // and therefore yields an empty chunk below.
        let start = chunk_idx.saturating_mul(self.chunk_size);
        let count = self.chunk_size.min(self.len.saturating_sub(start));
        if count == 0 {
            return;
        }
        let mut buf = vec![0i32; count];
        // No `RefCell` borrow is held here, so a reader that re-enters this
        // object (directly or indirectly) cannot trigger a borrow panic.
        let written = (self.reader)(start, &mut buf);
        buf.truncate(written);
        // A re-entrant call may have filled this slot meanwhile; keep the
        // value that is already there in that case.
        self.cache.borrow_mut().entry(chunk_idx).or_insert(buf);
    }
}
236
237impl<F: Fn(usize, &mut [i32]) -> usize> AltrepLen for StreamingIntData<F> {
238 fn len(&self) -> usize {
239 self.len
240 }
241}
242
243impl<F: Fn(usize, &mut [i32]) -> usize> AltIntegerData for StreamingIntData<F> {
244 fn elt(&self, i: usize) -> i32 {
245 if i >= self.len {
246 return crate::altrep_traits::NA_INTEGER;
247 }
248 let chunk_idx = i / self.chunk_size;
249 self.ensure_chunk(chunk_idx);
250 let cache = self.cache.borrow();
251 let offset = i % self.chunk_size;
252 cache
253 .get(&chunk_idx)
254 .and_then(|chunk| chunk.get(offset).copied())
255 .unwrap_or(crate::altrep_traits::NA_INTEGER)
256 }
257
258 fn get_region(&self, start: usize, len: usize, buf: &mut [i32]) -> usize {
259 let count = len.min(self.len.saturating_sub(start)).min(buf.len());
260 if count == 0 {
261 return 0;
262 }
263 (self.reader)(start, &mut buf[..count])
264 }
265}
266
267impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::externalptr::TypedExternal
268 for StreamingIntData<F>
269{
270 const TYPE_NAME: &'static str = "StreamingIntData";
271 const TYPE_NAME_CSTR: &'static [u8] = b"StreamingIntData\0";
272 const TYPE_ID_CSTR: &'static [u8] = b"miniextendr_api::altrep::StreamingIntData\0";
273}
274
275impl<F: Fn(usize, &mut [i32]) -> usize + 'static> InferBase for StreamingIntData<F> {
276 const BASE: crate::altrep::RBase = crate::altrep::RBase::Int;
277
278 unsafe fn make_class(
279 class_name: *const i8,
280 pkg_name: *const i8,
281 ) -> crate::ffi::altrep::R_altrep_class_t {
282 unsafe {
283 crate::ffi::altrep::R_make_altinteger_class(class_name, pkg_name, core::ptr::null_mut())
284 }
285 }
286
287 unsafe fn install_methods(cls: crate::ffi::altrep::R_altrep_class_t) {
288 unsafe { crate::altrep_bridge::install_base::<Self>(cls) };
289 unsafe { crate::altrep_bridge::install_vec::<Self>(cls) };
290 unsafe { crate::altrep_bridge::install_int::<Self>(cls) };
291 }
292}
293
294impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::altrep_traits::Altrep
295 for StreamingIntData<F>
296{
297 fn length(x: crate::ffi::SEXP) -> crate::ffi::R_xlen_t {
298 let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
299 data.len() as crate::ffi::R_xlen_t
300 }
301}
302
303impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::altrep_traits::AltVec
304 for StreamingIntData<F>
305{
306}
307
308impl<F: Fn(usize, &mut [i32]) -> usize + 'static> crate::altrep_traits::AltInteger
309 for StreamingIntData<F>
310{
311 const HAS_ELT: bool = true;
312
313 fn elt(x: crate::ffi::SEXP, i: crate::ffi::R_xlen_t) -> i32 {
314 let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
315 AltIntegerData::elt(data, i as usize)
316 }
317
318 const HAS_GET_REGION: bool = true;
319
320 fn get_region(
321 x: crate::ffi::SEXP,
322 start: crate::ffi::R_xlen_t,
323 len: crate::ffi::R_xlen_t,
324 buf: &mut [i32],
325 ) -> crate::ffi::R_xlen_t {
326 let data = unsafe { <Self as crate::altrep_data::AltrepExtract>::altrep_extract_ref(x) };
327 AltIntegerData::get_region(data, start as usize, len as usize, buf) as crate::ffi::R_xlen_t
328 }
329}
330