Skip to main content

r/interpreter/builtins/
parquet.rs

1//! Parquet I/O builtins — `read.parquet()` and `write.parquet()` for reading
2//! and writing Apache Parquet files as data frames.
3//!
4//! Feature-gated behind the `parquet` feature flag. Uses the `parquet` and
5//! `arrow` crates to convert between Arrow columnar format and R values.
6
7use std::fs::File;
8use std::sync::Arc;
9
10use arrow::array::{
11    Array, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
12    RecordBatch, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
13};
14use arrow::datatypes::{DataType as ArrowType, Field, Schema};
15use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
16use parquet::arrow::ArrowWriter;
17
18use super::dataframes::build_data_frame;
19use super::CallArgs;
20use crate::interpreter::value::*;
21use crate::interpreter::BuiltinContext;
22use minir_macros::interpreter_builtin;
23
24// region: helpers
25
26/// Convert an Arrow array column to an R vector.
27fn arrow_column_to_rvector(col: &dyn Array) -> Result<RValue, RError> {
28    match col.data_type() {
29        ArrowType::Boolean => {
30            let arr = col.as_any().downcast_ref::<BooleanArray>().ok_or_else(|| {
31                RError::new(
32                    RErrorKind::Type,
33                    "failed to downcast Arrow BooleanArray".to_string(),
34                )
35            })?;
36            let vals: Vec<Option<bool>> = (0..arr.len())
37                .map(|i| {
38                    if arr.is_null(i) {
39                        None
40                    } else {
41                        Some(arr.value(i))
42                    }
43                })
44                .collect();
45            Ok(RValue::vec(Vector::Logical(vals.into())))
46        }
47
48        ArrowType::Int8 => {
49            let arr = col.as_any().downcast_ref::<Int8Array>().ok_or_else(|| {
50                RError::new(
51                    RErrorKind::Type,
52                    "failed to downcast Arrow Int8Array".to_string(),
53                )
54            })?;
55            let vals: Vec<Option<i64>> = (0..arr.len())
56                .map(|i| {
57                    if arr.is_null(i) {
58                        None
59                    } else {
60                        Some(i64::from(arr.value(i)))
61                    }
62                })
63                .collect();
64            Ok(RValue::vec(Vector::Integer(vals.into())))
65        }
66
67        ArrowType::Int16 => {
68            let arr = col.as_any().downcast_ref::<Int16Array>().ok_or_else(|| {
69                RError::new(
70                    RErrorKind::Type,
71                    "failed to downcast Arrow Int16Array".to_string(),
72                )
73            })?;
74            let vals: Vec<Option<i64>> = (0..arr.len())
75                .map(|i| {
76                    if arr.is_null(i) {
77                        None
78                    } else {
79                        Some(i64::from(arr.value(i)))
80                    }
81                })
82                .collect();
83            Ok(RValue::vec(Vector::Integer(vals.into())))
84        }
85
86        ArrowType::Int32 => {
87            let arr = col.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
88                RError::new(
89                    RErrorKind::Type,
90                    "failed to downcast Arrow Int32Array".to_string(),
91                )
92            })?;
93            let vals: Vec<Option<i64>> = (0..arr.len())
94                .map(|i| {
95                    if arr.is_null(i) {
96                        None
97                    } else {
98                        Some(i64::from(arr.value(i)))
99                    }
100                })
101                .collect();
102            Ok(RValue::vec(Vector::Integer(vals.into())))
103        }
104
105        ArrowType::Int64 => {
106            // R has no native 64-bit integer; use Integer (our Integer is i64)
107            let arr = col.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
108                RError::new(
109                    RErrorKind::Type,
110                    "failed to downcast Arrow Int64Array".to_string(),
111                )
112            })?;
113            let vals: Vec<Option<i64>> = (0..arr.len())
114                .map(|i| {
115                    if arr.is_null(i) {
116                        None
117                    } else {
118                        Some(arr.value(i))
119                    }
120                })
121                .collect();
122            Ok(RValue::vec(Vector::Integer(vals.into())))
123        }
124
125        ArrowType::UInt8 => {
126            let arr = col.as_any().downcast_ref::<UInt8Array>().ok_or_else(|| {
127                RError::new(
128                    RErrorKind::Type,
129                    "failed to downcast Arrow UInt8Array".to_string(),
130                )
131            })?;
132            let vals: Vec<Option<i64>> = (0..arr.len())
133                .map(|i| {
134                    if arr.is_null(i) {
135                        None
136                    } else {
137                        Some(i64::from(arr.value(i)))
138                    }
139                })
140                .collect();
141            Ok(RValue::vec(Vector::Integer(vals.into())))
142        }
143
144        ArrowType::UInt16 => {
145            let arr = col.as_any().downcast_ref::<UInt16Array>().ok_or_else(|| {
146                RError::new(
147                    RErrorKind::Type,
148                    "failed to downcast Arrow UInt16Array".to_string(),
149                )
150            })?;
151            let vals: Vec<Option<i64>> = (0..arr.len())
152                .map(|i| {
153                    if arr.is_null(i) {
154                        None
155                    } else {
156                        Some(i64::from(arr.value(i)))
157                    }
158                })
159                .collect();
160            Ok(RValue::vec(Vector::Integer(vals.into())))
161        }
162
163        ArrowType::UInt32 => {
164            let arr = col.as_any().downcast_ref::<UInt32Array>().ok_or_else(|| {
165                RError::new(
166                    RErrorKind::Type,
167                    "failed to downcast Arrow UInt32Array".to_string(),
168                )
169            })?;
170            let vals: Vec<Option<i64>> = (0..arr.len())
171                .map(|i| {
172                    if arr.is_null(i) {
173                        None
174                    } else {
175                        Some(i64::from(arr.value(i)))
176                    }
177                })
178                .collect();
179            Ok(RValue::vec(Vector::Integer(vals.into())))
180        }
181
182        ArrowType::UInt64 => {
183            // UInt64 may overflow i64; store as Double
184            let arr = col.as_any().downcast_ref::<UInt64Array>().ok_or_else(|| {
185                RError::new(
186                    RErrorKind::Type,
187                    "failed to downcast Arrow UInt64Array".to_string(),
188                )
189            })?;
190            let vals: Vec<Option<f64>> = (0..arr.len())
191                .map(|i| {
192                    if arr.is_null(i) {
193                        None
194                    } else {
195                        Some(arr.value(i) as f64)
196                    }
197                })
198                .collect();
199            Ok(RValue::vec(Vector::Double(vals.into())))
200        }
201
202        ArrowType::Float32 => {
203            let arr = col.as_any().downcast_ref::<Float32Array>().ok_or_else(|| {
204                RError::new(
205                    RErrorKind::Type,
206                    "failed to downcast Arrow Float32Array".to_string(),
207                )
208            })?;
209            let vals: Vec<Option<f64>> = (0..arr.len())
210                .map(|i| {
211                    if arr.is_null(i) {
212                        None
213                    } else {
214                        Some(f64::from(arr.value(i)))
215                    }
216                })
217                .collect();
218            Ok(RValue::vec(Vector::Double(vals.into())))
219        }
220
221        ArrowType::Float64 => {
222            let arr = col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
223                RError::new(
224                    RErrorKind::Type,
225                    "failed to downcast Arrow Float64Array".to_string(),
226                )
227            })?;
228            let vals: Vec<Option<f64>> = (0..arr.len())
229                .map(|i| {
230                    if arr.is_null(i) {
231                        None
232                    } else {
233                        Some(arr.value(i))
234                    }
235                })
236                .collect();
237            Ok(RValue::vec(Vector::Double(vals.into())))
238        }
239
240        ArrowType::Utf8 => {
241            let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
242                RError::new(
243                    RErrorKind::Type,
244                    "failed to downcast Arrow StringArray".to_string(),
245                )
246            })?;
247            let vals: Vec<Option<String>> = (0..arr.len())
248                .map(|i| {
249                    if arr.is_null(i) {
250                        None
251                    } else {
252                        Some(arr.value(i).to_string())
253                    }
254                })
255                .collect();
256            Ok(RValue::vec(Vector::Character(vals.into())))
257        }
258
259        ArrowType::LargeUtf8 => {
260            // LargeUtf8 uses i64 offsets; same string values
261            let arr = col
262                .as_any()
263                .downcast_ref::<arrow::array::LargeStringArray>()
264                .ok_or_else(|| {
265                    RError::new(
266                        RErrorKind::Type,
267                        "failed to downcast Arrow LargeStringArray".to_string(),
268                    )
269                })?;
270            let vals: Vec<Option<String>> = (0..arr.len())
271                .map(|i| {
272                    if arr.is_null(i) {
273                        None
274                    } else {
275                        Some(arr.value(i).to_string())
276                    }
277                })
278                .collect();
279            Ok(RValue::vec(Vector::Character(vals.into())))
280        }
281
282        other => Err(RError::new(
283            RErrorKind::Type,
284            format!(
285                "unsupported Arrow data type '{}' in Parquet file — \
286                 only boolean, integer, float, and string columns are supported",
287                other
288            ),
289        )),
290    }
291}
292
293/// Convert an R vector to an Arrow ArrayRef for writing.
294fn rvector_to_arrow_array(vec: &Vector, len: usize) -> Result<Arc<dyn Array>, RError> {
295    match vec {
296        Vector::Logical(v) => {
297            let arr = BooleanArray::from(
298                (0..len)
299                    .map(|i| v.get(i).copied().flatten())
300                    .collect::<Vec<Option<bool>>>(),
301            );
302            Ok(Arc::new(arr))
303        }
304        Vector::Integer(v) => {
305            let arr =
306                Int64Array::from((0..len).map(|i| v.get_opt(i)).collect::<Vec<Option<i64>>>());
307            Ok(Arc::new(arr))
308        }
309        Vector::Double(v) => {
310            let arr =
311                Float64Array::from((0..len).map(|i| v.get_opt(i)).collect::<Vec<Option<f64>>>());
312            Ok(Arc::new(arr))
313        }
314        Vector::Character(v) => {
315            let arr = StringArray::from(
316                (0..len)
317                    .map(|i| v.get(i).and_then(|s| s.as_deref()))
318                    .collect::<Vec<Option<&str>>>(),
319            );
320            Ok(Arc::new(arr))
321        }
322        other => Err(RError::new(
323            RErrorKind::Type,
324            format!(
325                "cannot write {} vector to Parquet — \
326                 only logical, integer, double, and character vectors are supported",
327                other.type_name()
328            ),
329        )),
330    }
331}
332
333/// Map an R vector type to an Arrow DataType for schema construction.
334fn rvector_to_arrow_type(vec: &Vector) -> Result<ArrowType, RError> {
335    match vec {
336        Vector::Logical(_) => Ok(ArrowType::Boolean),
337        Vector::Integer(_) => Ok(ArrowType::Int64),
338        Vector::Double(_) => Ok(ArrowType::Float64),
339        Vector::Character(_) => Ok(ArrowType::Utf8),
340        other => Err(RError::new(
341            RErrorKind::Type,
342            format!(
343                "cannot determine Parquet type for {} vector — \
344                 only logical, integer, double, and character vectors are supported",
345                other.type_name()
346            ),
347        )),
348    }
349}
350
351// endregion
352
353// region: read.parquet
354
355/// Read a Parquet file into a data.frame.
356///
357/// Converts Arrow column types to R vectors:
358/// - Int8/Int16/Int32/Int64 -> Integer
359/// - UInt8/UInt16/UInt32 -> Integer
360/// - UInt64/Float32/Float64 -> Double
361/// - Boolean -> Logical
362/// - Utf8/LargeUtf8 -> Character
363/// - null values -> NA
364///
365/// @param file character scalar: path to the Parquet file
366/// @param columns character vector: optional column names to read (default: all)
367/// @return data.frame with columns from the Parquet file
368#[interpreter_builtin(name = "read.parquet", min_args = 1, namespace = "arrow")]
369fn builtin_read_parquet(
370    args: &[RValue],
371    named: &[(String, RValue)],
372    context: &BuiltinContext,
373) -> Result<RValue, RError> {
374    let call_args = CallArgs::new(args, named);
375    let path = call_args.string("file", 0)?;
376    let resolved = context.interpreter().resolve_path(&path);
377
378    let file = File::open(&resolved).map_err(|e| {
379        RError::new(
380            RErrorKind::Other,
381            format!("cannot open Parquet file '{}': {}", resolved.display(), e),
382        )
383    })?;
384
385    let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
386        RError::new(
387            RErrorKind::Other,
388            format!("failed to read Parquet metadata from '{}': {}", path, e),
389        )
390    })?;
391
392    // Handle optional column selection
393    let selected_columns: Option<Vec<String>> =
394        call_args.value("columns", 1).and_then(|v| match v {
395            RValue::Null => None,
396            RValue::Vector(rv) => {
397                let chars = rv.inner.to_characters();
398                let names: Vec<String> = chars.into_iter().flatten().collect();
399                if names.is_empty() {
400                    None
401                } else {
402                    Some(names)
403                }
404            }
405            _ => None,
406        });
407
408    if let Some(ref cols) = selected_columns {
409        let parquet_schema = builder.parquet_schema();
410        let arrow_schema = builder.schema();
411
412        // Map column names to root column indices
413        let mut indices = Vec::new();
414        let mut errors = Vec::new();
415        for col_name in cols {
416            match arrow_schema
417                .fields()
418                .iter()
419                .position(|f| f.name() == col_name)
420            {
421                Some(idx) => indices.push(idx),
422                None => errors.push(format!("column '{}' not found in Parquet file", col_name)),
423            }
424        }
425        if !errors.is_empty() {
426            return Err(RError::new(RErrorKind::Other, errors.join("; ")));
427        }
428
429        let mask = parquet::arrow::ProjectionMask::roots(parquet_schema, indices.iter().copied());
430        builder = builder.with_projection(mask);
431    }
432
433    let reader = builder.build().map_err(|e| {
434        RError::new(
435            RErrorKind::Other,
436            format!("failed to build Parquet reader for '{}': {}", path, e),
437        )
438    })?;
439
440    // Collect all batches
441    let mut all_batches: Vec<RecordBatch> = Vec::new();
442    for batch_result in reader {
443        let batch = batch_result.map_err(|e| {
444            RError::new(
445                RErrorKind::Other,
446                format!("error reading Parquet batch from '{}': {}", path, e),
447            )
448        })?;
449        all_batches.push(batch);
450    }
451
452    if all_batches.is_empty() {
453        // Return empty data frame
454        return build_data_frame(vec![], 0);
455    }
456
457    // Get column names from the schema of the first batch
458    let schema = all_batches[0].schema();
459    let col_names: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
460    let ncols = col_names.len();
461
462    // For each column, concatenate values across all batches
463    let mut columns: Vec<(Option<String>, RValue)> = Vec::with_capacity(ncols);
464    let mut total_rows: usize = 0;
465
466    for (col_idx, col_name) in col_names.into_iter().enumerate() {
467        // Collect all arrays for this column across batches
468        let mut col_values: Vec<RValue> = Vec::new();
469        for batch in &all_batches {
470            let arrow_col = batch.column(col_idx);
471            col_values.push(arrow_column_to_rvector(arrow_col.as_ref())?);
472        }
473
474        // Concatenate the column values
475        let combined = if col_values.len() == 1 {
476            col_values.into_iter().next().ok_or_else(|| {
477                RError::new(
478                    RErrorKind::Other,
479                    "empty column in parquet read".to_string(),
480                )
481            })?
482        } else {
483            concatenate_rvectors(col_values)?
484        };
485
486        if col_idx == 0 {
487            total_rows = combined.length();
488        }
489        columns.push((Some(col_name), combined));
490    }
491
492    build_data_frame(columns, total_rows)
493}
494
495/// Concatenate multiple R vectors of the same type into one.
496fn concatenate_rvectors(values: Vec<RValue>) -> Result<RValue, RError> {
497    if values.is_empty() {
498        return Ok(RValue::Null);
499    }
500
501    // Determine type from the first value
502    match &values[0] {
503        RValue::Vector(rv) => match &rv.inner {
504            Vector::Logical(_) => {
505                let mut result: Vec<Option<bool>> = Vec::new();
506                for v in &values {
507                    if let RValue::Vector(rv) = v {
508                        if let Vector::Logical(l) = &rv.inner {
509                            result.extend_from_slice(l);
510                        }
511                    }
512                }
513                Ok(RValue::vec(Vector::Logical(result.into())))
514            }
515            Vector::Integer(_) => {
516                let mut result: Vec<Option<i64>> = Vec::new();
517                for v in &values {
518                    if let RValue::Vector(rv) = v {
519                        if let Vector::Integer(l) = &rv.inner {
520                            result.extend(l.iter_opt());
521                        }
522                    }
523                }
524                Ok(RValue::vec(Vector::Integer(result.into())))
525            }
526            Vector::Double(_) => {
527                let mut result: Vec<Option<f64>> = Vec::new();
528                for v in &values {
529                    if let RValue::Vector(rv) = v {
530                        if let Vector::Double(l) = &rv.inner {
531                            result.extend(l.iter_opt());
532                        }
533                    }
534                }
535                Ok(RValue::vec(Vector::Double(result.into())))
536            }
537            Vector::Character(_) => {
538                let mut result: Vec<Option<String>> = Vec::new();
539                for v in &values {
540                    if let RValue::Vector(rv) = v {
541                        if let Vector::Character(l) = &rv.inner {
542                            result.extend_from_slice(l);
543                        }
544                    }
545                }
546                Ok(RValue::vec(Vector::Character(result.into())))
547            }
548            _ => Err(RError::new(
549                RErrorKind::Type,
550                "unexpected vector type during Parquet column concatenation".to_string(),
551            )),
552        },
553        _ => Err(RError::new(
554            RErrorKind::Type,
555            "unexpected value type during Parquet column concatenation".to_string(),
556        )),
557    }
558}
559
560// endregion
561
562// region: write.parquet
563
564/// Write a data.frame to a Parquet file.
565///
566/// Converts R vectors to Arrow column types:
567/// - Logical -> Boolean
568/// - Integer -> Int64
569/// - Double -> Float64
570/// - Character -> Utf8
571///
572/// @param x data.frame to write
573/// @param file character scalar: output file path
574/// @return NULL (invisibly)
575#[interpreter_builtin(name = "write.parquet", min_args = 2, namespace = "arrow")]
576fn builtin_write_parquet(
577    args: &[RValue],
578    named: &[(String, RValue)],
579    context: &BuiltinContext,
580) -> Result<RValue, RError> {
581    let call_args = CallArgs::new(args, named);
582
583    let df = call_args.value("x", 0).ok_or_else(|| {
584        RError::new(
585            RErrorKind::Argument,
586            "argument 'x' is missing, with no default".to_string(),
587        )
588    })?;
589
590    let list: &RList = match df {
591        RValue::List(l) => l,
592        _ => {
593            return Err(RError::new(
594                RErrorKind::Argument,
595                format!(
596                    "write.parquet() requires a data.frame, got {}",
597                    df.type_name()
598                ),
599            ))
600        }
601    };
602
603    let path = call_args.string("file", 1)?;
604    let resolved = context.interpreter().resolve_path(&path);
605
606    // Extract column names
607    let col_names: Vec<String> = match list.get_attr("names") {
608        Some(RValue::Vector(rv)) => rv
609            .inner
610            .to_characters()
611            .into_iter()
612            .enumerate()
613            .map(|(i, s)| s.unwrap_or_else(|| format!("V{}", i + 1)))
614            .collect(),
615        _ => list
616            .values
617            .iter()
618            .enumerate()
619            .map(|(i, (name, _))| name.clone().unwrap_or_else(|| format!("V{}", i + 1)))
620            .collect(),
621    };
622
623    // Build Arrow schema and arrays
624    let mut fields: Vec<Field> = Vec::new();
625    let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
626    let mut errors: Vec<String> = Vec::new();
627
628    for (i, (_, val)) in list.values.iter().enumerate() {
629        let col_name = col_names
630            .get(i)
631            .cloned()
632            .unwrap_or_else(|| format!("V{}", i + 1));
633
634        match val {
635            RValue::Vector(rv) => {
636                let len = rv.inner.len();
637                match rvector_to_arrow_type(&rv.inner) {
638                    Ok(arrow_type) => {
639                        fields.push(Field::new(&col_name, arrow_type, true));
640                        match rvector_to_arrow_array(&rv.inner, len) {
641                            Ok(arr) => arrays.push(arr),
642                            Err(e) => errors.push(format!("column '{}': {}", col_name, e)),
643                        }
644                    }
645                    Err(e) => errors.push(format!("column '{}': {}", col_name, e)),
646                }
647            }
648            _ => {
649                errors.push(format!(
650                    "column '{}': expected vector, got {}",
651                    col_name,
652                    val.type_name()
653                ));
654            }
655        }
656    }
657
658    if !errors.is_empty() {
659        return Err(RError::new(RErrorKind::Other, errors.join("; ")));
660    }
661
662    let schema = Arc::new(Schema::new(fields));
663    let batch = RecordBatch::try_new(schema.clone(), arrays).map_err(|e| {
664        RError::new(
665            RErrorKind::Other,
666            format!("failed to create Arrow RecordBatch: {}", e),
667        )
668    })?;
669
670    let file = File::create(&resolved).map_err(|e| {
671        RError::new(
672            RErrorKind::Other,
673            format!("cannot create Parquet file '{}': {}", resolved.display(), e),
674        )
675    })?;
676
677    let mut writer = ArrowWriter::try_new(file, schema, None).map_err(|e| {
678        RError::new(
679            RErrorKind::Other,
680            format!("failed to create Parquet writer: {}", e),
681        )
682    })?;
683
684    writer.write(&batch).map_err(|e| {
685        RError::new(
686            RErrorKind::Other,
687            format!("failed to write Parquet data: {}", e),
688        )
689    })?;
690
691    writer.close().map_err(|e| {
692        RError::new(
693            RErrorKind::Other,
694            format!("failed to finalize Parquet file: {}", e),
695        )
696    })?;
697
698    Ok(RValue::Null)
699}
700
701// endregion