1use std::fs::File;
8use std::sync::Arc;
9
10use arrow::array::{
11 Array, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
12 RecordBatch, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
13};
14use arrow::datatypes::{DataType as ArrowType, Field, Schema};
15use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
16use parquet::arrow::ArrowWriter;
17
18use super::dataframes::build_data_frame;
19use super::CallArgs;
20use crate::interpreter::value::*;
21use crate::interpreter::BuiltinContext;
22use minir_macros::interpreter_builtin;
23
24fn arrow_column_to_rvector(col: &dyn Array) -> Result<RValue, RError> {
28 match col.data_type() {
29 ArrowType::Boolean => {
30 let arr = col.as_any().downcast_ref::<BooleanArray>().ok_or_else(|| {
31 RError::new(
32 RErrorKind::Type,
33 "failed to downcast Arrow BooleanArray".to_string(),
34 )
35 })?;
36 let vals: Vec<Option<bool>> = (0..arr.len())
37 .map(|i| {
38 if arr.is_null(i) {
39 None
40 } else {
41 Some(arr.value(i))
42 }
43 })
44 .collect();
45 Ok(RValue::vec(Vector::Logical(vals.into())))
46 }
47
48 ArrowType::Int8 => {
49 let arr = col.as_any().downcast_ref::<Int8Array>().ok_or_else(|| {
50 RError::new(
51 RErrorKind::Type,
52 "failed to downcast Arrow Int8Array".to_string(),
53 )
54 })?;
55 let vals: Vec<Option<i64>> = (0..arr.len())
56 .map(|i| {
57 if arr.is_null(i) {
58 None
59 } else {
60 Some(i64::from(arr.value(i)))
61 }
62 })
63 .collect();
64 Ok(RValue::vec(Vector::Integer(vals.into())))
65 }
66
67 ArrowType::Int16 => {
68 let arr = col.as_any().downcast_ref::<Int16Array>().ok_or_else(|| {
69 RError::new(
70 RErrorKind::Type,
71 "failed to downcast Arrow Int16Array".to_string(),
72 )
73 })?;
74 let vals: Vec<Option<i64>> = (0..arr.len())
75 .map(|i| {
76 if arr.is_null(i) {
77 None
78 } else {
79 Some(i64::from(arr.value(i)))
80 }
81 })
82 .collect();
83 Ok(RValue::vec(Vector::Integer(vals.into())))
84 }
85
86 ArrowType::Int32 => {
87 let arr = col.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
88 RError::new(
89 RErrorKind::Type,
90 "failed to downcast Arrow Int32Array".to_string(),
91 )
92 })?;
93 let vals: Vec<Option<i64>> = (0..arr.len())
94 .map(|i| {
95 if arr.is_null(i) {
96 None
97 } else {
98 Some(i64::from(arr.value(i)))
99 }
100 })
101 .collect();
102 Ok(RValue::vec(Vector::Integer(vals.into())))
103 }
104
105 ArrowType::Int64 => {
106 let arr = col.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
108 RError::new(
109 RErrorKind::Type,
110 "failed to downcast Arrow Int64Array".to_string(),
111 )
112 })?;
113 let vals: Vec<Option<i64>> = (0..arr.len())
114 .map(|i| {
115 if arr.is_null(i) {
116 None
117 } else {
118 Some(arr.value(i))
119 }
120 })
121 .collect();
122 Ok(RValue::vec(Vector::Integer(vals.into())))
123 }
124
125 ArrowType::UInt8 => {
126 let arr = col.as_any().downcast_ref::<UInt8Array>().ok_or_else(|| {
127 RError::new(
128 RErrorKind::Type,
129 "failed to downcast Arrow UInt8Array".to_string(),
130 )
131 })?;
132 let vals: Vec<Option<i64>> = (0..arr.len())
133 .map(|i| {
134 if arr.is_null(i) {
135 None
136 } else {
137 Some(i64::from(arr.value(i)))
138 }
139 })
140 .collect();
141 Ok(RValue::vec(Vector::Integer(vals.into())))
142 }
143
144 ArrowType::UInt16 => {
145 let arr = col.as_any().downcast_ref::<UInt16Array>().ok_or_else(|| {
146 RError::new(
147 RErrorKind::Type,
148 "failed to downcast Arrow UInt16Array".to_string(),
149 )
150 })?;
151 let vals: Vec<Option<i64>> = (0..arr.len())
152 .map(|i| {
153 if arr.is_null(i) {
154 None
155 } else {
156 Some(i64::from(arr.value(i)))
157 }
158 })
159 .collect();
160 Ok(RValue::vec(Vector::Integer(vals.into())))
161 }
162
163 ArrowType::UInt32 => {
164 let arr = col.as_any().downcast_ref::<UInt32Array>().ok_or_else(|| {
165 RError::new(
166 RErrorKind::Type,
167 "failed to downcast Arrow UInt32Array".to_string(),
168 )
169 })?;
170 let vals: Vec<Option<i64>> = (0..arr.len())
171 .map(|i| {
172 if arr.is_null(i) {
173 None
174 } else {
175 Some(i64::from(arr.value(i)))
176 }
177 })
178 .collect();
179 Ok(RValue::vec(Vector::Integer(vals.into())))
180 }
181
182 ArrowType::UInt64 => {
183 let arr = col.as_any().downcast_ref::<UInt64Array>().ok_or_else(|| {
185 RError::new(
186 RErrorKind::Type,
187 "failed to downcast Arrow UInt64Array".to_string(),
188 )
189 })?;
190 let vals: Vec<Option<f64>> = (0..arr.len())
191 .map(|i| {
192 if arr.is_null(i) {
193 None
194 } else {
195 Some(arr.value(i) as f64)
196 }
197 })
198 .collect();
199 Ok(RValue::vec(Vector::Double(vals.into())))
200 }
201
202 ArrowType::Float32 => {
203 let arr = col.as_any().downcast_ref::<Float32Array>().ok_or_else(|| {
204 RError::new(
205 RErrorKind::Type,
206 "failed to downcast Arrow Float32Array".to_string(),
207 )
208 })?;
209 let vals: Vec<Option<f64>> = (0..arr.len())
210 .map(|i| {
211 if arr.is_null(i) {
212 None
213 } else {
214 Some(f64::from(arr.value(i)))
215 }
216 })
217 .collect();
218 Ok(RValue::vec(Vector::Double(vals.into())))
219 }
220
221 ArrowType::Float64 => {
222 let arr = col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
223 RError::new(
224 RErrorKind::Type,
225 "failed to downcast Arrow Float64Array".to_string(),
226 )
227 })?;
228 let vals: Vec<Option<f64>> = (0..arr.len())
229 .map(|i| {
230 if arr.is_null(i) {
231 None
232 } else {
233 Some(arr.value(i))
234 }
235 })
236 .collect();
237 Ok(RValue::vec(Vector::Double(vals.into())))
238 }
239
240 ArrowType::Utf8 => {
241 let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
242 RError::new(
243 RErrorKind::Type,
244 "failed to downcast Arrow StringArray".to_string(),
245 )
246 })?;
247 let vals: Vec<Option<String>> = (0..arr.len())
248 .map(|i| {
249 if arr.is_null(i) {
250 None
251 } else {
252 Some(arr.value(i).to_string())
253 }
254 })
255 .collect();
256 Ok(RValue::vec(Vector::Character(vals.into())))
257 }
258
259 ArrowType::LargeUtf8 => {
260 let arr = col
262 .as_any()
263 .downcast_ref::<arrow::array::LargeStringArray>()
264 .ok_or_else(|| {
265 RError::new(
266 RErrorKind::Type,
267 "failed to downcast Arrow LargeStringArray".to_string(),
268 )
269 })?;
270 let vals: Vec<Option<String>> = (0..arr.len())
271 .map(|i| {
272 if arr.is_null(i) {
273 None
274 } else {
275 Some(arr.value(i).to_string())
276 }
277 })
278 .collect();
279 Ok(RValue::vec(Vector::Character(vals.into())))
280 }
281
282 other => Err(RError::new(
283 RErrorKind::Type,
284 format!(
285 "unsupported Arrow data type '{}' in Parquet file — \
286 only boolean, integer, float, and string columns are supported",
287 other
288 ),
289 )),
290 }
291}
292
293fn rvector_to_arrow_array(vec: &Vector, len: usize) -> Result<Arc<dyn Array>, RError> {
295 match vec {
296 Vector::Logical(v) => {
297 let arr = BooleanArray::from(
298 (0..len)
299 .map(|i| v.get(i).copied().flatten())
300 .collect::<Vec<Option<bool>>>(),
301 );
302 Ok(Arc::new(arr))
303 }
304 Vector::Integer(v) => {
305 let arr =
306 Int64Array::from((0..len).map(|i| v.get_opt(i)).collect::<Vec<Option<i64>>>());
307 Ok(Arc::new(arr))
308 }
309 Vector::Double(v) => {
310 let arr =
311 Float64Array::from((0..len).map(|i| v.get_opt(i)).collect::<Vec<Option<f64>>>());
312 Ok(Arc::new(arr))
313 }
314 Vector::Character(v) => {
315 let arr = StringArray::from(
316 (0..len)
317 .map(|i| v.get(i).and_then(|s| s.as_deref()))
318 .collect::<Vec<Option<&str>>>(),
319 );
320 Ok(Arc::new(arr))
321 }
322 other => Err(RError::new(
323 RErrorKind::Type,
324 format!(
325 "cannot write {} vector to Parquet — \
326 only logical, integer, double, and character vectors are supported",
327 other.type_name()
328 ),
329 )),
330 }
331}
332
333fn rvector_to_arrow_type(vec: &Vector) -> Result<ArrowType, RError> {
335 match vec {
336 Vector::Logical(_) => Ok(ArrowType::Boolean),
337 Vector::Integer(_) => Ok(ArrowType::Int64),
338 Vector::Double(_) => Ok(ArrowType::Float64),
339 Vector::Character(_) => Ok(ArrowType::Utf8),
340 other => Err(RError::new(
341 RErrorKind::Type,
342 format!(
343 "cannot determine Parquet type for {} vector — \
344 only logical, integer, double, and character vectors are supported",
345 other.type_name()
346 ),
347 )),
348 }
349}
350
351#[interpreter_builtin(name = "read.parquet", min_args = 1, namespace = "arrow")]
369fn builtin_read_parquet(
370 args: &[RValue],
371 named: &[(String, RValue)],
372 context: &BuiltinContext,
373) -> Result<RValue, RError> {
374 let call_args = CallArgs::new(args, named);
375 let path = call_args.string("file", 0)?;
376 let resolved = context.interpreter().resolve_path(&path);
377
378 let file = File::open(&resolved).map_err(|e| {
379 RError::new(
380 RErrorKind::Other,
381 format!("cannot open Parquet file '{}': {}", resolved.display(), e),
382 )
383 })?;
384
385 let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
386 RError::new(
387 RErrorKind::Other,
388 format!("failed to read Parquet metadata from '{}': {}", path, e),
389 )
390 })?;
391
392 let selected_columns: Option<Vec<String>> =
394 call_args.value("columns", 1).and_then(|v| match v {
395 RValue::Null => None,
396 RValue::Vector(rv) => {
397 let chars = rv.inner.to_characters();
398 let names: Vec<String> = chars.into_iter().flatten().collect();
399 if names.is_empty() {
400 None
401 } else {
402 Some(names)
403 }
404 }
405 _ => None,
406 });
407
408 if let Some(ref cols) = selected_columns {
409 let parquet_schema = builder.parquet_schema();
410 let arrow_schema = builder.schema();
411
412 let mut indices = Vec::new();
414 let mut errors = Vec::new();
415 for col_name in cols {
416 match arrow_schema
417 .fields()
418 .iter()
419 .position(|f| f.name() == col_name)
420 {
421 Some(idx) => indices.push(idx),
422 None => errors.push(format!("column '{}' not found in Parquet file", col_name)),
423 }
424 }
425 if !errors.is_empty() {
426 return Err(RError::new(RErrorKind::Other, errors.join("; ")));
427 }
428
429 let mask = parquet::arrow::ProjectionMask::roots(parquet_schema, indices.iter().copied());
430 builder = builder.with_projection(mask);
431 }
432
433 let reader = builder.build().map_err(|e| {
434 RError::new(
435 RErrorKind::Other,
436 format!("failed to build Parquet reader for '{}': {}", path, e),
437 )
438 })?;
439
440 let mut all_batches: Vec<RecordBatch> = Vec::new();
442 for batch_result in reader {
443 let batch = batch_result.map_err(|e| {
444 RError::new(
445 RErrorKind::Other,
446 format!("error reading Parquet batch from '{}': {}", path, e),
447 )
448 })?;
449 all_batches.push(batch);
450 }
451
452 if all_batches.is_empty() {
453 return build_data_frame(vec![], 0);
455 }
456
457 let schema = all_batches[0].schema();
459 let col_names: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
460 let ncols = col_names.len();
461
462 let mut columns: Vec<(Option<String>, RValue)> = Vec::with_capacity(ncols);
464 let mut total_rows: usize = 0;
465
466 for (col_idx, col_name) in col_names.into_iter().enumerate() {
467 let mut col_values: Vec<RValue> = Vec::new();
469 for batch in &all_batches {
470 let arrow_col = batch.column(col_idx);
471 col_values.push(arrow_column_to_rvector(arrow_col.as_ref())?);
472 }
473
474 let combined = if col_values.len() == 1 {
476 col_values.into_iter().next().ok_or_else(|| {
477 RError::new(
478 RErrorKind::Other,
479 "empty column in parquet read".to_string(),
480 )
481 })?
482 } else {
483 concatenate_rvectors(col_values)?
484 };
485
486 if col_idx == 0 {
487 total_rows = combined.length();
488 }
489 columns.push((Some(col_name), combined));
490 }
491
492 build_data_frame(columns, total_rows)
493}
494
495fn concatenate_rvectors(values: Vec<RValue>) -> Result<RValue, RError> {
497 if values.is_empty() {
498 return Ok(RValue::Null);
499 }
500
501 match &values[0] {
503 RValue::Vector(rv) => match &rv.inner {
504 Vector::Logical(_) => {
505 let mut result: Vec<Option<bool>> = Vec::new();
506 for v in &values {
507 if let RValue::Vector(rv) = v {
508 if let Vector::Logical(l) = &rv.inner {
509 result.extend_from_slice(l);
510 }
511 }
512 }
513 Ok(RValue::vec(Vector::Logical(result.into())))
514 }
515 Vector::Integer(_) => {
516 let mut result: Vec<Option<i64>> = Vec::new();
517 for v in &values {
518 if let RValue::Vector(rv) = v {
519 if let Vector::Integer(l) = &rv.inner {
520 result.extend(l.iter_opt());
521 }
522 }
523 }
524 Ok(RValue::vec(Vector::Integer(result.into())))
525 }
526 Vector::Double(_) => {
527 let mut result: Vec<Option<f64>> = Vec::new();
528 for v in &values {
529 if let RValue::Vector(rv) = v {
530 if let Vector::Double(l) = &rv.inner {
531 result.extend(l.iter_opt());
532 }
533 }
534 }
535 Ok(RValue::vec(Vector::Double(result.into())))
536 }
537 Vector::Character(_) => {
538 let mut result: Vec<Option<String>> = Vec::new();
539 for v in &values {
540 if let RValue::Vector(rv) = v {
541 if let Vector::Character(l) = &rv.inner {
542 result.extend_from_slice(l);
543 }
544 }
545 }
546 Ok(RValue::vec(Vector::Character(result.into())))
547 }
548 _ => Err(RError::new(
549 RErrorKind::Type,
550 "unexpected vector type during Parquet column concatenation".to_string(),
551 )),
552 },
553 _ => Err(RError::new(
554 RErrorKind::Type,
555 "unexpected value type during Parquet column concatenation".to_string(),
556 )),
557 }
558}
559
560#[interpreter_builtin(name = "write.parquet", min_args = 2, namespace = "arrow")]
576fn builtin_write_parquet(
577 args: &[RValue],
578 named: &[(String, RValue)],
579 context: &BuiltinContext,
580) -> Result<RValue, RError> {
581 let call_args = CallArgs::new(args, named);
582
583 let df = call_args.value("x", 0).ok_or_else(|| {
584 RError::new(
585 RErrorKind::Argument,
586 "argument 'x' is missing, with no default".to_string(),
587 )
588 })?;
589
590 let list: &RList = match df {
591 RValue::List(l) => l,
592 _ => {
593 return Err(RError::new(
594 RErrorKind::Argument,
595 format!(
596 "write.parquet() requires a data.frame, got {}",
597 df.type_name()
598 ),
599 ))
600 }
601 };
602
603 let path = call_args.string("file", 1)?;
604 let resolved = context.interpreter().resolve_path(&path);
605
606 let col_names: Vec<String> = match list.get_attr("names") {
608 Some(RValue::Vector(rv)) => rv
609 .inner
610 .to_characters()
611 .into_iter()
612 .enumerate()
613 .map(|(i, s)| s.unwrap_or_else(|| format!("V{}", i + 1)))
614 .collect(),
615 _ => list
616 .values
617 .iter()
618 .enumerate()
619 .map(|(i, (name, _))| name.clone().unwrap_or_else(|| format!("V{}", i + 1)))
620 .collect(),
621 };
622
623 let mut fields: Vec<Field> = Vec::new();
625 let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
626 let mut errors: Vec<String> = Vec::new();
627
628 for (i, (_, val)) in list.values.iter().enumerate() {
629 let col_name = col_names
630 .get(i)
631 .cloned()
632 .unwrap_or_else(|| format!("V{}", i + 1));
633
634 match val {
635 RValue::Vector(rv) => {
636 let len = rv.inner.len();
637 match rvector_to_arrow_type(&rv.inner) {
638 Ok(arrow_type) => {
639 fields.push(Field::new(&col_name, arrow_type, true));
640 match rvector_to_arrow_array(&rv.inner, len) {
641 Ok(arr) => arrays.push(arr),
642 Err(e) => errors.push(format!("column '{}': {}", col_name, e)),
643 }
644 }
645 Err(e) => errors.push(format!("column '{}': {}", col_name, e)),
646 }
647 }
648 _ => {
649 errors.push(format!(
650 "column '{}': expected vector, got {}",
651 col_name,
652 val.type_name()
653 ));
654 }
655 }
656 }
657
658 if !errors.is_empty() {
659 return Err(RError::new(RErrorKind::Other, errors.join("; ")));
660 }
661
662 let schema = Arc::new(Schema::new(fields));
663 let batch = RecordBatch::try_new(schema.clone(), arrays).map_err(|e| {
664 RError::new(
665 RErrorKind::Other,
666 format!("failed to create Arrow RecordBatch: {}", e),
667 )
668 })?;
669
670 let file = File::create(&resolved).map_err(|e| {
671 RError::new(
672 RErrorKind::Other,
673 format!("cannot create Parquet file '{}': {}", resolved.display(), e),
674 )
675 })?;
676
677 let mut writer = ArrowWriter::try_new(file, schema, None).map_err(|e| {
678 RError::new(
679 RErrorKind::Other,
680 format!("failed to create Parquet writer: {}", e),
681 )
682 })?;
683
684 writer.write(&batch).map_err(|e| {
685 RError::new(
686 RErrorKind::Other,
687 format!("failed to write Parquet data: {}", e),
688 )
689 })?;
690
691 writer.close().map_err(|e| {
692 RError::new(
693 RErrorKind::Other,
694 format!("failed to finalize Parquet file: {}", e),
695 )
696 })?;
697
698 Ok(RValue::Null)
699}
700
701